2025-03-31 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article explains keyed state management in Flink, with examples. It is shared here as a reference and should give you a working understanding of the topic.
Flink has two basic kinds of state: keyed state and operator state.
1 Keyed State
Keyed State is always relative to keys and can only be used in functions and operators on a KeyedStream.
You can think of Keyed State as Operator State that has been partitioned, or sharded, with exactly one state partition per key. Each keyed state is logically bound to a unique composite of <parallel-operator-instance, key>, and since each key "belongs" to exactly one parallel instance of a keyed operator, we can think of this simply as <operator, key>.
Keyed State is further organized into so-called Key Groups. Key Groups are the atomic unit by which Flink can redistribute Keyed State. There are exactly as many Key Groups as the defined maximum parallelism. During execution, each parallel instance of a keyed operator works with the keys of one or more Key Groups.
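The relationship between keys, Key Groups and parallel instances can be sketched in a few lines of plain Java. This is a toy model, not Flink code: the class and method names are made up for illustration, and the hash is a simple stand-in for the hash function the runtime uses. The point it shows is that a key always lands in the same Key Group, while the instance that owns a Key Group changes with the parallelism.

```java
/** Toy model of key-group assignment. Names and the hash are illustrative assumptions. */
final class KeyGroupSketch {

    /** Maps a key to a key group in [0, maxParallelism). */
    static int keyGroupFor(Object key, int maxParallelism) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the parallel instance that owns it (contiguous ranges). */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 8; // number of key groups, fixed for the job
        for (int parallelism : new int[] {2, 4}) {
            System.out.println("parallelism = " + parallelism);
            for (int kg = 0; kg < maxParallelism; kg++) {
                System.out.println("  key group " + kg + " -> instance "
                        + operatorIndexFor(kg, maxParallelism, parallelism));
            }
        }
    }
}
```

With 8 key groups, going from parallelism 2 to 4 moves whole groups between instances. No key ever changes its group, which is why the group is the unit of redistribution.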
2 Raw and Managed State
Keyed State and Operator State each exist in two forms: managed and raw.
Managed State is kept in data structures controlled by the Flink runtime, such as internal hash tables or RocksDB. Examples are ValueState, ListState, and so on. The Flink runtime encodes the state and writes it into checkpoints.
Raw State is state that operators keep in their own data structures. When a checkpoint is taken, they only write a sequence of bytes into it. Flink knows nothing about the state's data structures and sees only the raw bytes.
All datastream functions can use managed state, but the raw state interface can only be used when implementing operators. Managed state is recommended over raw state because, with managed state, Flink can redistribute the state automatically when the parallelism changes and can also do better memory management.
Note that if your managed state needs custom serialization logic, you must implement it in a way that ensures future compatibility. Flink's default serializers need no special treatment.
3 Managed Keyed State
The managed keyed state interface provides access to different types of state that are all scoped to the key of the current input element. This means that this type of state can only be used on a KeyedStream, which is created via stream.keyBy(...).
Let's first look at the different types of state available, and then see how they are used in a program. The available state primitives are:
ValueState<T>: keeps a value that can be updated and retrieved. It is scoped to the key of the input element, as mentioned above, so there may be one value for each key that the operator sees. The value is set with update(T) and read with T value().
ListState<T>: keeps a list of elements. You can append elements and retrieve an Iterable over all currently stored elements. Elements are added with add(T) or addAll(List<T>), and the Iterable is retrieved with Iterable<T> get(). You can also overwrite the existing list with update(List<T>).
ReducingState<T>: keeps a single value that represents the aggregation of all values added to the state. The interface is similar to ListState, but elements added with add(T) are reduced to an aggregate using the specified ReduceFunction.
AggregatingState<IN, OUT>: keeps a single value that represents the aggregation of all values added to the state. Unlike ReducingState, the aggregate type may differ from the type of the elements added to the state. The interface is similar to ListState, but elements added with add(IN) are aggregated using the specified AggregateFunction.
FoldingState<T, ACC>: keeps a single value that represents the aggregation of all values added to the state. Unlike ReducingState, the aggregate type may differ from the type of the elements added to the state. The interface is similar to ListState, but elements added with add(T) are folded into an aggregate using the specified FoldFunction.
MapState<UK, UV>: keeps a list of mappings. You can put key-value pairs into the state and retrieve an Iterable over all currently stored mappings. Mappings are added with put(UK, UV) or putAll(Map<UK, UV>). The value associated with a user key is retrieved with get(UK). Iterable views of the mappings, keys and values are retrieved with entries(), keys() and values() respectively.
All types of state also have a clear() method that clears the state for the currently active key, that is, the key of the input element.
Note that FoldingState and FoldingStateDescriptor were deprecated in Flink 1.4 and may be removed completely in the future. Use AggregatingState and AggregatingStateDescriptor instead.
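The difference between ReducingState and AggregatingState is easiest to see side by side. The sketch below is a plain-Java model for a single key, not the Flink classes: ReducingModel and AverageModel are hypothetical names, and returning null for an empty state is an assumption of this model. The first keeps a result of the same type as its inputs, the second takes Long inputs and exposes a Double average backed by a (sum, count) accumulator.

```java
import java.util.function.BinaryOperator;

/** Toy models of two aggregating state types for one key. Not Flink classes. */
final class AggregatingStateSketch {

    /** Like ReducingState<T>: input type and result type are the same. */
    static final class ReducingModel<T> {
        private final BinaryOperator<T> reduce;
        private T value; // null means the state is empty

        ReducingModel(BinaryOperator<T> reduce) {
            this.reduce = reduce;
        }

        void add(T element) {
            value = (value == null) ? element : reduce.apply(value, element);
        }

        T get() {
            return value;
        }

        void clear() {
            value = null;
        }
    }

    /** Like AggregatingState<Long, Double>: the accumulator (sum, count) differs from both. */
    static final class AverageModel {
        private long sum;
        private long count;

        void add(long element) {
            sum += element;
            count++;
        }

        Double get() {
            return count == 0 ? null : (double) sum / count;
        }

        void clear() {
            sum = 0;
            count = 0;
        }
    }

    public static void main(String[] args) {
        ReducingModel<Long> max = new ReducingModel<>(Math::max);
        AverageModel avg = new AverageModel();
        for (long v : new long[] {3, 5, 7}) {
            max.add(v);
            avg.add(v);
        }
        System.out.println(max.get()); // 7
        System.out.println(avg.get()); // 5.0
    }
}
```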
Two things are important to keep in mind. First, these state objects are only handles for interacting with state. The state itself is not necessarily held in memory; it may reside on disk or elsewhere. Second, the value you get from a state handle depends on the key of the input element. So the value returned in one invocation of your user function can differ from the value returned in another invocation if the keys involved are different.
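The second point can be made concrete with a small model. The class below is a hypothetical plain-Java stand-in, not how Flink stores state: a single handle is backed by one slot per key, and the "current key" decides which slot value() and update() touch. In Flink the runtime sets the current key before each element is processed; here it is set by hand.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a keyed value state: one handle, one value slot per key. */
final class KeyedValueStateSketch<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey;

    /** Stand-in for the runtime switching to the key of the current input element. */
    void setCurrentKey(K key) {
        currentKey = key;
    }

    V value() {
        return valuesByKey.get(currentKey);
    }

    void update(V value) {
        valuesByKey.put(currentKey, value);
    }

    void clear() {
        valuesByKey.remove(currentKey);
    }

    public static void main(String[] args) {
        KeyedValueStateSketch<String, Long> count = new KeyedValueStateSketch<>();
        // Count elements per key using the same handle for every element.
        for (String key : new String[] {"a", "b", "a"}) {
            count.setCurrentKey(key);
            Long old = count.value();
            count.update(old == null ? 1L : old + 1);
        }
        count.setCurrentKey("a");
        System.out.println(count.value()); // 2
        count.setCurrentKey("b");
        System.out.println(count.value()); // 1
    }
}
```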
To get a state handle, you must create a StateDescriptor. It holds the name of the state (as you will see later, you can create several states, and they need unique names so that you can reference them), the type of the values the state holds, and possibly a user-specified function such as a ReduceFunction. Depending on the type of state you want to retrieve, you create a ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, AggregatingStateDescriptor, FoldingStateDescriptor or MapStateDescriptor.
State is accessed through the RuntimeContext, so it is only available in rich functions. An example follows shortly. The RuntimeContext available in a RichFunction has these methods for accessing state:
    ValueState<T> getState(ValueStateDescriptor<T>)
    ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
    ListState<T> getListState(ListStateDescriptor<T>)
    AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
    FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
    MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

Here is an example FlatMapFunction that shows how the parts fit together:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class CountWindowAverage
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        /** The ValueState handle. The first field is the count, the second field a running sum. */
        private transient ValueState<Tuple2<Long, Long>> sum;

        @Override
        public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out)
                throws Exception {
            // access the state value
            Tuple2<Long, Long> currentSum = sum.value();

            // update the count
            currentSum.f0 += 1;

            // add the second field of the input value
            currentSum.f1 += input.f1;

            // update the state
            sum.update(currentSum);

            // if the count reaches 2, emit the average and clear the state
            if (currentSum.f0 >= 2) {
                out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
                sum.clear();
            }
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                    new ValueStateDescriptor<>(
                            "average", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                            Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
            sum = getRuntimeContext().getState(descriptor);
        }
    }

    // this can be used in a streaming program like this
    // (assuming we have a StreamExecutionEnvironment env)
    env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L),
                    Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
            .keyBy(0)
            .flatMap(new CountWindowAverage())
            .print();

    // the printed output will be (1,4) and (1,5)

4 State Time-To-Live (TTL)
A time-to-live (TTL) can be assigned to keyed state of any type. If a TTL is configured and a state value has expired, the stored value is cleaned up at an appropriate time, as discussed later.
All state collection types support per-entry TTL. This means that list elements and map entries expire independently.
To use TTL, build a StateTtlConfig and enable it on a state descriptor, as in the following code:
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.seconds(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

    ValueStateDescriptor<String> stateDescriptor =
            new ValueStateDescriptor<>("text state", String.class);
    stateDescriptor.enableTimeToLive(ttlConfig);
The argument to newBuilder is mandatory: it is the time-to-live value.
The update type configures when the TTL timestamp of a state value is refreshed. There are two options:
StateTtlConfig.UpdateType.OnCreateAndWrite: only on creation and write access (the default)
StateTtlConfig.UpdateType.OnReadAndWrite: also on read access
The state visibility configures whether an expired value can still be read before it has been cleaned up. There are two options:
StateTtlConfig.StateVisibility.NeverReturnExpired: an expired value is never returned (the default)
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp: an expired value is returned if it is still available
With NeverReturnExpired, expired state behaves as if it no longer exists, even if it has not yet been removed. This option is useful where data must become unavailable for reads strictly after the TTL, for example in applications that handle privacy-sensitive data.
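How the two settings interact is easier to follow with a deterministic example. The class below is a toy model in plain Java, not Flink's implementation: the enum constants mirror the StateTtlConfig names above, while TtlValueSketch, its fields and the injected clock are assumptions made for illustration. It also assumes that an expired value is dropped when it is read.

```java
import java.util.function.LongSupplier;

/** Toy model of a single TTL-guarded value. Not Flink's implementation. */
final class TtlValueSketch<V> {
    enum UpdateType { OnCreateAndWrite, OnReadAndWrite }
    enum Visibility { NeverReturnExpired, ReturnExpiredIfNotCleanedUp }

    private final long ttlMillis;
    private final UpdateType updateType;
    private final Visibility visibility;
    private final LongSupplier clock; // injected so the example is deterministic
    private V value;
    private long lastAccess;

    TtlValueSketch(long ttlMillis, UpdateType updateType, Visibility visibility, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.updateType = updateType;
        this.visibility = visibility;
        this.clock = clock;
    }

    void update(V newValue) {
        value = newValue;
        lastAccess = clock.getAsLong(); // every write restarts the timer
    }

    V value() {
        if (value == null) {
            return null;
        }
        long now = clock.getAsLong();
        if (now - lastAccess >= ttlMillis) {
            V stale = value;
            value = null; // the expired value is dropped on this read
            return visibility == Visibility.ReturnExpiredIfNotCleanedUp ? stale : null;
        }
        if (updateType == UpdateType.OnReadAndWrite) {
            lastAccess = now; // reads restart the timer as well
        }
        return value;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        TtlValueSketch<String> strict = new TtlValueSketch<>(
                1000, UpdateType.OnCreateAndWrite, Visibility.NeverReturnExpired, () -> now[0]);
        TtlValueSketch<String> refreshed = new TtlValueSketch<>(
                1000, UpdateType.OnReadAndWrite, Visibility.NeverReturnExpired, () -> now[0]);
        strict.update("x");
        refreshed.update("x");

        now[0] = 500;
        strict.value();    // this read does not extend the lifetime
        refreshed.value(); // this read restarts the timer at t=500

        now[0] = 1200;
        System.out.println(strict.value());    // null: 1200 ms since the write
        System.out.println(refreshed.value()); // x: only 700 ms since the last read
    }
}
```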
Notes:
The state backend stores a last-modification timestamp along with each user value, which increases the storage cost. The heap state backend stores an additional Java object, with a reference to the user state object, and a primitive long value in memory. The RocksDB state backend adds 8 bytes per stored value, list entry or map entry.
Currently, TTL only supports processing time.
Restoring state from a savepoint that was written without TTL, using a descriptor that has TTL enabled, fails with an exception.
Map state with TTL supports null user values only if the user value serializer can handle null values. If the serializer does not support null values, it can be wrapped with NullableSerializer, at the cost of additional storage overhead.
5 Cleanup of Expired State
Currently, an expired value is removed only when it is read explicitly, for example by calling ValueState.value().
Note: this means that expired state that is never read is never removed, so the state may keep growing. This may change in future releases.
In addition, you can enable cleanup at the moment a full state snapshot is taken, which reduces the snapshot size. Under the current implementation the local state is not cleaned up, but a restore from that snapshot will not include the expired state that was removed. It is configured as follows:
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;

    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.seconds(1))
            .cleanupFullSnapshot()
            .build();
This option does not apply to incremental checkpointing, so it has no effect when the RocksDB state backend is used with incremental checkpoints.
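The two cleanup paths described in this section behave differently with respect to local state. The sketch below models that in plain Java under stated assumptions: TtlCleanupSketch and all of its methods are hypothetical names, time is passed in explicitly, and a "full snapshot" is just a filtered copy of the map. It is meant to show the behavior, not how the state backends implement it.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of cleanup on read versus cleanup in a full snapshot. Not Flink code. */
final class TtlCleanupSketch {
    private final long ttlMillis;
    // key -> {value, timestamp of the last write}
    private final Map<String, long[]> entries = new HashMap<>();

    TtlCleanupSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void put(String key, long value, long now) {
        entries.put(key, new long[] {value, now});
    }

    /** Cleanup on read: an expired entry is removed only when someone asks for it. */
    Long get(String key, long now) {
        long[] entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (now - entry[1] >= ttlMillis) {
            entries.remove(key);
            return null;
        }
        return entry[0];
    }

    /** Cleanup in a full snapshot: expired entries are left out, local state is untouched. */
    Map<String, Long> fullSnapshot(long now) {
        Map<String, Long> snapshot = new HashMap<>();
        for (Map.Entry<String, long[]> e : entries.entrySet()) {
            if (now - e.getValue()[1] < ttlMillis) {
                snapshot.put(e.getKey(), e.getValue()[0]);
            }
        }
        return snapshot;
    }

    int localSize() {
        return entries.size();
    }

    public static void main(String[] args) {
        TtlCleanupSketch state = new TtlCleanupSketch(1000);
        state.put("a", 1, 0);
        state.put("b", 2, 0);
        state.put("c", 3, 900);

        long now = 1500; // "a" and "b" are expired, "c" is still live
        System.out.println(state.localSize());              // 3: nothing was read yet
        System.out.println(state.fullSnapshot(now).size()); // 1: only "c" is written
        System.out.println(state.localSize());              // 3: the snapshot did not clean locally
        System.out.println(state.get("a", now));            // null, and "a" is removed
        System.out.println(state.localSize());              // 2
    }
}
```

Entry "b" is never read in this run, so it stays in local state indefinitely, which is the growth problem noted above.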