In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly explains the "kafka data source Flink Kafka Consumer analysis", the article explains the content is simple and clear, easy to learn and understand, now please follow the editor's ideas slowly in depth, together to study and learn "kafka data source Flink Kafka Consumer analysis" bar!
I. timing of calling the open () method
FlinkKafkaConsumer inherits from RichFunction and has a lifecycle method open (). So when did flink call the open () method of FlinkKafkaConsumer?
Before invoking the operator program, StreamTask executes the beforeInvoke () method, where it initializes the operator and executes the open () method:
OperatorChain.initializeStateAndOpenOperators (createStreamTaskStateInitializer ())
Initialize the loop pair operator in the initializeStateAndOpenOperators () method:
Protected void initializeStateAndOpenOperators (StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {for (StreamOperatorWrapper operatorWrapper: getAllOperators (true)) {StreamOperator operator = operatorWrapper.getStreamOperator (); operator.initializeState (streamTaskStateInitializer); operator.open ();}}
The operator corresponding to kafka source is StreamSource, and its open () method is
Public void open () throws Exception {super.open (); FunctionUtils.openFunction (userFunction, new Configuration ();}
The openFunction () of FunctionUtils is the open () method that executes the operator (to inherit RichFunction):
Public static void openFunction (Function function, Configuration parameters) throws Exception {if (function instanceof RichFunction) {RichFunction richFunction = (RichFunction) function; richFunction.open (parameters);}} 2.When is the runtime context RuntimeContext assigned?
In StreamTask.beforeInvoke ()-> new OperatorChain ()-> StreamOperatorFactoryUtil.createOperator (), in the constructor of OperatorChain, create the StreamOperator through the factory class StreamOperatorFactory. The StreamOperatorFactory corresponding to kafka source is the setup () method that calls StreamOperator in the SimpleOperatorFactory,createStreamOperator () method:
Public T createStreamOperator (StreamOperatorParameters parameters) {if (operator instanceof AbstractStreamOperator) {((AbstractStreamOperator) operator) .setProcessingTimeService (processingTimeService) } if (operator instanceof SetupableStreamOperator) {((SetupableStreamOperator) operator) .setup (parameters.getContainingTask (), parameters.getStreamConfig (), parameters.getOutput ()) } return (T) operator;}
The StreamOperator corresponding to kafka source is StreamSource, which implements the SetupableStreamOperator interface. Its setup method is in the parent class AbstractUdfStreamOperator:
Public void setup (StreamTask containingTask, StreamConfig config, Output output) {super.setup (containingTask, config, output); FunctionUtils.setFunctionRuntimeContext (userFunction, getRuntimeContext ();}
FunctionUtils.setFunctionRuntimeContext () sets the RuntimeContext for the operator. The set RuntimeContext is StreamingRuntimeContext in the setup () method of AbstractStreamOperator:
This.runtimeContext = new StreamingRuntimeContext (environment, environment.getAccumulatorRegistry (). GetUserMap (), getMetricGroup (), getOperatorID (), getProcessingTimeService (), null, environment.getExternalResourceInfoProvider ()) Third, the run () method of FlinkKafkaConsumer
Flink calls the run () method of FlinkKafkaConsumer to produce the data. Processing logic for the run () method:
① creates a KafkaFetcher to pull data
This.kafkaFetcher = createFetcher (sourceContext, subscribedPartitionsToStartOffsets, watermarkStrategy, (StreamingRuntimeContext) getRuntimeContext (), offsetCommitMode GetRuntimeContext (). GetMetricGroup (). AddGroup (KAFKA_CONSUMER_METRICS_GROUP), useMetrics)
Create a KafkaConsumerThread thread in runFetchLoop () of ② KafkaFetcher to loop through kafka data. KafkaConsumerThread pulls kafka data through KafkaConsumer and gives it to Handover
If (records = = null) {try {records = consumer.poll (pollTimeout) } catch (WakeupException we) {continue }} try {handover.produce (records); records = null;}
KafkaFetcher acquires pulled kafka data through Handover
While (running) {/ / this blocks until we get the next records / / it automatically re-throws exceptions encountered in the consumer thread final ConsumerRecords records = handover.pollNext () / / get the records for each topic partition for (KafkaTopicPartitionState partition: subscribedPartitionStates ()) {List partitionRecords = records.records (partition.getKafkaPartitionHandle ()) PartitionConsumerRecordsHandler (partitionRecords, partition);}}
③ sends data to the next operator through Output in SourceContext
Public void collect (T element) {synchronized (lock) {output.collect (reuse.replace (element));}}
SourceContext is created through StreamSourceContexts.getSourceContext () in the run () method of StreamSource. Output is created in createOutputCollector () of OperatorChain and returns a value for it.
For (StreamEdge outputEdge: operatorConfig.getNonChainedOutputs (userCodeClassloader)) {@ SuppressWarnings ("unchecked") RecordWriterOutput output = (RecordWriterOutput) streamOutputs.get (outputEdge); allOutputs.add (new Tuple2 (output, outputEdge));}
When there is one output, when there are multiple RecordWriterOutput;, it is CopyingDirectedOutput or DirectedOutput
When ④ outputs a single RecordWriterOutput, it is output through a member property RecordWriter instance. RecordWriter is created by createRecordWriterDelegate () of StreamTask. RecordWriterDelegate is the proxy class of RecordWriter, and the RecordWriter instance is held internally:
Public static RecordWriterDelegate createRecordWriterDelegate (StreamConfig configuration, Environment environment) {List recordWrites = createRecordWriters (configuration, environment); if (recordWrites.size () = = 1) {return new SingleRecordWriter (recordWrites.get (0)) } else if (recordWrites.size () = = 0) {return new NonRecordWriter ();} else {return new MultipleRecordWriters (recordWrites) }} private static List createRecordWriters (StreamConfig configuration, Environment environment) {List recordWriters = new ArrayList (); List outEdgesInOrder = configuration.getOutEdgesInOrder (environment.getUserClassLoader ()); for (int I = 0; I
< outEdgesInOrder.size(); i++) { StreamEdge edge = outEdgesInOrder.get(i); recordWriters.add( createRecordWriter( edge, i, environment, environment.getTaskInfo().getTaskName(), edge.getBufferTimeout())); } return recordWriters; } outEdgesInOrder来源于StreamGraph中的StreamNode的List outEdges。 创建RecordWriter时,根据StreamEdge的StreamPartitioner outputPartitioner的isBroadcast()方法判断是BroadcastRecordWriter还是ChannelSelectorRecordWriter: public RecordWriter build(ResultPartitionWriter writer) { if (selector.isBroadcast()) { return new BroadcastRecordWriter(writer, timeout, taskName); } else { return new ChannelSelectorRecordWriter(writer, selector, timeout, taskName); } } outputPartitioner是根据上下游节点并行度是否一致来确定: if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) { partitioner = new ForwardPartitioner(); } else if (partitioner == null) { partitioner = new RebalancePartitioner(); } BroadcastRecordWriter和ChannelSelectorRecordWriter最终都会调用成员属性ResultPartitionWriter targetPartition的flush()方法来输出数据。ResultPartitionWriter 在ConsumableNotifyingResultPartitionWriterDecorator的decorate()生成。根据对应的ResultPartitionDeploymentDescriptor来判断是ConsumableNotifyingResultPartitionWriterDecorator还是直接传入的partitionWriters。ConsumableNotifyingResultPartitionWriterDecorator会把消息直接传给下个节点消费,通过ResultPartitionConsumableNotifier来通知: public static ResultPartitionWriter[] decorate( Collection descs, ResultPartitionWriter[] partitionWriters, TaskActions taskActions, JobID jobId, ResultPartitionConsumableNotifier notifier) { ResultPartitionWriter[] consumableNotifyingPartitionWriters = new ResultPartitionWriter[partitionWriters.length]; int counter = 0; for (ResultPartitionDeploymentDescriptor desc : descs) { if (desc.sendScheduleOrUpdateConsumersMessage() && desc.getPartitionType().isPipelined()) { consumableNotifyingPartitionWriters[counter] = new ConsumableNotifyingResultPartitionWriterDecorator( taskActions, jobId, partitionWriters[counter], notifier); } else { consumableNotifyingPartitionWriters[counter] = partitionWriters[counter]; } counter++; } return consumableNotifyingPartitionWriters; } partitionWriters通过 NettyShuffleEnvironment的createResultPartitionWriters() ->Create () creation of ResultPartitionFactory. The output of the ResultPartition is done through the member attribute ResultSubpartition [] subpartitions. Subpartitions is generated in createSubpartitions () of ResultPartitionFactory:
Private void createSubpartitions (ResultPartition partition, ResultPartitionType type, BoundedBlockingSubpartitionType blockingSubpartitionType, ResultSubpartition [] subpartitions) {/ / Create the subpartitions. If (type.isBlocking ()) {initializeBoundedBlockingPartitions (subpartitions, partition, blockingSubpartitionType, networkBufferSize, channelManager) } else {for (int I = 0; I
< subpartitions.length; i++) { subpartitions[i] = new PipelinedSubpartition(i, partition); } } } 流式任务时,ResultSubpartition为PipelinedSubpartition。 四、数据写出4.1 ResultPartitionConsumableNotifier通知 ResultPartitionConsumableNotifier在TaskExecutor的associateWithJobManager()中生成: private JobTable.Connection associateWithJobManager( JobTable.Job job, ResourceID resourceID, JobMasterGateway jobMasterGateway) { ...... ...... ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier( jobMasterGateway, getRpcService().getExecutor(), taskManagerConfiguration.getTimeout()); ...... ...... } RpcResultPartitionConsumableNotifier远程调用JobMaster的scheduleOrUpdateConsumers()方法,传入ResultPartitionID partitionId 4.1.1 JobMaster的scheduleOrUpdateConsumers() JobMaster通过ExecutionGraph的scheduleOrUpdateConsumers()通知下游消费算子。 这里有两个关键代码: ①从本算子ExecutionVertex的成员Map resultPartitions中取出该分区对应的生产消费信息,这些信息存储在IntermediateResultPartition中; void scheduleOrUpdateConsumers(ResultPartitionID partitionId) { ....... final IntermediateResultPartition partition = resultPartitions.get(partitionId.getPartitionId()); ....... if (partition.getIntermediateResult().getResultType().isPipelined()) { // Schedule or update receivers of this partition execution.scheduleOrUpdateConsumers(partition.getConsumers()); } else { throw new IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid for" + "pipelined partitions."); } } 从IntermediateResultPartition取出消费者List allConsumers; 从ExecutionEdge的ExecutionVertex target的Execution currentExecution中取出执行任务; ②Execution的sendUpdatePartitionInfoRpcCall()方法通过rpc调用TaskExcutor的updatePartitions()方法来执行下游消费者算子 private void sendUpdatePartitionInfoRpcCall( final Iterable partitionInfos) { final LogicalSlot slot = assignedResource; if (slot != null) { final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation(); CompletableFuture updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, partitionInfos, rpcTimeout); updatePartitionsResultFuture.whenCompleteAsync( (ack, failure) ->{/ / fail if there was a failure if (failure! = null) {fail (new IllegalStateException ("Update to task [" + getVertexWithAttempt ()) + "] on TaskManager" + taskManagerLocation + "failed" Failure)) }, getVertex (). GetExecutionGraph (). GetJobMasterMainThreadExecutor ();} 4.1.2 TaskExecutor updatePartitions ()
UpdatePartitions () of TaskExecutor to update partition information. If the InputChannel was previously unknown, update it. UpdateInputChannel () of SimpleInputGate:
Public void updateInputChannel (ResourceID localLocation, NettyShuffleDescriptor shuffleDescriptor) throws IOException, InterruptedException {synchronized (requestLock) {if (closeFuture.isDone ()) {/ / There was a race with a task failure/cancel return } IntermediateResultPartitionID partitionId = shuffleDescriptor.getResultPartitionID (). GetPartitionId (); InputChannel current = inputChannels.get (partitionId); if (current instanceof UnknownInputChannel) {UnknownInputChannel unknownChannel = (UnknownInputChannel) current; boolean isLocal = shuffleDescriptor.isLocalTo (localLocation) InputChannel newChannel; if (isLocal) {newChannel = unknownChannel.toLocalInputChannel () } else {RemoteInputChannel remoteInputChannel = unknownChannel.toRemoteInputChannel (shuffleDescriptor.getConnectionId ()); remoteInputChannel.assignExclusiveSegments (); newChannel = remoteInputChannel } LOG.debug ("{}: Updated unknown input channel to {}.", owningTaskName, newChannel); inputChannels.put (partitionId, newChannel); channels [current.getChannelIndex ()] = newChannel If (requestedPartitionsFlag) {newChannel.requestSubpartition (consumedSubpartitionIndex);} for (TaskEvent event: pendingEvents) {newChannel.sendTaskEvent (event) } if (--numberOfUninitializedChannels = = 0) {pendingEvents.clear ();}} 4.2 PipelinedSubpartition write out
The record is written to the cache ArrayDeque buffers and then notified by the notifyDataAvailable () method of PipelinedSubpartitionView readView's notifyDataAvailable ()-> BufferAvailabilityListener availabilityListener.
4.2.1 when is the BufferAvailabilityListener created?
When creating a ShuffleEnvironment, ① TaskManagerServices obtains the processor PartitionRequestServerHandler of the Netty server through NettyShuffleServiceFactory's createNettyShuffleEnvironment ()-> new NettyConnectionManager ()-> new NettyServer ()-> ServerChannelInitializer's initChannel ()-> NettyProtocol's getServerChannelHandlers ():
Public ChannelHandler [] getServerChannelHandlers () {PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue (); PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler (partitionProvider, taskEventPublisher, queueOfPartitionQueues) Return new ChannelHandler [] {messageEncoder, new NettyMessage.NettyMessageDecoder (), serverHandler, queueOfPartitionQueues};}
When ② PartitionRequestServerHandler gets the PartitionRequest message sent by the client, it creates the CreditBasedSequenceNumberingViewReader and sets the CreditBasedSequenceNumberingViewReader through requestSubpartitionView ()-> ResultPartitionManager's createSubpartitionView ()-> ResultPartition's createSubpartitionView ()
The notifyDataAvailable () method of ③ CreditBasedSequenceNumberingViewReader calls notifyReaderNonEmpty () of PartitionRequestQueue to notify the downstream operator:
Void notifyReaderNonEmpty (final NetworkSequenceViewReader reader) {/ / The notification might come from the same thread. For the initial writes this / / might happen before the reader has set its reference to the view, because / / creating the queue and the initial notification happen in the same method call. / / This can be resolved by separating the creation of the view and allowing / / notifications. / / TODO This could potentially have a bad performance impact as in the / / worst case (network consumes faster than the producer) each buffer / / will trigger a separate event loop task being scheduled. Ctx.executor (). Execute (()-> ctx.pipeline (). FireUserEventTriggered (reader));} Thank you for reading, the above is the content of "Flink Kafka Consumer Analysis of kafka data sources". After the study of this article, I believe you have a deeper understanding of the problem of Flink Kafka Consumer analysis of kafka data sources, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.