Updated: 2025-01-23 (SLTechnology News&Howtos, shulou)
Author: Nico Kruber
Translator: Cao Yingjie
Flink's network protocol stack is one of the core components of the flink-runtime module and sits at the heart of every Flink job. It connects the individual subtasks (Subtask) of all TaskManagers, so it is critical to the performance of Flink jobs in terms of both throughput and latency. Unlike the control channels between TaskManager and JobManager, which communicate through Akka-based RPC, the network protocol stack between TaskManagers relies on a lower-level API based on Netty.
This article first introduces the high-level abstractions that Flink exposes to stream operators (Stream operator), and then describes in detail the physical implementation of Flink's network protocol stack, its various optimizations and their effects, and the trade-offs Flink makes between throughput and latency.
1. Logical view
Flink's network protocol stack provides the following logical view to subtasks that communicate with each other, for example for the data shuffle required by a keyBy() operation on A:
This process is based on the following three basic concepts:
▼ Subtask output type (ResultPartitionType):
Pipelined (bounded or unbounded): data is sent downstream as soon as it is produced, as a bounded or unbounded stream.
Blocking: data is sent downstream only after the complete result has been produced.
▼ Scheduling policy:
Schedule all tasks at once (Eager): deploy all subtasks of the job at the same time (for streaming jobs).
Deploy downstream on the first upstream record (Lazy): deploy a downstream task as soon as any of its producers has generated output.
Deploy downstream on complete upstream output: deploy a downstream task once any or all of its producers have generated their complete output.
▼ Data transfer:
High throughput: instead of sending records one by one, Flink collects several records in a network buffer and sends them together. This reduces the per-record cost and therefore increases throughput.
Low latency: a buffer that has not filled up is sent anyway once a timeout expires. Lowering the timeout reduces latency at the cost of some throughput.
We will look at the throughput and latency optimizations below, when we drill down into the physical implementation of the Flink network protocol stack. For this section, let's elaborate on the output types and scheduling policies. First of all, it is important to know that a subtask's output type and the scheduling policy are closely related, and only certain combinations of the two are valid.
A pipelined result is a stream and requires the target Subtask to be running in order to receive data. The downstream target Task therefore needs to be scheduled before the upstream Task produces data, or when it produces its first output. Batch jobs generate bounded result data, while streaming jobs produce unbounded result data.
Batch jobs may also produce results in a blocking manner, depending on the operator and connection pattern used. In this case, the upstream Task must first produce its complete result before the downstream receiving Task is scheduled. This improves the efficiency of batch jobs and consumes fewer resources.
The following table summarizes the valid combinations of Task output types and scheduling policies:
Note:
[1] Currently not used by Flink.
[2] May become applicable to streaming jobs once batch and stream processing are unified.
In addition, for subtasks with multiple inputs, scheduling can start in two ways: after all, or after any, of the upstream tasks have produced their first record or their complete output. To adjust the output types and scheduling policies of batch jobs, see ExecutionConfig#setExecutionMode(), in particular ExecutionMode, and ExecutionConfig#setDefaultInputDependencyConstraint().
2. Physical data transmission
To understand the physical data connections, recall that in Flink different tasks can share the same Slot through a slot sharing group. A TaskManager can also provide multiple Slots, so that multiple subtasks of the same task can be scheduled onto the same TaskManager.
For the example shown in the following figure, we assume that two tasks with a parallelism of 4 are deployed on two TaskManagers with two Slots each. TaskManager 1 executes subtasks A.1, A.2, B.1, and B.2, and TaskManager 2 executes subtasks A.3, A.4, B.3, and B.4. With a shuffle-type connection between A and B, for example from a keyBy() operation, there are 2x4 logical connections to handle on each TaskManager, some of which are local and some remote:
Each (remote) network connection between different tasks gets its own TCP channel in Flink's network stack. However, if different subtasks of the same task are scheduled onto the same TaskManager, their network connections to the same remote TaskManager are multiplexed and share a single TCP channel to reduce resource usage. In our example, this applies to A.1 → B.3, A.1 → B.4, A.2 → B.3, and A.2 → B.4, as shown in the following figure:
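The connection counts in this example can be checked with a small sketch. It is only an illustration of the counting, not Flink code: the class and method names are hypothetical, and it assumes that one multiplexed TCP channel is counted per direction between a pair of TaskManagers.

```java
import java.util.Set;
import java.util.TreeSet;

class ConnectionLayout {
    /**
     * placementA[i] and placementB[j] hold the TaskManager that hosts
     * subtask A.(i+1) and B.(j+1). Returns {local, remote, tcpChannels}.
     */
    static int[] count(int[] placementA, int[] placementB) {
        int local = 0;
        int remote = 0;
        // Assumption: one multiplexed TCP channel per (source TM, target TM) direction.
        Set<String> tcpChannels = new TreeSet<>();
        for (int from : placementA) {
            for (int to : placementB) {
                if (from == to) {
                    local++;            // same TaskManager, no network involved
                } else {
                    remote++;           // logical connection that crosses the network
                    tcpChannels.add(from + "->" + to);
                }
            }
        }
        return new int[] {local, remote, tcpChannels.size()};
    }

    public static void main(String[] args) {
        // A.1, A.2, B.1, B.2 on TaskManager 1; A.3, A.4, B.3, B.4 on TaskManager 2.
        int[] placement = {1, 1, 2, 2};
        int[] r = count(placement, placement);
        System.out.println("local=" + r[0] + " remote=" + r[1] + " tcp=" + r[2]);
    }
}
```

For the layout above, the 16 logical connections split into 8 local and 8 remote ones, and the 8 remote ones share 2 TCP channels (one per direction).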
The output of each subtask is called a ResultPartition, and each ResultPartition is split into separate ResultSubpartitions, one for each logical channel. At this point, Flink's network protocol stack no longer deals with individual records but with groups of serialized records packed into network Buffers. The maximum number of Buffers available in each subtask's local buffer pool (one pool per sender and one per receiver) is:
# channels * buffers-per-channel + floating-buffers-per-gate
The total number of network Buffers on a single TaskManager usually does not need to be configured. For details on how to configure it when needed, see the documentation on configuring network buffers.
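As a worked instance of the formula above, the following sketch computes the limit for one subtask. The helper name is hypothetical, and the values used in main (2 buffers per channel, 8 floating buffers per gate, 32KB per Buffer) are example inputs assumed for illustration; check your own configuration for the actual settings.

```java
class BufferMath {
    /** #channels * buffers-per-channel + floating-buffers-per-gate */
    static int maxLocalBuffers(int channels, int buffersPerChannel, int floatingBuffersPerGate) {
        return channels * buffersPerChannel + floatingBuffersPerGate;
    }

    public static void main(String[] args) {
        // A receiving subtask of B with 4 input channels (one per subtask of A).
        int buffers = maxLocalBuffers(4, 2, 8);
        long bytes = buffers * 32L * 1024; // assuming 32KB per network Buffer
        System.out.println(buffers + " buffers, " + bytes + " bytes");
    }
}
```

With these example values the subtask can hold at most 16 Buffers, or 512KB.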
▼ Causing backpressure (1)
Whenever a subtask's send buffer pool is exhausted, that is, its Buffers are sitting in a Subpartition's buffer queue or inside the lower, Netty-based network stack, the producer is blocked and cannot continue to send data, so it experiences backpressure. The receiver works in a similar way: any data that Netty receives must be handed to Flink in a network Buffer. If no network Buffer is available in the buffer pool of the corresponding subtask, Flink stops reading from that channel until a Buffer becomes available. This backpressures all sending subtasks on the same multiplexed connection and therefore also throttles the other receiving subtasks. The following figure illustrates this for an overloaded subtask B.4, which causes backpressure on the multiplexed connection and prevents subtask B.3 from receiving and processing data, even though B.3 still has spare capacity.
To prevent this from happening, Flink 1.5 introduced its own flow control mechanism.
3. Credit-based flow control
Credit-based flow control ensures that the receiver has enough Buffers to accept any data the sender has sent. The new flow control mechanism is based on the availability of network buffers and is a natural extension of the mechanism Flink had before. Each remote input channel (RemoteInputChannel) now has its own set of exclusive buffers (Exclusive buffer) instead of only sharing a local buffer pool (LocalBufferPool). In contrast to the previous behavior, the buffers in the local buffer pool are now called floating buffers (Floating buffer), because they float between the input channels and are available to every one of them.
The data receiver announces its available Buffers to the sender as Credit (1 buffer = 1 credit). Each Subpartition keeps track of the Credit of its downstream receiver (that is, the number of Buffers available to receive data). Flink forwards data to the lower network protocol stack (at Buffer granularity) only when the corresponding channel (Channel) has Credit, and every Buffer sent reduces the Credit of that channel by 1. Along with the data itself, the sender also tells the receiver how many Buffers are queued in the corresponding Subpartition (the Backlog). The receiver uses the Backlog to request an appropriate number of floating buffers, which speeds up processing of the data piled up at the sender. It first tries to acquire as many Buffers as the Backlog indicates, but it may get only some of them or none at all. In that case, the receiver uses the Buffers it did obtain and keeps listening for further Buffers to become available.
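The sender-side accounting described above can be sketched as follows. This is a simplified model, not Flink's implementation: the class and method names are hypothetical, Buffers are represented as strings, and the receiver's floating-buffer requests are reduced to a plain addCredit call.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class CreditedSubpartition {
    private final Deque<String> queue = new ArrayDeque<>(); // Buffers waiting to be sent
    private int credit;                                     // Buffers the receiver can still accept

    CreditedSubpartition(int initialCredit) {
        this.credit = initialCredit;
    }

    void enqueue(String buffer) {
        queue.add(buffer);
    }

    /** Called when the receiver announces newly available Buffers. */
    void addCredit(int buffers) {
        credit += buffers;
    }

    /** Number of queued Buffers, announced to the receiver as the Backlog. */
    int backlog() {
        return queue.size();
    }

    /** Hands one Buffer to the network only if Credit is available; each send costs 1 Credit. */
    String pollForSend() {
        if (credit == 0 || queue.isEmpty()) {
            return null;
        }
        credit--;
        return queue.poll();
    }
}
```

With an initial Credit of 2 and three queued Buffers, the first two calls to pollForSend() return data, the third returns null with a Backlog of 1, and the remaining Buffer goes out only after addCredit(1).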
Credit-based flow control uses buffers-per-channel to specify how many exclusive Buffers each Channel has, and floating-buffers-per-gate to specify the size of the shared local buffer pool (optional, see Note 3). Because the local buffer pool is shared, credit-based flow control can use the same number of Buffers as the original mechanism without flow control. The default values of these two parameters were chosen so that, on a healthy network with normal latency, the new credit-based flow control achieves at least the same throughput as the original strategy. Both parameters can be tuned to the actual network RTT (round-trip time) and bandwidth.
Note 3: if there are not enough Buffers available, each buffer pool gets the same share of the globally available Buffers (±1).
▼ Causing backpressure (2)
Unlike the receiver's backpressure mechanism without flow control, Credit provides more direct control: if the receiver cannot keep up, its Credit eventually drops to 0 and the sender stops forwarding data to the network (the data is still serialized into Buffers and held at the sender). Since backpressure now occurs only on the logical channel, there is no need to stop reading from the multiplexed TCP connection, and other receivers can continue to receive and process data.
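To illustrate why a stalled logical channel no longer blocks its neighbours, here is a minimal sketch of several logical channels sharing one connection. It is a toy model under stated assumptions: the class name, the string-based Buffers, and the simple round over all channels are hypothetical and do not mirror Flink's Netty handlers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MultiplexedSender {
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();
    private final Map<String, Integer> credits = new HashMap<>();

    void register(String channel, int credit) {
        queues.put(channel, new ArrayDeque<>());
        credits.put(channel, credit);
    }

    void enqueue(String channel, String buffer) {
        queues.get(channel).add(buffer);
    }

    void addCredit(String channel, int buffers) {
        credits.merge(channel, buffers, Integer::sum);
    }

    /** One pass over all logical channels; returns what was put on the shared connection. */
    List<String> sendRound() {
        List<String> wire = new ArrayList<>();
        for (Map.Entry<String, Deque<String>> entry : queues.entrySet()) {
            String channel = entry.getKey();
            int credit = credits.get(channel);
            // A channel without Credit is skipped; it does not hold up the others.
            if (credit > 0 && !entry.getValue().isEmpty()) {
                credits.put(channel, credit - 1);
                wire.add(channel + ":" + entry.getValue().poll());
            }
        }
        return wire;
    }
}
```

If B.4 is registered with 0 Credit and B.3 with 2, data queued for B.3 keeps flowing in every round while B.4's data waits at the sender until addCredit("B.4", ...) is called.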
▼ Advantages and drawbacks of credit-based flow control
With credit-based flow control, one channel in a multiplexed connection no longer blocks the other logical channels when it is backpressured, so overall resource utilization should increase. In addition, having full control over how much data is in flight speeds up Checkpoint alignment: without flow control, it takes a while for a channel to fill up the network protocol stack's internal buffers and signal that the receiver has stopped reading. During that time, a large number of Buffers pile up unprocessed. Any Checkpoint barrier (the message that triggers a Checkpoint) has to queue up behind these data Buffers, so the Checkpoint cannot start until all of that data has been processed (barriers never overtake data).
However, the additional announcement messages from the receiver (which tell the sender its Credit) may incur some overhead, especially in setups that use SSL-encrypted channels. In addition, a single input channel (Input channel) cannot use all the Buffers in the buffer pool, because exclusive Buffers cannot be shared. The new flow control protocol also may not be able to send as much data as possible right away (if data is produced faster than the receiver returns Credit), which may increase the time it takes to send data. Although this may affect job performance, the new flow control usually still performs better because of all its advantages. Increasing the number of exclusive Buffers per channel can offset this effect at the cost of more memory. Even then, overall memory usage may still be lower than with the previous implementation, because the underlying network protocol stack no longer needs to cache large amounts of data: it can always be handed to Flink immediately (a Buffer to receive it is guaranteed to exist).
When using the new credit-based flow control, you may also notice another thing: because less data is buffered between the sender and the receiver, backpressure may kick in earlier. This is expected, though, because caching more data does not bring any real benefit. If you want to cache more data while keeping credit-based flow control, consider increasing the number of floating (shared) Buffers per input gate.
Note: if you need to turn off credit-based flow control, add this setting to flink-conf.yaml: taskmanager.network.credit-model: false. However, this parameter is deprecated and will eventually be removed together with the non-credit-based flow control code.
4. Serialization and deserialization
The following figure extends the higher-level view above with more details of the network protocol stack and its surrounding components, from the sending operator emitting a record (Record) to the receiving operator getting it:
After a Record is produced and passed on, for example through Collector#collect(), it is handed to the RecordWriter, which serializes the Java object into a sequence of bytes that eventually ends up in a network Buffer and is processed in the network protocol stack as described above. The RecordWriter first uses the SpanningRecordSerializer to serialize the Record into a growable on-heap byte array. It then tries to write these bytes into the Buffer of the target network Channel. We will return to this part in a later section.
On the receiver side, the underlying network protocol stack (Netty) writes received Buffers into the queue of the corresponding input channel (Channel). The stream task's thread eventually reads from these queues and, with the help of the RecordReader, tries to deserialize the accumulated bytes into Java objects through the SpillingAdaptiveSpanningRecordDeserializer. Like the serializer, this deserializer must handle special cases such as a Record spanning multiple network Buffers, either because the Record itself is larger than a network Buffer (32KB by default, set through taskmanager.memory.segment-size) or because the target Buffer did not have enough space left when the Record was serialized. In the latter case, Flink fills the remaining space and continues writing the rest of the bytes into a new network Buffer.
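The idea of Records spanning Buffer boundaries can be sketched with plain java.nio. This is a simplified stand-in, not the actual SpanningRecordSerializer or SpillingAdaptiveSpanningRecordDeserializer: the class names, the 4-byte length prefix, and the tiny Buffer size are assumptions chosen for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SpanningCodec {
    /** Serializes records as [4-byte length][payload] and cuts the stream into fixed-size Buffers. */
    static List<byte[]> write(List<String> records, int bufferSize) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        for (String record : records) {
            byte[] payload = record.getBytes(StandardCharsets.UTF_8);
            stream.write(ByteBuffer.allocate(4).putInt(payload.length).array(), 0, 4);
            stream.write(payload, 0, payload.length);
        }
        byte[] bytes = stream.toByteArray();
        List<byte[]> buffers = new ArrayList<>();
        for (int off = 0; off < bytes.length; off += bufferSize) {
            buffers.add(Arrays.copyOfRange(bytes, off, Math.min(bytes.length, off + bufferSize)));
        }
        return buffers;
    }
}

class SpanningReader {
    private ByteBuffer pending = ByteBuffer.allocate(0); // bytes of a not yet complete record

    /** Feeds one received Buffer and returns every record that is now complete. */
    List<String> feed(byte[] buffer) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + buffer.length);
        merged.put(pending).put(buffer).flip();
        List<String> records = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();
            int length = merged.getInt();
            if (merged.remaining() < length) {
                merged.reset(); // record continues in the next Buffer
                break;
            }
            byte[] payload = new byte[length];
            merged.get(payload);
            records.add(new String(payload, StandardCharsets.UTF_8));
        }
        pending = merged; // leftover bytes stay between position and limit
        return records;
    }
}
```

Writing the records "a" and "hello world" with a Buffer size of 8 bytes yields three Buffers; the reader returns "a" after the first Buffer, nothing after the second, and "hello world" only once the third has arrived.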
4.1 Writing network Buffers to Netty
In the figure above, the credit-based flow control mechanism actually sits inside the "Netty Server" (and "Netty Client") components. The Buffer that the RecordWriter writes to is always added to the Subpartition in an empty state (no data) and is then gradually filled with serialized records. But when does Netty actually pick up and send these Buffers? Obviously, it cannot send data whenever some becomes available in a Buffer, because the cross-thread data exchange and synchronization (writer thread and sender thread) would add a lot of overhead and make buffering itself pointless (in that case, it would be better to write the serialized bytes directly to the network without an intermediate Buffer).
In Flink, there are three situations in which the Netty server can consume (send) a network Buffer:
The Buffer becomes full while a Record is being written.
The Buffer timeout expires.
A special message, such as a Checkpoint barrier, is sent.
▼ Sending when the Buffer is full
The RecordWriter serializes a Record into its local serialization buffer and gradually writes these serialized bytes into one or more network Buffers located in the queue of the corresponding Result subpartition. Although a single RecordWriter can serve multiple Subpartitions, each Subpartition has only one RecordWriter writing to it. The Netty server thread, on the other hand, reads from multiple Result subpartitions and writes the data to the appropriate multiplexed channel as described above. This is a classic producer-consumer pattern with the network Buffer in the middle, as shown in the following figure. After (1) serializing and (2) writing data into the Buffer, the RecordWriter updates the Buffer's write index accordingly. Once the Buffer is completely filled, the RecordWriter (3) acquires a new Buffer from its local buffer pool for the remaining bytes of the current Record, or for the next Record, and adds the new Buffer to the queue of the corresponding Subpartition. This (4) notifies the Netty server thread that new data is available to send (unless Netty already knows that data is available, see Note 4). Whenever Netty has capacity to handle this notification, it (5) takes the available Buffer from the queue and sends it over the appropriate TCP channel.
Note 4: if there are more finished Buffers in the queue, we can assume that Netty has already been notified.
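Steps (2) to (5) and Note 4 can be modelled in a few lines. The sketch below is a single-threaded toy under stated assumptions: the class name is hypothetical, characters stand in for serialized bytes, and a notification is simply counted instead of waking a Netty thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class SubpartitionSketch {
    private final int bufferSize;
    // The last element is the Buffer currently being filled; all others are finished.
    private final Deque<StringBuilder> queue = new ArrayDeque<>();
    int notifications = 0;

    SubpartitionSketch(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    int finishedBuffers() {
        return Math.max(0, queue.size() - 1);
    }

    /** (2) Writes serialized data, (3) adding a fresh Buffer whenever the current one is full. */
    void write(String serialized) {
        int pos = 0;
        while (pos < serialized.length()) {
            StringBuilder current = queue.peekLast();
            if (current == null || current.length() == bufferSize) {
                int finishedBefore = finishedBuffers();
                current = new StringBuilder();
                queue.addLast(current);
                // (4) Notify only when the first finished Buffer appears (see Note 4).
                if (finishedBefore == 0 && finishedBuffers() == 1) {
                    notifications++;
                }
            }
            int n = Math.min(bufferSize - current.length(), serialized.length() - pos);
            current.append(serialized, pos, pos + n);
            pos += n;
        }
    }

    /** (5) The consumer takes the oldest finished Buffer, or null if none is finished. */
    String pollFinished() {
        return finishedBuffers() > 0 ? queue.pollFirst().toString() : null;
    }
}
```

With a Buffer size of 4, writing "abcdef" and then "ghijkl" produces two finished Buffers ("abcd" and "efgh") but only one notification, because the second Buffer finishes while the first is still queued.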
▼ Sending after the Buffer timeout
To support low-latency use cases, we cannot rely only on full Buffers to send data downstream. A communication channel may carry little data, and waiting for the Buffer to fill up would unnecessarily increase the latency of those few Records. Flink therefore provides a periodic flush thread (the output flusher) that writes out any buffered data at regular intervals. The flush interval can be configured through StreamExecutionEnvironment#setBufferTimeout and serves as an upper bound on latency (for low-throughput channels, see Note 5). The following figure shows how it interacts with the other components: the RecordWriter serializes data and writes it into the network Buffer as before, while concurrently the output flusher notifies the Netty server thread that data is available, if Netty does not know it yet (similar to the buffer-full scenario above). When Netty handles this notification (5), it consumes (fetches and sends) the data available in the Buffer and updates the Buffer's read index. The Buffer stays in the queue, and any further operation on it from the Netty server side continues from the read index next time.
Note 5: strictly speaking, the output flusher does not give any guarantees. It only sends a notification to Netty, and the Netty thread handles it when it is able and willing to. This also means that the output flusher has no effect while the channel is backpressured.
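The interplay between the output flusher and Netty, including the caveat in Note 5, can be sketched deterministically. This is a simplified model and not Flink's implementation: the class and method names are hypothetical, the timer is replaced by an explicit flusherTick() call, and backpressure is reduced to a hasCredit flag.

```java
class OutputFlusherSketch {
    private int unsentBytes;        // written by the RecordWriter, not yet consumed by Netty
    private boolean flushRequested; // Netty has already been told that data is available
    int notifications;

    /** RecordWriter side: more serialized bytes land in the current Buffer. */
    void write(int bytes) {
        unsentBytes += bytes;
    }

    /** Called once per Buffer timeout; notifies Netty only if it does not know about the data yet. */
    void flusherTick() {
        if (unsentBytes > 0 && !flushRequested) {
            flushRequested = true;
            notifications++;
        }
    }

    /** Netty side: consumes the available data when it gets to it; returns the bytes sent. */
    int nettyProcess(boolean hasCredit) {
        if (!flushRequested || !hasCredit) {
            return 0; // under backpressure the flush request just stays pending
        }
        int sent = unsentBytes;
        unsentBytes = 0;
        flushRequested = false;
        return sent;
    }
}
```

Repeated ticks while a notification is pending do not produce further notifications, and while hasCredit is false nothing is sent no matter how often the flusher fires, which is the "no guarantees" behaviour of Note 5.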
▼ Sending after special messages
Some special messages, when sent through the RecordWriter, also trigger an immediate flush of the buffered data. The most important ones are Checkpoint barriers and end-of-partition events, which should obviously be sent as soon as possible instead of waiting for the Buffer to fill up or for the output flusher's next flush.
▼ Further discussion
In contrast to Flink versions before 1.5, note that (a) network Buffers are now placed directly in the Subpartition's queue and (b) a Buffer is no longer closed after each flush. This brings a few benefits:
Less synchronization overhead (the output flusher and the RecordWriter are independent of each other).
Under high load, when Netty is the bottleneck (either a direct network bottleneck or backpressure), unfinished Buffers can still be filled with more data.
Significantly fewer Netty notifications.
However, under low load you may see increased CPU usage and TCP packet rates, because Flink uses any available CPU cycles to try to maintain the desired latency. Once the load increases, Flink adjusts itself by filling Buffers more. High-load scenarios are not affected and, thanks to the reduced synchronization overhead, may even achieve higher throughput.
4.2 BufferBuilder and BufferConsumer
To learn more about how the producer-consumer mechanism is implemented in Flink, take a closer look at the BufferBuilder and BufferConsumer classes introduced in Flink 1.5. While reading happens at Buffer granularity, writing happens at Record granularity, which puts it on the hot path of all network traffic in Flink. The connection between the task thread (Task thread) and the Netty thread therefore has to be lightweight, with as little synchronization overhead as possible. You can find more details in the source code.
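One way to get such a lightweight hand-off between a single writer and a single reader is to share only a volatile position. The sketch below is loosely inspired by that idea and is not the actual BufferBuilder or BufferConsumer code: all class names are hypothetical and the memory segment is a plain byte array.

```java
import java.util.Arrays;

class SharedPosition {
    volatile int written; // number of bytes the writer has published
}

class BufferBuilderSketch {
    private final byte[] segment;
    private final SharedPosition position = new SharedPosition();
    private int localPosition; // touched only by the writer thread

    BufferBuilderSketch(int size) {
        this.segment = new byte[size];
    }

    /** Copies as many bytes as fit and returns how many were accepted. */
    int append(byte[] bytes) {
        int n = Math.min(bytes.length, segment.length - localPosition);
        System.arraycopy(bytes, 0, segment, localPosition, n);
        localPosition += n;
        position.written = localPosition; // volatile write publishes the bytes copied above
        return n;
    }

    boolean isFull() {
        return localPosition == segment.length;
    }

    BufferConsumerSketch createConsumer() {
        return new BufferConsumerSketch(segment, position);
    }
}

class BufferConsumerSketch {
    private final byte[] segment;
    private final SharedPosition position;
    private int readPosition; // touched only by the reader thread

    BufferConsumerSketch(byte[] segment, SharedPosition position) {
        this.segment = segment;
        this.position = position;
    }

    /** Returns the bytes published since the last call (possibly none). */
    byte[] poll() {
        int end = position.written; // volatile read makes the writer's bytes visible
        byte[] out = Arrays.copyOfRange(segment, readPosition, end);
        readPosition = end;
        return out;
    }
}
```

The writer never takes a lock: it copies bytes and then publishes the new position with a single volatile write, and the reader only ever looks at bytes below the position it has read. This works for exactly one writer and one reader per Buffer, which matches the one-RecordWriter-per-Subpartition rule described earlier.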
5. Latency and throughput
The purpose of introducing network Buffers is to achieve higher resource utilization and higher throughput, at the cost of letting Records wait in a Buffer for a while. Although the Buffer timeout puts an upper bound on this waiting time, you may want to know more about the trade-off between the two dimensions (latency and throughput). Obviously, you can't have both at the same time. The following figure shows the throughput for different Buffer timeouts, from 0 (flush after every Record) to 100ms (the default). The test ran on a cluster of 100 nodes with 8 Slots each, and every node ran Tasks without business logic, so the test only exercises the network protocol stack. For comparison, we also tested Flink 1.4, the version before the low-latency improvements described above.
As the figure shows, with Flink 1.5 even very low Buffer timeouts such as 1ms (for low-latency scenarios) reach up to 75% of the maximum throughput achieved with the default timeout (100ms), while buffering less data.
6. Conclusion
Understanding the network connection types and scheduling types of Result partitions in batch and streaming jobs, credit-based flow control, and the inner workings of the Flink network protocol stack helps you better understand the network-related parameters and the behavior of your jobs. Future articles will cover more of the Flink network stack in greater detail, including monitoring metrics (Metrics) for operations, further network tuning strategies, and common mistakes to avoid.
Via:
https://flink.apache.org/2019/06/05/flink-network-stack.html