Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What are the relevant knowledge points in flink?

2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article mainly explains "what are the relevant knowledge points in flink". Friends who are interested might as well take a look. The method introduced in this paper is simple, fast and practical. Now let the editor take you to learn "what are the relevant knowledge points in flink"?

Advantages of 1.OperatorChain1.1 OperatorChain

1.1.1 reduce thread switching 1JI 1.2 reduce serialization and deserialization 1.1.3 reduce data exchange in buffers 1.1.4 reduce latency and improve throughput

1.2 OperatorChain composition conditions

1.2.1 Chain 1.2.2 the parallelism of upstream and downstream operators is consistent 1.2.3 the penetration of downstream operators is 1 (that is, there is no input from other nodes in the downstream nodes) 1.2.4 the chain policy of upstream and downstream operators in the same slot group 1.2.5 is always (can be linked with upstream and downstream) Map, flatmap, filter, etc. Default is always) chain policy with nodes on 1.2.6 is always or head (can only link with downstream, not link with above, source default is head) 1.2.7 there is no data shuffle between upstream and downstream operators (data partitioning method is forward)

1.3 several ways to disable OperatorChain

1.3.1 call startNewChain operator 1.3.2 DataStream calls disableChaining after operator operation of 1.3.1 DataStream to close Chain 1.3.3 StreamExecutionEnvironment.getExecutionEnvironment.disableOperatorChaining () globally close 1.3.4 DataStream.slotSharingGroup ("name") set the new slotgrop name 1.3.5 to change parallelism

The relationship between 2.slot2.1 slot and parallelism

The default number of task slot is consistent with the maximum parallelism of task in join

2.2 shared slot

2.2.1 the task slot required by the flink cluster is exactly the same as the maximum parallelism used in the job (premise, keep the default SlotSharingGroup). In other words, we no longer need to calculate the total number of task generated by a program. 2.2.2 properly setting slotSharingGroup can reduce the number of threads running per slot, thus reducing the overall load on the machine.

3. Accumulator and counter

Counter 3.1is the simplest accumulator. 3.2.The built-in accumulator has IntCounter,LongCounter,DoubleCounter 3.3Histogram histogram.

4. Control delay

By default, the elements in the flow are not transmitted one by one across the network (which leads to unnecessary network traffic consumption), but are cached, and the size of the cache can be controlled in the Flink configuration file, ExecutinEnvironment, or setting an operator to configure (default 100ms) the benefits of such control: increased throughput disadvantages: increased latency

How to balance: (1) for maximum throughput, you can set setBufferTimeout (- 1), which removes the timeout mechanism, and the data in the cache will be sent as soon as it is full. It is not recommended. If a message takes 4 or 5 hours to arrive, the delay will be very high. It will wait for the whole buffer to be full before processing (2) to minimize the delay. You can set the timeout to a number close to 0 (for example, 5 or 10ms) (3) the cache timeout should not be set to 0, because it will cause some performance loss.

5.min minby max maxby

The difference between min and minby is that min returns a minimum value, while minby returns the smallest element contained in its field

6.interval join

In a given period, join the two KeyedStream according to the specified key, pull the two event that meet the join condition together, and then what to do with the scenario defined by the user: pull the relevant packet data within a certain period of time into a wide table

7.connect and union

Connect is followed by connectedStreams, which applies different processing methods to the data of the two streams, and the state can be shared (such as counting) between the two streams. This is useful when the input to the first stream affects the second stream. Union merges multiple streams. The new stream contains data of all streams union is DataStream- > DataStream connect can only connect two streams, while union can connect more than two streams connect the two stream types can be inconsistent, and the flow types of union connections must be the same

8. The way data is transferred between operators

(1) One-to-one streams maintains the partition and order of elements. (2) the way of rezoning, the rezoning strategy depends on the operators keyby, broadcast, rebalance used.

DataStream.shuffle () randomly divides elements according to uniform distribution, and the network overhead is often large. The dataStream.rebalance () loop partitions the elements, creating an equal load for each partition. DataStream.rescale () is very useful when solving data skew. DataStream.rescale () is similar to rebalance, but not global. Elements are sent from a subset of upstream task to a subset of downstream task dataStream.broadcast () through polling scheduling to broadcast elements to each partition.

Comparison of three times of 9.flink 9.1 EventTime

9.1.1 the time when events are generated exists as soon as they enter the Flink. You can extract 9.1.2 from the field of event. 9.1.2 you must specify the watermarks mode of production 9.1.3 advantages: certainty, disorder, delay, or data playback can give the correct result 9.1.4 weaknesses: performance and latency are affected when dealing with unordered events.

9.2 IngerstTime

9.2.1 the time when events enter flink, that is, the current system time obtained in source. Subsequent unified use of this time 9.2.2 does not require specifying watermarks's production paradigm (automatic generation) 9.2.3 weakness: can not handle unordered events and delayed data

9.3 ProcessingTime

