Partitioning strategies in Flink: a source code walkthrough

2025-02-24 Update From: SLTechnology News&Howtos


This article walks through the source code of the partitioning strategies in Flink: the ChannelSelector interface, the StreamPartitioner abstract class, and each concrete partitioner built on them.

Inheritance hierarchy

Interface

Name

ChannelSelector

Implementation

    public interface ChannelSelector<T extends IOReadableWritable> {

        /**
         * Initializes the number of channels. A channel can be understood as one
         * instance of the downstream operator (a subtask of a parallel operator).
         */
        void setup(int numberOfChannels);

        /**
         * Decides, based on the current record and the total number of channels,
         * which downstream channel the record is sent to.
         * Each partitioning strategy implements this method differently.
         */
        int selectChannel(T record);

        /** Whether records are broadcast to all downstream operator instances. */
        boolean isBroadcast();
    }

Abstract class

Name

StreamPartitioner

Implementation

    public abstract class StreamPartitioner<T>
            implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {

        private static final long serialVersionUID = 1L;

        protected int numberOfChannels;

        @Override
        public void setup(int numberOfChannels) {
            this.numberOfChannels = numberOfChannels;
        }

        @Override
        public boolean isBroadcast() {
            return false;
        }

        public abstract StreamPartitioner<T> copy();
    }

GlobalPartitioner

Brief introduction

This partitioner sends all records to a single downstream operator instance (subtask ID = 0).

Source code interpretation

    /**
     * Sends all records to the first subtask of the downstream operator (ID = 0).
     *
     * @param <T> type of the stream elements
     */
    @Internal
    public class GlobalPartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;

        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            // Always returns 0, so every record goes to the first downstream subtask.
            return 0;
        }

        @Override
        public StreamPartitioner<T> copy() {
            return this;
        }

        @Override
        public String toString() {
            return "GLOBAL";
        }
    }

ShufflePartitioner

Brief introduction

Sends each record to a randomly selected downstream operator instance.

Source code interpretation

    /**
     * Selects a random channel for each record.
     *
     * @param <T> type of the stream elements
     */
    @Internal
    public class ShufflePartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;

        private Random random = new Random();

        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            // Draws a pseudorandom number in [0, numberOfChannels), so the record
            // goes to a random downstream subtask.
            return random.nextInt(numberOfChannels);
        }

        @Override
        public StreamPartitioner<T> copy() {
            return new ShufflePartitioner<T>();
        }

        @Override
        public String toString() {
            return "SHUFFLE";
        }
    }

BroadcastPartitioner

Brief introduction

Sends each record to all downstream operator instances.

Source code interpretation

    /**
     * Sends each record to all channels.
     *
     * @param <T> type of the stream elements
     */
    @Internal
    public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;

        /**
         * In broadcast mode every record goes to all downstream subtasks,
         * so there is no channel to select.
         */
        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
        }

        @Override
        public boolean isBroadcast() {
            return true;
        }

        @Override
        public StreamPartitioner<T> copy() {
            return this;
        }

        @Override
        public String toString() {
            return "BROADCAST";
        }
    }

RebalancePartitioner

Brief introduction

Sends records to the downstream subtasks in turn (round-robin).

Source code interpretation

    /**
     * Sends records to the downstream subtasks in round-robin order.
     *
     * @param <T> type of the stream elements
     */
    @Internal
    public class RebalancePartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;

        private int nextChannelToSendTo;

        @Override
        public void setup(int numberOfChannels) {
            super.setup(numberOfChannels);
            // Initialize the starting channel id with a pseudorandom number.
            nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
        }

        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            // Cycle through the downstream subtasks. For example, if nextChannelToSendTo
            // starts at 0 and numberOfChannels (the downstream parallelism) is 2, the
            // first record goes to subtask 1, the second to subtask 0, the third to
            // subtask 1, and so on.
            nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
            return nextChannelToSendTo;
        }

        @Override
        public StreamPartitioner<T> copy() {
            return this;
        }

        @Override
        public String toString() {
            return "REBALANCE";
        }
    }
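The round-robin arithmetic from the comment in selectChannel can be checked with a small standalone sketch. RoundRobinSketch and its startChannel parameter are hypothetical names used only for illustration; startChannel stands in for the random initial value that setup() would pick, so that the sequence is reproducible.

```java
// Standalone sketch of round-robin channel selection; this is not Flink's class.
final class RoundRobinSketch {
    private final int numberOfChannels;
    private int nextChannelToSendTo;

    // startChannel is an assumption: it replaces the random start chosen in setup().
    RoundRobinSketch(int numberOfChannels, int startChannel) {
        this.numberOfChannels = numberOfChannels;
        this.nextChannelToSendTo = startChannel;
    }

    // Same update rule as RebalancePartitioner.selectChannel: advance, then wrap around.
    int selectChannel() {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    public static void main(String[] args) {
        RoundRobinSketch selector = new RoundRobinSketch(2, 0);
        for (int i = 0; i < 3; i++) {
            // Prints channels 1, 0, 1 for the three records.
            System.out.println("record " + i + " -> channel " + selector.selectChannel());
        }
    }
}
```

Because the counter is advanced before it is returned, the first record never lands on the starting channel itself, which is why the sequence begins at 1 when the start value is 0.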

RescalePartitioner

Brief introduction

Based on the parallelism of the upstream and downstream operators, each upstream subtask outputs its records round-robin to a subset of the downstream subtasks.

For example, if the upstream parallelism is 2 and the downstream parallelism is 4, one upstream subtask outputs records round-robin to two of the downstream subtasks, and the other upstream subtask outputs records round-robin to the other two.

If the upstream parallelism is 4 and the downstream parallelism is 2, two upstream subtasks output their records to one downstream subtask, and the other two upstream subtasks output their records to the other downstream subtask.

Source code interpretation

    @Internal
    public class RescalePartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;

        private int nextChannelToSendTo = -1;

        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            // Advance to the next channel and wrap around at numberOfChannels.
            if (++nextChannelToSendTo >= numberOfChannels) {
                nextChannelToSendTo = 0;
            }
            return nextChannelToSendTo;
        }

        @Override
        public StreamPartitioner<T> copy() {
            return this;
        }

        @Override
        public String toString() {
            return "RESCALE";
        }
    }
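The 2-to-4 and 4-to-2 examples above can be made concrete with a small sketch of which downstream subtasks each upstream subtask feeds. This is a simplified illustration, not Flink's wiring code: RescaleWiringSketch and downstreamTargets are hypothetical names, the larger parallelism is assumed to be an exact multiple of the smaller one, and the grouping of neighbours into contiguous ranges is an assumption made for the example.

```java
// Simplified sketch of pointwise wiring for rescale; not Flink's actual implementation.
final class RescaleWiringSketch {

    // Returns the downstream subtask indices fed by one upstream subtask.
    // Assumes the larger parallelism is an exact multiple of the smaller one.
    static int[] downstreamTargets(int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        if (downstreamParallelism >= upstreamParallelism) {
            // Fan-out: each upstream subtask owns a contiguous block of downstream subtasks.
            int fanOut = downstreamParallelism / upstreamParallelism;
            int[] targets = new int[fanOut];
            for (int i = 0; i < fanOut; i++) {
                targets[i] = upstreamIndex * fanOut + i;
            }
            return targets;
        }
        // Fan-in: several upstream subtasks share a single downstream subtask.
        int fanIn = upstreamParallelism / downstreamParallelism;
        return new int[] {upstreamIndex / fanIn};
    }

    public static void main(String[] args) {
        for (int up = 0; up < 2; up++) {
            System.out.println("2 -> 4, upstream " + up + " feeds "
                    + java.util.Arrays.toString(downstreamTargets(up, 2, 4)));
        }
        for (int up = 0; up < 4; up++) {
            System.out.println("4 -> 2, upstream " + up + " feeds "
                    + java.util.Arrays.toString(downstreamTargets(up, 4, 2)));
        }
    }
}
```

Within its set of targets, an upstream subtask then cycles through the channels exactly as RescalePartitioner.selectChannel does, so records never cross to the other group of downstream subtasks.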

Important note

The execution graph in Flink can be divided into four layers: StreamGraph -> JobGraph -> ExecutionGraph -> physical execution graph.

StreamGraph: the initial graph generated from the code the user writes with the Stream API. It represents the topology of the program.

JobGraph: generated from the StreamGraph after optimization; it is the data structure submitted to the JobManager. The main optimization is to chain multiple eligible nodes together into a single node, which reduces the serialization, deserialization, and transmission cost of moving data between nodes.

ExecutionGraph: generated by the JobManager from the JobGraph.

Physical execution graph: the "graph" that results once the JobManager has scheduled the job according to the ExecutionGraph and deployed tasks on each TaskManager. It is not a concrete data structure.

StreamingJobGraphGenerator performs the conversion from StreamGraph to JobGraph. In this class, ForwardPartitioner and RescalePartitioner are assigned the POINTWISE distribution pattern, and all other partitioners are assigned ALL_TO_ALL. The code is as follows:

    if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
        jobEdge = downStreamVertex.connectNewDataSetAsInput(
            headVertex,
            // Each subtask of the upstream operator (producer side) connects to
            // one or more subtasks of the downstream operator (consumer side).
            DistributionPattern.POINTWISE,
            resultPartitionType);
    } else {
        jobEdge = downStreamVertex.connectNewDataSetAsInput(
            headVertex,
            // Each subtask of the upstream operator (producer side) connects to
            // all subtasks of the downstream operator (consumer side).
            DistributionPattern.ALL_TO_ALL,
            resultPartitionType);
    }

ForwardPartitioner

Brief introduction

Sends each record to the downstream subtask that corresponds to the sending upstream subtask. The upstream and downstream operators must have the same parallelism, so the relationship between upstream and downstream subtasks is 1:1. Because the connection is POINTWISE with equal parallelism, each upstream subtask has exactly one output channel, which is why selectChannel always returns 0.

Source code interpretation

    /**
     * Sends each record to the corresponding downstream subtask.
     *
     * @param <T> type of the stream elements
     */
    @Internal
    public class ForwardPartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;

        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            return 0;
        }

        @Override
        public StreamPartitioner<T> copy() {
            return this;
        }

        @Override
        public String toString() {
            return "FORWARD";
        }
    }

Important note

If no partitioner is specified between the upstream and downstream operators, ForwardPartitioner is used when their parallelism is the same; otherwise RebalancePartitioner is used. With ForwardPartitioner, the upstream and downstream parallelism must be equal, otherwise an exception is thrown.

    // If no partitioner is specified, use ForwardPartitioner when the upstream and
    // downstream parallelism match; otherwise use RebalancePartitioner.
    if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
        partitioner = new ForwardPartitioner<Object>();
    } else if (partitioner == null) {
        partitioner = new RebalancePartitioner<Object>();
    }

    if (partitioner instanceof ForwardPartitioner) {
        // Throw if the upstream and downstream parallelism differ.
        if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
            throw new UnsupportedOperationException("Forward partitioning does not allow " +
                "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
        }
    }
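The selection rule above can be exercised in isolation with a short sketch. DefaultPartitionerSketch and choosePartitioner are hypothetical names, and partitioners are represented by their toString labels ("FORWARD", "REBALANCE") instead of real Flink objects, so this only mirrors the decision logic and not the surrounding graph-building code.

```java
// Standalone sketch of the default-partitioner decision; names are illustrative only.
final class DefaultPartitionerSketch {

    // requested is the label of the user-specified partitioner, or null if none was set.
    static String choosePartitioner(String requested, int upstreamParallelism, int downstreamParallelism) {
        String partitioner = requested;
        if (partitioner == null) {
            // No explicit choice: forward when parallelism matches, rebalance otherwise.
            partitioner = (upstreamParallelism == downstreamParallelism) ? "FORWARD" : "REBALANCE";
        }
        if ("FORWARD".equals(partitioner) && upstreamParallelism != downstreamParallelism) {
            // An explicitly requested forward connection cannot change parallelism.
            throw new UnsupportedOperationException(
                    "Forward partitioning does not allow change of parallelism.");
        }
        return partitioner;
    }

    public static void main(String[] args) {
        System.out.println(choosePartitioner(null, 4, 4));
        System.out.println(choosePartitioner(null, 2, 4));
        System.out.println(choosePartitioner("SHUFFLE", 2, 4));
    }
}
```

Note that the exception can only be reached when forward partitioning was requested explicitly, since the default branch never picks it for mismatched parallelism.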

KeyGroupStreamPartitioner

Brief introduction

Sends each record to the downstream subtask that owns the key group of the record's key.

Source code interpretation

    /**
     * Selects the downstream subtask according to the key group index of the key.
     *
     * @param <T> type of the stream elements
     * @param <K> type of the key
     */
    @Internal
    public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T>
            implements ConfigurableStreamPartitioner {
        ...

        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            K key;
            try {
                key = keySelector.getKey(record.getInstance().getValue());
            } catch (Exception e) {
                throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
            }
            // Calls assignKeyToParallelOperator of the KeyGroupRangeAssignment class;
            // the code is as follows.
            return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
        }
        ...
    }

org.apache.flink.runtime.state.KeyGroupRangeAssignment

    public final class KeyGroupRangeAssignment {
        ...

        /**
         * Assigns a key to the index of a parallel operator instance. This is the
         * routing decision for the key, i.e. which downstream subtask it is sent to.
         */
        public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
            Preconditions.checkNotNull(key, "Assigned key must not be null!");
            return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
        }

        /** Assigns a key to a key group id (keyGroupId). */
        public static int assignToKeyGroup(Object key, int maxParallelism) {
            Preconditions.checkNotNull(key, "Assigned key must not be null!");
            // Start from the key's hashCode.
            return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
        }

        /** Computes the key group id (keyGroupId) from the key's hash. */
        public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
            // Apply murmur hash, then take the remainder modulo maxParallelism.
            return MathUtils.murmurHash(keyHash) % maxParallelism;
        }

        /** Computes the partition index, i.e. which downstream operator instance the key group is sent to. */
        public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
            return keyGroupId * parallelism / maxParallelism;
        }
    }
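The last step, mapping a key group id to an operator index, is plain integer arithmetic and can be worked through without Flink on the classpath. The sketch below copies only the formula from computeOperatorIndexForKeyGroup; KeyGroupIndexSketch is a hypothetical class name, and the values maxParallelism = 128 and parallelism = 4 are assumptions chosen for the example. The murmur hash step is deliberately left out, so the key group ids are supplied directly.

```java
// Standalone sketch of the key-group-to-operator-index formula; not Flink's class.
final class KeyGroupIndexSketch {

    // Same formula as KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup.
    static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed number of key groups for this example
        int parallelism = 4;      // assumed downstream parallelism for this example
        int[] sampleKeyGroups = {0, 31, 32, 127};
        for (int keyGroupId : sampleKeyGroups) {
            System.out.println("key group " + keyGroupId + " -> operator index "
                    + computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupId));
        }
    }
}
```

With these values, integer division splits the 128 key groups into four contiguous ranges of 32: key groups 0 to 31 map to index 0, 32 to 63 to index 1, 64 to 95 to index 2, and 96 to 127 to index 3.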

CustomPartitionerWrapper

Brief introduction

Sends each record downstream according to the user-defined partition method of a Partitioner instance.

    public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;

        Partitioner<K> partitioner;
        KeySelector<T, K> keySelector;

        public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
            this.partitioner = partitioner;
            this.keySelector = keySelector;
        }

        @Override
        public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            K key;
            try {
                key = keySelector.getKey(record.getInstance().getValue());
            } catch (Exception e) {
                throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
            }
            // Delegates to the user's Partitioner, which implements the partition method.
            return partitioner.partition(key, numberOfChannels);
        }

        @Override
        public StreamPartitioner<T> copy() {
            return this;
        }

        @Override
        public String toString() {
            return "CUSTOM";
        }
    }

For example:

    public class CustomPartitioner implements Partitioner<String> {
        // key: the value to partition by
        // numPartitions: parallelism of the downstream operator
        @Override
        public int partition(String key, int numPartitions) {
            return key.length() % numPartitions; // define the partitioning policy here
        }
    }
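To see what the length-based policy in the example does to concrete keys, the same expression can be run as a standalone sketch. LengthPartitionSketch is a hypothetical class that copies only the body of the partition method, and the sample keys and the downstream parallelism of 4 are assumptions for illustration.

```java
// Standalone sketch of the length-based partition rule; it does not depend on Flink.
final class LengthPartitionSketch {

    // Same expression as the partition method in the example above.
    static int partition(String key, int numPartitions) {
        return key.length() % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 4; // assumed downstream parallelism
        String[] keys = {"a", "hive", "flink", "hadoop"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partition(key, numPartitions));
        }
    }
}
```

Keys of equal length always land on the same downstream subtask under this rule, so it is likely to distribute real data unevenly; it serves only to show where a custom policy plugs in. In a job, such a Partitioner would typically be passed to DataStream.partitionCustom together with a KeySelector.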
