2025-01-17 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article walks through the main points worth knowing about Flink. The material is short and practical.
1. OperatorChain
1.1 Advantages of OperatorChain
1.1.1 Reduces thread switching
1.1.2 Reduces serialization and deserialization
1.1.3 Reduces data exchange in buffers
1.1.4 Reduces latency and improves throughput
1.2 Conditions for forming an OperatorChain
1.2.1 Chaining has not been disabled
1.2.2 The upstream and downstream operators have the same parallelism
1.2.3 The downstream operator has an in-degree of 1 (the downstream node receives no input from any other node)
1.2.4 The upstream and downstream operators are in the same slot sharing group
1.2.5 The downstream node's chaining strategy is ALWAYS (it can chain with both upstream and downstream; map, flatMap, filter, etc. default to ALWAYS)
1.2.6 The upstream node's chaining strategy is ALWAYS or HEAD (HEAD can chain only with downstream, not with upstream; sources default to HEAD)
1.2.7 There is no data shuffle between the upstream and downstream operators (the partitioning method is forward)
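The conditions in 1.2 can be read as one boolean check over a pair of adjacent operators. The sketch below is a simplified model in plain Scala, not Flink's internal code; `Operator`, `canChain`, and the flag names are hypothetical and exist only for illustration.

```scala
// Simplified model of the chaining conditions; all names are hypothetical.
object ChainCheck {
  sealed trait ChainingStrategy
  case object Always extends ChainingStrategy
  case object Head extends ChainingStrategy
  case object Never extends ChainingStrategy

  final case class Operator(
      parallelism: Int,
      slotSharingGroup: String,
      strategy: ChainingStrategy,
      inDegree: Int)

  // forwardPartitioning is true when the edge between the two operators
  // uses forward partitioning (no shuffle).
  def canChain(up: Operator, down: Operator,
               chainingEnabled: Boolean, forwardPartitioning: Boolean): Boolean =
    chainingEnabled &&
      up.parallelism == down.parallelism &&
      down.inDegree == 1 &&
      up.slotSharingGroup == down.slotSharingGroup &&
      down.strategy == Always &&
      (up.strategy == Always || up.strategy == Head) &&
      forwardPartitioning
}
```

With a source (`Head`) feeding a map (`Always`) at equal parallelism, the check passes; changing the parallelism of either side makes it fail.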
1.3 Ways to break or disable an OperatorChain
1.3.1 Call startNewChain() on a DataStream after an operator to start a new chain from that operator
1.3.2 Call disableChaining() on a DataStream after an operator to turn chaining off for that operator
1.3.3 StreamExecutionEnvironment.getExecutionEnvironment.disableOperatorChaining() turns chaining off globally
1.3.4 DataStream.slotSharingGroup("name") sets a new slot sharing group name
1.3.5 Change the parallelism
2. Slot
2.1 Relationship between slots and parallelism
By default, the number of task slots a job needs equals the maximum parallelism of the tasks in the job.
2.2 Shared slots
2.2.1 The number of task slots the Flink cluster needs is exactly the maximum parallelism used in the job (provided the default SlotSharingGroup is kept). In other words, there is no need to work out the total number of tasks a program generates.
2.2.2 Setting slotSharingGroup appropriately can reduce the number of threads running in each slot, which reduces the overall load on the machine.
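As a back-of-the-envelope check of the slot rule, the sketch below sums the maximum parallelism of each slot sharing group. It is plain Scala arithmetic rather than a Flink API; `Op` and `requiredSlots` are hypothetical names.

```scala
// Model of the slot calculation; names are hypothetical.
object SlotMath {
  // One operator of the job: its slot sharing group and its parallelism.
  final case class Op(group: String, parallelism: Int)

  // Slots needed = sum over sharing groups of the maximum parallelism in that group.
  // With only the default group this is simply the job's maximum parallelism.
  def requiredSlots(ops: Seq[Op]): Int =
    ops.groupBy(_.group).values.map(_.map(_.parallelism).max).sum
}
```

Three operators with parallelism 2, 4, and 1 in the default group need 4 slots; moving an operator with parallelism 3 into its own group raises the total to 7.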
3. Accumulators and counters
3.1 Counter is the simplest accumulator.
3.2 The built-in accumulators include IntCounter, LongCounter, and DoubleCounter.
3.3 Histogram.
4. Controlling latency
By default, elements in a stream are not sent across the network one by one (which would cause unnecessary network traffic); they are buffered. The buffer timeout can be set in the Flink configuration file, on the ExecutionEnvironment, or on an individual operator (default 100 ms).
Advantage of buffering: higher throughput.
Disadvantage: higher latency.
How to balance the two:
(1) For maximum throughput, call setBufferTimeout(-1). This removes the timeout, so buffered data is sent only when the buffer is full. This is not recommended: if the records that would fill the buffer take four or five hours to arrive, everything already buffered waits that long before being processed, and latency becomes very high.
(2) For minimum latency, set the timeout to a value close to 0 (for example, 5 or 10 ms).
(3) Do not set the buffer timeout to 0, because that causes some performance loss.
5. min, minBy, max, maxBy
The difference between min and minBy is that min returns the minimum value of the specified field, while minBy returns the element that holds the minimum value in that field. The same applies to max and maxBy.
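The difference is easiest to see on records with more than one field. The sketch below models the two rolling aggregations in plain Scala. It is not the Flink API, and the assumption that `min` leaves the non-aggregated fields as they were in the first element is a modelling choice; `Reading`, `rollingMin`, and `rollingMinBy` are hypothetical names.

```scala
// Model of rolling min vs. minBy over one key; requires a non-empty input.
object MinVsMinBy {
  final case class Reading(sensor: String, temp: Int, ts: Long)

  // min: only the aggregated field is updated; the other fields keep the
  // values of the first element seen (modelled behaviour, an assumption).
  def rollingMin(in: Seq[Reading]): Seq[Reading] =
    in.tail.scanLeft(in.head)((acc, r) => acc.copy(temp = math.min(acc.temp, r.temp)))

  // minBy: the whole element holding the minimum is kept (first one wins on ties).
  def rollingMinBy(in: Seq[Reading]): Seq[Reading] =
    in.tail.scanLeft(in.head)((acc, r) => if (r.temp < acc.temp) r else acc)
}
```

For readings with temperatures 30, 25, 28 at timestamps 1, 2, 3, the last `rollingMin` result carries temperature 25 with timestamp 1, while the last `rollingMinBy` result is the original record with timestamp 2.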
6. Interval join
Within a given time interval, an interval join joins two KeyedStreams on the specified key and pairs up the events that satisfy the join condition. What happens to each pair is defined by the user.
Use case: combining related grouped data from a certain time range into one wide table.
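The join condition can be sketched over two in-memory lists. This is a plain Scala model, not the Flink operator; it assumes both bounds are inclusive, and `Event` and `intervalJoin` are hypothetical names.

```scala
// Model of an interval join over in-memory data; names are hypothetical.
object IntervalJoinModel {
  final case class Event[A](key: String, ts: Long, value: A)

  // Emits (l, r) for equal keys when r.ts lies in [l.ts + lower, l.ts + upper].
  def intervalJoin[A, B](left: Seq[Event[A]], right: Seq[Event[B]],
                         lower: Long, upper: Long): Seq[(A, B)] =
    for {
      l <- left
      r <- right
      if l.key == r.key && r.ts >= l.ts + lower && r.ts <= l.ts + upper
    } yield (l.value, r.value)
}
```

With bounds of -2 and +5, an order at time 100 pairs with a payment at time 103 for the same key, but not with one at time 120.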
7. connect and union
connect produces ConnectedStreams. Different processing logic is applied to the data of each of the two streams, and state (a counter, for example) can be shared between them. This is useful when the input of the first stream affects how the second stream is processed.
union merges several streams. The new stream contains the data of all of them; union is DataStream -> DataStream.
connect can join only two streams, while union can join more than two.
The two streams joined by connect may have different types, while the streams joined by union must all have the same type.
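The type difference between the two operations can be illustrated with ordinary collections. The following is a plain Scala sketch, not the Flink API; `union` and `connectAndCount` are hypothetical helpers, and processing the first input before the second is a simplification (real streams interleave by arrival).

```scala
// Model of union vs. connect on in-memory sequences; names are hypothetical.
object ConnectVsUnion {
  // union: every input has the same element type; the result is one merged sequence.
  def union[A](streams: Seq[A]*): Seq[A] = streams.flatten

  // connect: exactly two inputs whose types may differ. Each side has its own
  // handler, and both handlers see the same shared state (here, a counter).
  def connectAndCount[A, B, R](first: Seq[A], second: Seq[B])(
      onFirst: (A, Int) => R, onSecond: (B, Int) => R): Seq[R] = {
    var seen = 0 // state shared by both handlers
    val tagged: Seq[Either[A, B]] =
      first.map(a => Left(a): Either[A, B]) ++ second.map(b => Right(b): Either[A, B])
    tagged.map {
      case Left(a)  => seen += 1; onFirst(a, seen)
      case Right(b) => seen += 1; onSecond(b, seen)
    }
  }
}
```

Here `union` accepts any number of same-typed inputs, while `connectAndCount` takes one `Seq[A]` and one `Seq[B]` and threads a single counter through both handlers.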
8. How data is transferred between operators
(1) One-to-one streams preserve the partitioning and order of the elements.
(2) Redistributing streams repartition the data. The repartitioning strategy depends on the operator used: keyBy, broadcast, rebalance.
dataStream.shuffle() distributes elements randomly according to a uniform distribution; the network overhead is often large.
dataStream.rebalance() partitions elements round-robin, creating an equal load for each partition. It is very useful for dealing with data skew.
dataStream.rescale() is similar to rebalance but not global. Elements are sent round-robin from a subset of the upstream tasks to a subset of the downstream tasks.
dataStream.broadcast() broadcasts every element to every partition.
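To make the difference between the global and the local round-robin concrete, the sketch below computes which downstream subtasks an upstream subtask can reach. It is a plain Scala model, not Flink code. It assumes one parallelism is a multiple of the other and that round-robin starts at channel 0, and the exact subset chosen by `rescaleTargets` is illustrative only.

```scala
// Model of rebalance vs. rescale target selection; names are hypothetical.
object PartitionModel {
  // rebalance: the n-th element (0-based) cycles over ALL downstream subtasks.
  def rebalanceTarget(elementIndex: Long, downstream: Int): Int =
    (elementIndex % downstream).toInt

  // rescale: each upstream subtask only reaches a fixed subset of downstream subtasks.
  def rescaleTargets(upIndex: Int, upstream: Int, downstream: Int): Seq[Int] =
    if (downstream >= upstream) {
      val fanOut = downstream / upstream
      (upIndex * fanOut) until (upIndex * fanOut + fanOut)
    } else {
      Seq(upIndex / (upstream / downstream))
    }
}
```

With 2 upstream and 4 downstream subtasks, upstream subtask 0 reaches downstream 0 and 1 and upstream subtask 1 reaches 2 and 3, whereas rebalance lets every upstream subtask reach all four.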
9. Comparison of Flink's three notions of time
9.1 EventTime
9.1.1 The time at which the event was produced. It exists before the event enters Flink and can be extracted from a field of the event.
9.1.2 You must specify how watermarks are generated.
9.1.3 Advantages: deterministic; out-of-order data, late data, and replayed data all give correct results.
9.1.4 Weakness: performance and latency suffer when handling out-of-order events.
9.2 IngestionTime
9.2.1 The time at which the event enters Flink, that is, the current system time read in the source. All later operations use this time.
9.2.2 No need to specify how watermarks are generated (they are generated automatically).
9.2.3 Weakness: cannot handle out-of-order events or late data.
9.3 ProcessingTime
9.3.1 The current system time of the machine performing the operation (it differs per operator).
9.3.2 Requires no coordination between streams and machines.
9.3.3 Advantages: best performance and lowest latency.
9.3.4 Weakness: non-deterministic; easily affected by many factors (the rate at which events are produced, the speed at which they reach Flink, the transfer speed between operators); takes no account of ordering or lateness.
9.4 Comparison
Performance: ProcessingTime > IngestionTime > EventTime
Latency: ProcessingTime < IngestionTime < EventTime
Determinism: EventTime > IngestionTime > ProcessingTime
If no time type is set, ProcessingTime is the default. Set the time type with the env.setStreamTimeCharacteristic() method.
10. Watermark
10.1 Description
10.1.1 Watermarks are normally generated in the source function, but they can also be generated at any stage after the source. If they are specified more than once, the later specification overrides the earlier one. Each subtask of a source generates its watermarks independently.
10.1.2 When a watermark passes through an operator, it advances that operator's event time and a new watermark is generated for downstream.
10.1.3 For a multi-input operator (union, keyBy, partition), the current event time is the minimum of the event times of its input streams.
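The minimum rule in 10.1.3 can be sketched as a small clock that tracks one watermark per input. This is a plain Scala model rather than Flink's implementation; `MultiInputClock` and `onWatermark` are hypothetical names.

```scala
// Model of watermark merging on a multi-input operator; names are hypothetical.
object WatermarkMerge {
  // Keeps the latest watermark per input channel. The operator's event-time
  // clock is the minimum over all channels and never moves backwards.
  final class MultiInputClock(inputs: Int) {
    private val perInput = Array.fill(inputs)(Long.MinValue)
    private var current = Long.MinValue

    // Returns Some(newWatermark) when the combined watermark advances, else None.
    def onWatermark(input: Int, watermark: Long): Option[Long] = {
      perInput(input) = math.max(perInput(input), watermark)
      val combined = perInput.min
      if (combined > current) { current = combined; Some(combined) } else None
    }
  }
}
```

With two inputs, a watermark of 10 on input 0 alone does not advance the clock; once input 1 reports 7, the operator's event time becomes 7, the smaller of the two.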
10.2 Two kinds of watermark
10.2.1 Periodic watermark
(1) Time-based
(2) ExecutionConfig.setAutoWatermarkInterval(msec) sets the interval at which watermarks are generated (default 200 ms)
(3) Implement the AssignerWithPeriodicWatermarks interface
10.2.2 Punctuated watermark
(1) Watermarks are generated and emitted in response to certain events (implemented in user code, for example for special cases)
(2) Implement the AssignerWithPunctuatedWatermarks interface
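The two styles differ in what triggers a watermark. The sketch below models both in plain Scala without implementing the Flink interfaces. The bounded out-of-orderness rule used for the periodic case is an assumption (the text above does not prescribe one), and `BoundedPeriodic`, `Event`, and `punctuated` are hypothetical names.

```scala
// Model of periodic vs. punctuated watermark generation; names are hypothetical.
object WatermarkGenerators {
  // Periodic style: remember the largest timestamp seen. A timer (not modelled
  // here) would read currentWatermark at a fixed interval.
  // Assumes maxOutOfOrderness >= 0.
  final class BoundedPeriodic(maxOutOfOrderness: Long) {
    private var maxTs = Long.MinValue + maxOutOfOrderness // avoids underflow below
    def onEvent(ts: Long): Unit = { maxTs = math.max(maxTs, ts) }
    def currentWatermark: Long = maxTs - maxOutOfOrderness
  }

  // Punctuated style: a watermark is produced only by events carrying a marker.
  final case class Event(ts: Long, isMarker: Boolean)
  def punctuated(events: Seq[Event]): Seq[Long] =
    events.collect { case Event(ts, true) => ts }
}
```

In the periodic model the watermark trails the highest timestamp by a fixed bound regardless of arrival order; in the punctuated model nothing is emitted until a marker event shows up.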
11. Handling late data
Method 1: allowedLateness() sets the maximum allowed lateness. Because the window is kept around for that much longer, the value should not be set too large.
Method 2: sideOutputTag provides a way to collect late data so that it is not discarded and can be processed separately.
Side outputs are also a general way of splitting a stream: for example, one stream can be split into several streams that sink to different targets.
12. Window
12.1 Window types
12.1.1 Keyed Windows (parallel, multi-task windows built on a stream that has already been grouped with keyBy, i.e. a KeyedStream): stream.keyBy().window()
12.1.2 Non-Keyed Windows (a single-task window built on an ungrouped DataStream; the parallelism is 1 and the API carries the All suffix): stream.windowAll()
12.2 Life cycle of a window
Created: the window is created when the first element belonging to it arrives.
Removed: the window is removed when the time (event time or processing time) passes the window's end timestamp plus the user-specified allowed lateness (allowedLateness).
13. Triggers and evictors
13.1 Triggers
A trigger determines when a window is ready to be processed by the window function (it fires when its condition is met). Every WindowAssigner has a default trigger; if the default does not meet your needs, specify another one with trigger().
A trigger has five methods that let it react to different events:
onElement() is called for every element added to the window.
onEventTime() is called when a registered event-time timer fires.
onProcessingTime() is called when a registered processing-time timer fires.
onMerge() is relevant to stateful triggers; it merges the state of two triggers when their windows are merged, for example when session windows are used.
clear() is called when the corresponding window is removed.
13.2 Evictors
An evictor is optional; by default a WindowAssigner has no evictor. An evictor removes unwanted elements from the window after the trigger fires, before and/or after the window function is applied, much like a filter. evictBefore() is applied before the window function and evictAfter() after it.
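The order in which trigger, evictor, and window function act can be sketched as a small pipeline. The code below is a plain Scala model, not the Flink Trigger or Evictor interfaces. It assumes a count-based trigger and a count-based evictor, and all function names are hypothetical.

```scala
// Model of trigger -> evictor -> window function; names are hypothetical.
object WindowPipelineModel {
  // Count-style trigger: fire once the window holds at least `threshold` elements.
  def shouldFire(windowContents: Seq[Int], threshold: Int): Boolean =
    windowContents.size >= threshold

  // Count-style evictor run before the window function: keep the last `keep` elements.
  def evictBefore(windowContents: Seq[Int], keep: Int): Seq[Int] =
    windowContents.takeRight(keep)

  // The window function (here a sum) only sees what the evictor left behind.
  def process(windowContents: Seq[Int], threshold: Int, keep: Int): Option[Int] =
    if (shouldFire(windowContents, threshold)) Some(evictBefore(windowContents, keep).sum)
    else None
}
```

With a threshold of 5 and `keep = 3`, a window holding 1 to 5 fires and the sum is taken over 3, 4, 5 only; a window with two elements does not fire at all.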
14. Allowed lateness
14.1 When working with event-time windows, elements may arrive late: the watermark Flink uses to track event-time progress has already passed the end of the window an element belongs to, and yet data for that window still arrives.
14.2 By default, late elements are discarded once the watermark has passed the end of the window.
14.3 Flink lets you specify a maximum allowed lateness for window operators. The default allowed lateness is 0.
14.4 An element that arrives after the watermark has passed the end of the window, but before the watermark passes the end of the window plus the allowed lateness, is still added to the window and included in the computation.
Special case: when GlobalWindows (the global window) is used, no data is ever considered late, because the end timestamp of the window is Long.MAX_VALUE.
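The three outcomes for an element (on time, late but still accepted, too late) can be written as a small classification function. This is a plain Scala model of the rule, not Flink's window operator. It assumes the window end is exclusive, so the last timestamp inside the window is `windowEnd - 1`; `Verdict` and `classify` are hypothetical names.

```scala
// Model of the allowed-lateness rule for one window; names are hypothetical.
object LatenessModel {
  sealed trait Verdict
  case object OnTime extends Verdict          // watermark has not reached the window end
  case object LateButAccepted extends Verdict // within allowed lateness, still added
  case object Dropped extends Verdict         // too late: discarded or sent to a side output

  // watermark is the operator's current watermark when the element arrives.
  def classify(windowEnd: Long, watermark: Long, allowedLateness: Long): Verdict = {
    val maxTimestamp = windowEnd - 1 // last timestamp that belongs to the window
    if (watermark < maxTimestamp) OnTime
    else if (watermark < maxTimestamp + allowedLateness) LateButAccepted
    else Dropped
  }
}
```

For a window ending at 1000 with an allowed lateness of 5, an element arriving at watermark 1002 is still accepted, while one arriving at watermark 1004 is not. With the default lateness of 0 the middle case never occurs.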
15. State
Flink state generally refers to the state of a specific task/operator held in memory at a given moment (for example, the value of a field). Note: do not confuse state with checkpointing. A checkpoint is a global state snapshot of a Flink job at a specific moment; it contains the state of every task/operator of the job at that moment.
15.1 Uses of state
15.1.1 Incremental computation
a) Aggregation
b) Machine learning: keeping the current model while a model is trained iteratively
15.1.2 Fault tolerance
a) Restarting a job after a failure
b) Upgrading a Flink program
15.2 Classification of state
15.2.1 Operator State: the state of an ordinary (non-keyed) stream operator
15.2.2 Keyed State: the state of a KeyedStream
15.2.3 Special: Broadcast State (since 1.5)
Data structures supported by Keyed State:
(1) ValueState
(2) ListState
(3) ReducingState
(4) AggregatingState
(5) FoldingState
(6) MapState
Note:
(1) State is not necessarily stored inside the operator; it may reside on disk or elsewhere.
(2) State is accessed through RuntimeContext, so it can only be used in rich functions.
16. Checkpoint: state fault tolerance
Once there is state, state fault tolerance is needed; otherwise the state would be meaningless. Flink's state fault tolerance mechanism is the checkpoint, which is implemented with distributed snapshots. Unless stated otherwise, snapshot, checkpoint, and backup mean the same thing here.
16.1 Features
(1) Asynchronous
(2) Both full and incremental checkpoints can be configured
(3) Barrier mechanism
(4) On failure, the job can roll back to the most recent successful checkpoint
(5) Periodic
16.2 Prerequisites for using checkpoints
(1) A data source that can be replayed within a certain time range, such as Kafka, RabbitMQ, or HDFS
(2) A persistent storage system for state, usually a distributed file system such as HDFS, S3, or NFS
Checkpoint mode: generally choose EXACTLY_ONCE. Choose AT_LEAST_ONCE only when the scenario requires extremely low latency (a few milliseconds).
16.3 Advanced checkpoint options: retention policy
By default, checkpoints are not retained and are used only to recover a job from failure. You can enable externalized (persisted) checkpoints and specify the retention policy at the same time:
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
(1) CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION retains the checkpoint when the job is cancelled. In this case the checkpoint must be cleaned up manually after cancellation.
(2) CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION deletes the checkpoint when the job is cancelled, so the checkpoint is available only if the job fails.
setCheckpointTimeout sets the timeout; a checkpoint that has not completed within it is aborted.
setMinPauseBetweenCheckpoints sets the minimum pause, that is, how long to wait after the previous checkpoint finishes before the next checkpoint may start.
setMaxConcurrentCheckpoints specifies how many checkpoints may be in progress at the same time.
16.4 Step two of using checkpoints: choose a suitable State Backend
16.4.1 By default, state is kept in TaskManager memory.
16.4.2 The checkpoint mechanism persists consistent snapshots of all state; where the snapshots are stored is determined by the StateBackend. Flink currently ships with three State Backends:
(1) MemoryStateBackend (default)
(2) FsStateBackend
(3) RocksDBStateBackend
16.5 MemoryStateBackend
16.5.1 MemoryStateBackend is an internal state backend that maintains state on the Java heap. Key/value state and window operators hold hash tables that store the values and timers.
16.5.2 On checkpoint, MemoryStateBackend takes a snapshot of the state and sends it to the JobManager as part of the checkpoint acknowledgement message. The snapshot is then stored in the JobManager's heap memory.
16.5.3 MemoryStateBackend uses asynchronous snapshots by default. Asynchronous mode is recommended because it avoids blocking. To make snapshots blocking, pass false:
    val memoryStateBackend: StateBackend = new MemoryStateBackend(10 * 1024 * 1024, false)
    env.setStateBackend(memoryStateBackend)
16.5.4 Limits: a single state is limited to 5 MB by default; the limit can be changed in the MemoryStateBackend constructor. Whatever the setting, the state size cannot exceed akka.framesize (the maximum size of a message sent between JobManager and TaskManager, 10 MB by default). The JobManager must have enough memory.
16.5.5 Use cases: local development and testing; jobs with little state, such as those using only Map, FlatMap, Filter, or a Kafka Consumer.
16.6 FsStateBackend
16.6.1 FsStateBackend requires a file system URL to be configured, such as hdfs://namenode:8080/flink/checkpoint
16.6.2 FsStateBackend holds in-flight data in the TaskManager's memory. On checkpoint it writes the state snapshot to files in the configured file system directory.
16.6.3 FsStateBackend uses asynchronous snapshots by default. It is constructed as follows:
    val stateBackend: StateBackend = new FsStateBackend("hdfs://namenode:9000/flink/checkpoint", false)
    env.setStateBackend(stateBackend)
16.6.4 Use cases: jobs with large state, long windows, or large key/value state.
16.7 RocksDBStateBackend
16.7.1 RocksDBStateBackend requires a file system URL to be configured, for example hdfs://namenode:8080/flink/checkpoint
16.7.2 RocksDBStateBackend keeps in-flight data in a RocksDB database, which is stored in the TaskManager data directory by default. On checkpoint, the whole RocksDB database is checkpointed to the configured file system and directory.
16.7.3 RocksDBStateBackend is always asynchronous.
16.7.4 The RocksDB JNI API is based on byte[], so a key or a value can be at most 2^31 bytes (2 GB).
16.7.5 Use cases: jobs with very large windows, large state, or large key/value state.
16.7.6 Only RocksDBStateBackend supports incremental checkpoints.
16.7.7 State is kept in the database and is limited only by the amount of available disk space, but the cost is higher (every read and write has to deserialize and serialize), so throughput is limited. Using it requires adding a dependency:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    val stateBackend: StateBackend = new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoint", true)
    env.setStateBackend(stateBackend)
Configuring a restart strategy
Flink supports different restart strategies, which control how a job is restarted after a failure:
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))
(1) If checkpointing is not enabled, no restart strategy is used.
(2) If checkpointing is enabled but no restart strategy is configured, the fixed-delay strategy is used with Integer.MAX_VALUE restart attempts.
© 2024 shulou.com SLNews company. All rights reserved.