9.3.1 current system time of the machine performing the operation (each operator is different) 9.3.2 does not require coordination between streams and machines 9.3.3 advantages: best performance and lowest latency 9.3.4 weakness: uncertainty, vulnerable to a variety of factors (speed generated by event, speed to flink, transfer speed between operators), regardless of order and delay

9.4 comparison

Performance: ProcessingTime > IngestTime > EventTime latency: ProcessingTimeProcessIngTime

Do not set the time type. By default, processingTime sets the time type through the env.setStreamTimeCharacteristic () method.

10.watermark10.1 description

10.1.1 normally, watermark is generated in the source function, but it can also be at any stage after source, and if specified multiple times, the previous value will be overwritten. Each sub task of source generates a watermark independently. 10.1.2 when watermark passes the operation, it advances the event time of the operator operation and generates a new watermark 10.1.3 for the downstream multi-input operator (union, keyby, partition). The current event time is its input stream event time minimum.

10.2 two kinds of watermark10.2.1 periodic watermark

(1) implement AssignerWithPeriodicWatermarks interface based on time (2) ExecutionConfig.setAutoWatermarkInterval (msec) (default 200ms, set the cycle of watermarker occurrence) (3)

10.2.2 intermittent watermark

(1) production and transmission of watermark based on certain times (implemented by user code, for example, in special cases) (2) implement AssignerWithPeriodicWatermarks interface

11 processing delay data

Method 1: allowedLateness (), set the maximum delay time, trigger is delayed, should not be set too big way 2: sideOutputTag, provides a way to delay data acquisition, so that the data will not be discarded, delay data processing separately.

At the same time, the side output stream is also a way of shunting, for example, a stream can be divided into several different streams sink to different targets.

12 window 12.1 Type of window

12.1.1 Keyed Windows (build multitasking parallel window on the basis of already installed keyby grouping (KeyedStream) stream.keyBy () .window () 12.1.2 Non-Keyed Windwos (build single-task Window on ungrouped DataStream, parallelism is 1meme API with ALL suffix) stream.windowAll ()

12.2 Life cycle of the window

Create: the window is created when the first element arrives: when the time (event/process time) exceeds the window's end timestamp + user-specified delay (allowedLateness), the window will be removed

13 flip-flop and drive 13.1 flip-flop

Triggers determine when a window can be processed by a window function (triggers and signals when conditions are met) each WindowAssigner has a default trigger, which can be specified by trigger () if the default trigger does not meet the need

Triggers have five methods to allow triggers to handle different events (trigger) onElement () method each element is added to the window is called onEvenTime () when a registered event time timer is started, onProcessingTime is called when a registered processing time timer is started, onMerge is called related to state triggers when a registered processing time timer is started, and windows corresponding to two triggers are merged when session window is used Triggered when the status of merging two triggers clear corresponding window is cleared

13.2 expeller

Evictor is optional. By default, WindowAssigner does not have evictor evictor to delete useless elements from the window after Trigger is triggered and before and / or after the application window function is executed, similar to the role of filter. EvictBefore applies evictAfter before the window and applies it after the window.

14 how to allow delay

14.1 when dealing with the windwo of event-time, the element may arrive late, that is, the watermark used by flink to track the progress of event-time has passed the last time of the window to which the element belongs, and the data belonging to the current window will arrive) 14.2 by default, when the watermark has passed the last time of the window, late elements are discarded 14.3 Flink allows you to specify a maximum allowable delay for window operations, Allowed lateness specifies By default, the 014.4 water level line comes after the last time of the window. If the delay time is added before the last time of the window, it can still be calculated in the window.

Special case: when using GlobalWindows (global window), delay is not considered because the end timestamp of the window is Long.MAX_VALUE

15 state statu

Flink status: generally refers to the state of a specific task/operator in memory at a certain time (such as the value of an attribute) Note: State and checkpointing do not confuse checkpoint represents a flink job, a global status snapshot at a specific time, that is, it contains the status of all task/operator at a certain time under a job.

15.1 misuse of status

15.1.1 incremental computing a) aggregation operation b) machine learning training model keeps the current model during iterative operation 15.1.2 fault tolerance a) job fault restart b) flink program upgrade

15.2 Classification of statu

15.2.1 Operator State status of each stream normal Operator 15.2.2 status of Keyed State Keyed Streaming 15.2.3 Special: Broadcast State (1.5 start)

Data structures supported by Keyed State (1) ValueState (2) ListState (3) ReducingState (4) AggregatingState (5) FoldingState (6) MapState

Note: (1) the state is not necessarily stored internally, it may reside on disk or elsewhere. (2) the state uses the RunntimContext method, so it can only be accessed in the Rich function.

16 checkpoint state fault tolerance

With state, you naturally need state fault tolerance, otherwise it will be meaningless. The fault tolerance mechanism of flink state is that checkpoint checkpoint is implemented through distributed snapshot. Without special declaration, snapshot and checkpoint and back-up have the same meaning.

16.1 Features

