Dynamic routing:
Option 1: implement a custom KafkaDynamicSink that embeds multiple native FlinkKafkaProducer instances, each corresponding to one downstream Kafka queue.
In the open method, read all Kafka channel configurations, build one FlinkKafkaProducer per channel, and store them in a Map: channelId -> FlinkKafkaProducer.
Override the invoke method:
For each incoming record, apply the routing rules to find all matching channelIds (multiple are allowed), look up the corresponding FlinkKafkaProducer in the Map, and call its invoke method.
Core code:
public class DynamicKafkaSink<IN> extends RichSinkFunction<IN> {

    @Override
    public void open(Configuration parameters) throws Exception {
        List<ChannelModel> allChannels = channelRepository.getAll();
        for (ChannelModel nextChannel : allChannels) {
            FlinkKafkaProducer010 nextProducer = (FlinkKafkaProducer010) channelFactory.createChannelProducer(nextChannel,
                    FlinkKafkaProducer010.class, Collections.emptyMap());
            nextProducer.setRuntimeContext(this.getRuntimeContext());
            nextProducer.open(parameters);
            producers.put(nextChannel.getChannelId(), nextProducer);
        }
    }

    @Override
    public void invoke(IN value) throws Exception {
        List<String> channelIds = channelRouteStrategy.route(value);
        for (String nextChannelId : channelIds) {
            FlinkKafkaProducer010 nextProducer = producers.get(nextChannelId);
            nextProducer.invoke(converted);   // "converted" is the record after any per-channel conversion
        }
    }
}
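A usage sketch for attaching the sink (the MyEvent type, the job name, and the placeholder source are assumptions, not from the article; the sink hides all downstream Kafka routing):
DataStream<MyEvent> source = ...;   // any upstream DataStream
source.addSink(new DynamicKafkaSink<MyEvent>());   // one sink instance fans each record out to every matching Kafka queue
env.execute("dynamic-routing-job");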
Note:
The producer Map cannot be initialized in the constructor; it must be initialized in the open method. Because of Flink's distributed execution, the constructor and open do not necessarily run in the same JVM.
Class-level fields must be serializable; otherwise they have to be declared transient.
Every newly built FlinkKafkaProducer must first have setRuntimeContext(this.getRuntimeContext()) called on it, and then open, before it can be used (see the field sketch below).
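To make these notes concrete, a minimal sketch of how the sink's fields might be declared (ChannelRepository, ChannelRouteStrategy and ChannelFactory are assumed type names; only the producers map is built in open):
// inside DynamicKafkaSink<IN>
// built in open(), never shipped with the job graph, so it is marked transient
private transient Map<String, FlinkKafkaProducer010> producers;
// collaborators created on the client and serialized with the sink must be Serializable,
// otherwise declare them transient as well and re-create them in open()
private ChannelRepository channelRepository;          // assumed type name
private ChannelRouteStrategy channelRouteStrategy;    // assumed type name
private ChannelFactory channelFactory;                // assumed type name

@Override
public void open(Configuration parameters) throws Exception {
    producers = new HashMap<>();
    // ... build one FlinkKafkaProducer010 per channel as in the core code above ...
}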
Advantages:
Records can be routed to topics on different brokers, giving better isolation across brokers.
Defects:
All FlinkKafkaProducer instances are created only once, in open. If new Kafka queues are added later, they cannot be discovered and routed to dynamically.
The creation and initialization of FlinkKafkaProducer is moved from the main function into the open method of KafkaDynamicSink; this path has not been fully tested and may have problems.
Option 2: an upgraded version of Option 1. Use Flink's split-stream feature to divide the original data stream into several sub-streams according to the routing rules; each sub-stream corresponds to one downstream Kafka queue.
In the Flink main function, read all Kafka channel configurations, build one FlinkKafkaProducer per channel, and store them in a Map: channelId -> FlinkKafkaProducer.
Build a SplitStream on the input stream; the OutputSelector returns the set of channelIds determined by the routing logic.
Traverse the Map: for each key (channelId), call the select method of the SplitStream to obtain the corresponding sub-stream, and add the corresponding FlinkKafkaProducer as its sink.
Core code:
public static void main(String[] args) throws Exception {
    List<ChannelModel> allChannels = channelRepository.getAll();
    for (ChannelModel nextChannel : allChannels) {
        FlinkKafkaProducer010 nextProducer = (FlinkKafkaProducer010) channelFactory.createChannelProducer(nextChannel,
                FlinkKafkaProducer010.class, Collections.emptyMap());
        producers.put(nextChannel.getChannelId(), nextProducer);
    }

    DataStreamSource<String> source = ...;
    SplitStream<String> splitStream = source.split(new OutputSelector<String>() {
        @Override
        public Iterable<String> select(String value) {
            List<String> channelIds = channelRouteStrategy.route(value);
            return channelIds;
        }
    });

    for (String nextChannel : producers.keySet()) {
        FlinkKafkaProducer010 target = producers.get(nextChannel);
        // addSink takes care of the producer's lifecycle (setRuntimeContext/open), so no manual initialization is needed here
        splitStream.select(nextChannel).addSink(target);
    }
}
Advantages:
Records can be routed to topics on different brokers, giving better isolation across brokers.
Makes full use of Flink's native features, so it is more concise and elegant, and it removes the second defect of Option 1.
Defects:
All FlinkKafkaProducer instances are created only once, in the main function. If new Kafka queues are added later, they cannot be discovered and routed to dynamically.
Option 3: use the getTargetTopic method of Flink's KeyedSerializationSchema. Besides converting an object into the key/value pair of a Kafka ProducerRecord, KeyedSerializationSchema can also specify the target topic dynamically.
In the Flink main function, convert the input stream via flatMap into Tuple2 records, where the key is the target topic and the value is the original data.
Implement a KeyedSerializationSchema, pass it to the FlinkKafkaProducer constructor, and override the getTargetTopic method to return tuple2.f0.
Core code:
class DynaRouteSerializationSchema<T> implements KeyedSerializationSchema<Tuple2<String, T>> {

    @Override
    public String getTargetTopic(Tuple2<String, T> element) {
        return element.f0;   // f0 carries the dynamically routed topic name
    }

    // serializeKey / serializeValue omitted
}

public static void main(String[] args) throws Exception {
    DataStreamSource<T> source = ...;   // schematic: T stands for the event type
    DataStream<Tuple2<String, T>> converted = source
        .flatMap(new RichFlatMapFunction<T, Tuple2<String, T>>() {
            @Override
            public void flatMap(T value, Collector<Tuple2<String, T>> out) throws Exception {
                List<String> channelIds = channelRouteStrategy.route(value);
                for (String nextChannel : channelIds) {
                    out.collect(Tuple2.of(nextChannel, value));
                }
            }
        });
}
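The article does not show how the schema is wired into the producer. A minimal sketch, assuming the standard FlinkKafkaProducer010 constructor that takes a default topic, a KeyedSerializationSchema and producer Properties (the default topic is normally only used when getTargetTopic returns null; the broker address is a placeholder):
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "broker1:9092");   // assumed address

FlinkKafkaProducer010<Tuple2<String, T>> producer =
        new FlinkKafkaProducer010<>("default-topic", new DynaRouteSerializationSchema<>(), producerProps);
converted.addSink(producer);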
Advantages:
Makes full use of Flink's native features, with very little code.
Newly added topics can also be routed to without stopping and restarting the stream processing job.
Defects:
Unlike the first two options, it cannot route at the broker level, only at the topic level, since a single producer is bound to one broker cluster.
Cut-off (pause/resume) function:
Sometimes, during a system upgrade or when other components are unavailable, the Kafka producer needs to be stopped temporarily.
Flink's native mechanisms:
Passive backpressure:
Kafka09Fetcher contains a separate KafkaConsumerThread that reads data from Kafka and hands it to a Handover.
The Handover can be understood as a queue of size 1; Kafka09Fetcher takes data from it and processes it. Once processing slows down, the KafkaConsumerThread can no longer write into the Handover and the thread blocks.
In addition, KeyedDeserializationSchema defines an isEndOfStream method. If it returns true, Kafka09Fetcher stops its loop and exits, which ends the whole stream processing job.
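A minimal sketch of the isEndOfStream hook (the class name, the static stopRequested flag and the String payload type are assumptions; note that this mechanism ends the job permanently rather than pausing it, which is why the article builds the pause feature below instead):
public class StoppableDeserializationSchema implements KeyedDeserializationSchema<String> {
    // externally toggled flag; only visible within one TaskManager JVM
    public static volatile boolean stopRequested = false;

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // once this returns true, the fetcher loop exits and the whole job finishes
        return stopRequested;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}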
Design ideas:
SignalService: registers a SignalListener and uses a Curator TreeCache to listen on a Zookeeper path for start/stop signals for the stream processing.
SignalListener: callback interface for receiving Zookeeper change notifications.
PausableKafkaFetcher: inherits from Flink's native KafkaFetcher; on signal changes it blocks the consumer thread's processing.
PausableKafkaConsumer: inherits from Flink's native KafkaConsumer and creates a PausableKafkaFetcher.
Core code:
public class PausableKafkaFetcher<T> extends Kafka010Fetcher<T> implements SignalListener {

    private final ReentrantLock pauseLock = new ReentrantLock(true);
    private final Condition pauseCond = pauseLock.newCondition();
    private volatile boolean paused = false;

    @Override
    public void onSignal(String path, String value) {
        try {
            pauseLock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;   // lock not acquired, nothing to unlock
        }
        try {
            if (SIGNAL_PAUSE.equals(value)) {
                paused = true;
            } else if (SIGNAL_START.equals(value)) {
                paused = false;
            }
            pauseCond.signal();
        } finally {
            pauseLock.unlock();
        }
    }

    @Override
    protected void emitRecord(T record, KafkaTopicPartitionState partition, long offset, ConsumerRecord consumerRecord) throws Exception {
        super.emitRecord(record, partition, offset, consumerRecord);
        // after emitting, block the fetcher thread for as long as the pause signal is set
        pauseLock.lockInterruptibly();
        try {
            while (paused) {
                pauseCond.await();
            }
        } finally {
            pauseLock.unlock();
        }
    }
}
public class PausableKafkaConsumer<T> extends FlinkKafkaConsumer010<T> {

    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);   // preserve the base consumer's initialization
        signalService = ZKSignalService.getInstance();
        signalService.initialize(zkConfig);
    }

    @Override
    public void cancel() {
        super.cancel();
        unregisterSignal();
    }

    @Override
    public void close() throws Exception {
        super.close();
        unregisterSignal();
    }

    private void unregisterSignal() {
        if (signalService != null) {
            String fullPath = WATCH_PREFIX + "/" + watchPath;
            signalService.unregisterSignalListener(fullPath);
        }
    }

    protected AbstractFetcher createFetcher(...) throws Exception {
        PausableKafkaFetcher<T> fetcher = new PausableKafkaFetcher<>(...);
        if (signalService != null) {
            String fullPath = WATCH_PREFIX + "/" + watchPath;
            signalService.registerSignalListener(fullPath, fetcher);
        }
        return fetcher;
    }
}
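The ZKSignalService itself is not shown in the article. A minimal sketch of how it might wire Curator's TreeCache to the SignalListener callback (the class shape, the ConcurrentHashMap of caches, and the retry/connection settings are assumptions; only registerSignalListener/unregisterSignalListener follow the article):
public class ZKSignalService {
    private final CuratorFramework client;
    private final Map<String, TreeCache> caches = new ConcurrentHashMap<>();

    public ZKSignalService(String zkConnect) {
        client = CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3));
        client.start();
    }

    public void registerSignalListener(String path, SignalListener listener) throws Exception {
        TreeCache cache = new TreeCache(client, path);
        cache.getListenable().addListener((c, event) -> {
            // forward node creation/update events to the fetcher's onSignal callback
            if (event.getData() != null && event.getData().getData() != null) {
                listener.onSignal(event.getData().getPath(), new String(event.getData().getData()));
            }
        });
        cache.start();
        caches.put(path, cache);
    }

    public void unregisterSignalListener(String path) {
        TreeCache cache = caches.remove(path);
        if (cache != null) {
            cache.close();
        }
    }
}
Pausing the job then amounts to writing the pause signal value to the watched Zookeeper node, e.g. with Curator's client.setData().forPath(fullPath, value).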