In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly introduces how to deal with the data flow of Apache Flink Task implementation, the article is very detailed, has a certain reference value, interested friends must read it!
Get stream data
The code submitted by the user is finally encapsulated as org.apache.flink.runtime.taskmanager.Task,Task is a Runnable, so the core code is in the run method, which calls the doRun method and invokable.invoke () in doRun, where the whole processing flow of Task is actually in it. Org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable is an abstract class whose subclasses are different types of Task. Here we focus on the invoke method of org.apache.flink.streaming.runtime.tasks.StreamTask,StreamTask that executes the runMailboxLoop () method related to the flow processing task.
The runMailboxLoop () method is the runMailboxLoop method that executes org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor. MailboxProcessor is a threading model. RunMailboxLoop continuously performs tasks and default actions in while polling, where the default action is the processInput method of StreamTask, which calls the inputProcessor method of StreamInputProcessor, in which the stream data is obtained and processed. StreamOneInputProcessor and StreamTwoInputProcessor, subclasses of StreamInputProcessor, are used to deal with Task with 1 and 2 degrees of entry (regardless of StreamMultipleInputProcessor). In StreamOneInputProcessor, there is one StreamTaskInput to get data and one DataOutput to collect data from StreamTaskInput; similarly, StreamTwoInputProcessor has two StreamTaskInput and two DataOutput. StreamTaskNetworkInput, a subclass of StreamTaskInput, is used to obtain stream data from the network. By calling him, its emitNext not only deals with stream data but also deals with checkpoint barrier. This article only focuses on the processing flow of data flow. StreamTaskNetworkInput gets the complete stream data from the deserializer and gives it to DataOutput. DataOutput also has subclasses that handle 1 degree and 2 degrees, all of which hold a reference to the first operator in OperatorChain, called headOperator,DataOutput, which gets the data from StreamTaskInput and gives it to headOperator to process. At this point, the stream data is fetched and passed into the OperatorChain. Here's a summary: StreamTask's processInput method is called repeatedly in MailboxProcessor, and StreamTask uses StreamInputProcessor to get and process stream data in the processInput method. The StreamTaskInput in StreamInputProcessor is used to obtain data, and the obtained data is handed over to DataOutput,DataOutput to pass the data into the first operator of OperatorChain. Both StreamTask,StreamInputProcessor and DataOutput have subclasses that deal with 1 entry and 2 entry degrees.
Data flows through OperatorChain
After the first operator of OperatorChain gets the data, how does the data flow through the OperatorChain? First of all, OperatorChain,StreamOperatorWrapper is every node of chain, and each node has a reference to the next or previous node, so OperatorChain is a two-way linked list. But the flow of data does not depend on this chain structure. We mentioned above that the first node that DataOutput gives data to headOperator,OperatorChain is a subclass of StreamOperator, and the filer operator and map operator we write will eventually be encapsulated into StreamOperator, for example, the subclass StreamFlatMap executes the flatMap method, StreamFilter executes the fliter method, and so on. When these methods are executed, org.apache.flink.streaming.api.operators.Output is used to collect the processed results. For example, StreamFilter collects data when FilterFunction returns true, while StreamFlatMap passes Output into the flatMap method to collect data by user code. How is the collected data passed to the next node of the OperatorChain? It turns out that holding the OneInputStreamOperator variable in Output points to the operator of the next node in chain. Calling the collect method of Output calls the processElement of the next operator, and the data is passed throughout the OperatorChain.
Send to downstream Task
How does the data get to the next Task when it reaches the last operator of OperatorChain? The last Output implementation class owned by the operator is org.apache.flink.streaming.runtime.io.RecordWriterOutput. The org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit method called by RecordWriterOutput's collect method is used to send data, which copies the data from the serializer to BufferBuilder. BufferBuilder maintains a memory fragment MemorySegment and can create corresponding consumers. RecordWriter has two implementation classes, ChannelSelectorRecordWriter and BroadcastRecordWriter. Task sends data to multiple parallelism degrees of downstream nodes, each of which corresponds to a channel. ChannelSelectorRecordWriter saves a BufferBuilder for each chanel and adds BufferConsumer separately:
BufferBuilder bufferBuilder = super.requestNewBufferBuilder (targetChannel); / / press channel to get BufferBuilderaddBufferConsumer (bufferBuilder.createBufferConsumer (), targetChannel); / / press channel to add BufferConsumer bufferBuilders [targetChannel] = bufferBuilder
BroadcastRecordWriter has only one BufferBuilder, and use the same BufferBuilder to add BufferConsumer to all channel:
Try (BufferConsumer bufferConsumer = builder.createBufferConsumer ()) {for (int channel = 0; channel < numberOfChannels; channel++) {addBufferConsumer (bufferConsumer.copy (), channel); / / all channel use the same BufferBuilder for broadcasting}}
The RecordWriter#requestNewBufferBuilder method gets the BufferBuilder, and if it fails, it will cause the Task thread of execution to block and cause reverse pressure.
Public BufferBuilder requestNewBufferBuilder (int targetChannel) throws IOException, InterruptedException {BufferBuilder builder = targetPartition.tryGetBufferBuilder (targetChannel); / / attempt to get, but cannot return null if (builder = = null) {long start = System.currentTimeMillis (); builder = targetPartition.getBufferBuilder (targetChannel); / / block acquisition, resulting in reverse pressure idleTimeMsPerSecond.markEvent (System.currentTimeMillis ()-start);} return builder;}
BufferBuilder ultimately comes from LocalBufferPool,LocalBufferPool with several important attributes:
/ / taskmanager's network cache pool, from which MemorySegment gets the MemorySegment that private final NetworkBufferPool networkBufferPool;// has acquired and is organized into a queue private final ArrayDeque availableMemorySegments = new ArrayDeque (); / / the size of the current localBufferPool, the maximum number of BufferBuilder that each channel can get at the same time private final int maxBuffersPerChannel;//subpartition is channel, and the array stores the number of BufferBuilder that each channel uses simultaneously, private final int [] subpartitionBuffersCount.
BufferBuilder is obtained by the requestMemorySegment method and the requestMemorySegmentBlocking method. The requestMemorySegmentBlocking method also calls the requestMemorySegment method and blocks it through the get method of AvailableFuture when the MemorySegment is not obtained. AvailableFuture is a state bit represented by CompletableFuture. Here, the get method of CompletableFuture will block until the property of complete, the unfinished future represents unavailable, and the finished one represents available. In the requestMemorySegment method, if the obtained MemorySegment (numberOfRequestedMemorySegments) is greater than the size of the localBufferPool (currentPoolSize), you need to return the excess MemorySegment to networkBufferPool first. Then get MemorySegment, set AvailableFuture to unavailable if not available, otherwise record the number of MemorySegment used by channel, and set AvailableFuture to unavailable if it is greater than maxBuffersPerChannel.
@ Nullableprivate MemorySegment requestMemorySegment (int targetChannel) throws IOException {MemorySegment segment = null; synchronized (availableMemorySegments) {returnExcessMemorySegments (); / / return the excess segment to networkBufferPool if (availableMemorySegments.isEmpty ()) {segment = requestMemorySegmentFromGlobal (); / / Global acquisition} / / segment may have been released by buffer pool owner if (segment = = null) {segment = availableMemorySegments.poll () / / Local get} if (segment = = null) {availabilityHelper.resetUnavailable (); / / cannot be set to unavailable} / / record the number of segment being used by channel, if the setting exceeds the setting to unavailable if (segment! = null & & targetChannel! = UNKNOWN_CHANNEL) {if (subpartitionBuffersCounting [target Channel] + + = = maxBuffersPerChannel) {unavailableSubpartitionsCount++ Collection of availabilityHelper.resetUnavailable ();} return segment;} backpressure
The setting of the AvailableFuture mentioned above is actually related to the backpressure, and the isBackPressured method of the Task returns whether the Task produces a backpressure.
Public boolean isBackPressured () {if (invokable = = null | | consumableNotifyingPartitionWriters.length = = 0 | |! isRunning ()) {return false;} / / get all AvailableFuture. If it is not completed, there is reverse pressure final CompletableFuture [] outputFutures = new CompletableFuture [consumableNotifyingPartitionWriters.length]; for (int I = 0; I < outputFutures.length; + + I) {outputFutures [I] = originableNotifyingPartitionWriting [I] .getAvailableFuture ();} return! CompletableFuture.allOf (outputFutures). IsDone () } these are all the contents of the article "how to handle the data flow executed by Apache Flink Task". Thank you for reading! Hope to share the content to help you, more related knowledge, welcome to follow the industry information channel!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.