(1) Asynchronous (2) both full volume and increment can be set (3) Barrier mechanism (4) checkpoint (5) periodicity that can be rolled back to the most recent success in case of failure

16.2 use of checkpoint preconditions

(1) datasource that can be traced back within a certain period of time, such as kafka, rabiitma, hdfs (2) persistent storage state storage system, usually using a distributed file system, usually hdfs,s3,nfs

Checkmode: generally choose EXACTLY_ONCE, unless the scenario requires a very low level of AT_LEAST_ONCE (milliseconds)

16.3 checkpoint Advanced option value retention Policy

By default, checkpoints are not retained and are only used to recover jobs from failures. You can enable external persistence checkpoints while specifying the retention policy checkpointConfg.enableExternalizedCheckpoints (CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) (1) CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION to retain checkpoints when the job is cancelled. After this situation is cancelled, the checkpoint must be cleared manually (2) CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION deletes the checkpoint when the job is cancelled (cancel), which means it is not enabled.

SetCheckpointTimeout sets the timeout. If the checkpoint is not completed, the minimum interval of setMinPauseBetweenCheckpoints will be terminated. At least how long will the last checkpoint wait for the next checkpoint request setMaxConcurrentCheckpoints to specify how much parallelism is in operation for checkpoint?

16.4 the second step is to select the appropriate State Backed using checkpoint

16.4.1 default State is saved in taskmanager memory 16.4.2 the checkpoint mechanism persists consistent snapshot preservation of all states determined by StateBackend. Currently, flink comes with three State Backed: (1) MemoryStateBackend (default) (2) FsStateBackend (3) RocksDBStateBackend

16.5 MemoryStateBackend

16.5.1 MemoryStateBackend is an internal state backend that maintains state on the Java heap. When the Key/value state and window operators contain a hash table of 16.5.2 Checkpoint that stores values and timers, MemoryStateBackend takes a snapshot of state and sends checkpoint confirmation messages like jobManager with this snapshot data, and then the snapshot is stored in JobManager heap memory 16.5.3 MemoeyStateBackend turns on asynchronous mode for snapshots by default. It is recommended to use asynchronous to avoid blocking. If you want to block, you can pass false as follows: val memoryStateBackend:StateBackend=new MemoryStateBackend (1010241024 false) env.setStateBackend (memoryStateBackend) 16.5.4 restriction: a single state default 5mb, which can be specified in the constructor of MemoryStateBackend. No matter how you set it, the State size cannot be greater than akka.framesize (the default 10mb for the size of the largest message sent between JobManager and TaskManager). Job Manager must have enough memory for 16.5.5 scenarios: develop and test small-state job locally, such as using only Map FlatMap Fliter or Kaka Consumer

16.6 FsStateBackend

16.6.1 FsStateBackend needs to configure a file system URL, such as hdfs://namenode:8080/flink/checkpoint 16.6.2 FsStateBackend, to hold the data being processed in TaskManager's memory. Write state snapshot to a file in the file system directory when checkpoint. 16.6.3 FsStateBackend enables asynchronous snapshots by default. The construction method is as follows: val stateBackend:StateBackend=new FsStateBackend ("hdfs://namenode:9000/flink/checkpoint", false) env.setStateBackend (stateBackend) 16.6.4 applicable scenarios: job with large state, long window and large key / value status

16.7 、 RocksDBStateBackend

16.7.1 RocksDBStateBackend requires a file system URL to be configured. For example, the running data of hdfs://namenode:8080/flink/checkpoint 16.7.2 RocksDBStateBackend is saved in the RockDB database and stored in the TaskManager data directory by default. In Checkpoint, the entire RocksDB database will be checkpointed to the configured file system and directory 16.7.3 RocksDBSateBackend is always asynchronous 16.7.4 RocksDB JNI API is based on Byte [], so key and value support up to 2 ^ 31 bytes (2GB) 16.7.5 applicable scenarios: large window, large state, large key / value state job 16.7.6 only RockDBStateBackend supports incremental checkpoint 16.7.7 state saving in data blocks It is only limited by the amount of available disk space, but it is more expensive (read / write needs to be deserialized and serialized), and a guide package is required to receive restrictions on usage:

Org.apache.flinkflink-statebackend-rocksdb_$ {scala.binary.version} ${flink.version} val stateBackend:StateBackend=new RocksDBStateBackend ("hdfs://namenode:9000/flink/checkpoint", true) env.setStateBackend (stateBackend)

Configure restart policy Flink to support different restart policies that control how to restart job env.setRestartStrategy in the event of a failure (RestartStrategies.fixedDelayRestart (3, Time.of (10, TimeUnit.SECONDS) (1) if checkpoint is not used, no restart scheme is used (2) if checkpoint is enabled but no restart scheme is configured, a fixed delay policy is used and the number of attempts is Integer.MAX_VALUE

At this point, I believe you have a deeper understanding of "what are the relevant knowledge points in flink?" you might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report