Source: Shulou (Shulou.com), SLTechnology News&Howtos. Updated 2025-03-04.
This article explains the Transformation operations that Flink provides for processing DataStream data. The methods introduced here are simple, fast, and practical.
Transformation data processing
The process of generating a new DataStream from one or more DataStreams is called a Transformation. Each type of operation is defined as a different Operator, and Flink can combine multiple Transformations into a single DataFlow topology.
DataStream transformations fall into three categories: SingleDataStream, MultiDataStream, and physical partitioning.
SingleDataStream: processing logic for a single DataStream.
MultiDataStream: processing logic for multiple DataStream.
Physical partitioning: processing logic for adjusting and transforming parallelism and data partitions in a dataset.
SingleDataStream
Map
Map is commonly used to clean and transform the data in a dataset. For example, the following adds 1 to the second field of each input record and sends the result downstream.
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 4), ("c", 5), ("a", 5))
// Method 1: pass a function literal
val mapStream: DataStream[(String, Int)] = dataStream.map(t => (t._1, t._2 + 1))
// Method 2: implement the MapFunction interface
val mapStream2: DataStream[(String, Int)] = dataStream.map(new MapFunction[(String, Int), (String, Int)] {
  override def map(t: (String, Int)): (String, Int) = (t._1, t._2 + 1)
})

FlatMap
FlatMap handles cases where one input element produces multiple output elements. In WordCount, for example, each line of text is split into a sequence of words.
// lines: a Seq[String] of input text; its contents are not given here
val dataStream: DataStream[String] = env.fromCollection(lines)
val resultStream: DataStream[String] = dataStream.flatMap { str => str.split(" ") }

Filter
Filter the input dataset according to the criteria and output the data that meets the criteria.
// Placeholder syntax
val filter: DataStream[Int] = dataStream.filter { _ % 2 == 0 }
// Explicit lambda expression
val filter2: DataStream[Int] = dataStream.filter { x => x % 2 == 0 }

KeyBy
KeyBy partitions the input dataset by the specified key, so that records with the same key are placed in the same partition.
The following puts records whose first field (index 0) is equal into the same partition.
import org.apache.flink.api.java.tuple.Tuple

val dataStream = env.fromElements((1, 5), (2, 2), (2, 4), (1, 3))
// Use the first field as the partition key
val keyedStream: KeyedStream[(Int, Int), Tuple] = dataStream.keyBy(0)

Reduce
Reduce follows essentially the same principle as reduce in MapReduce: the records of the input KeyedStream are fed one by one into a user-defined ReduceFunction, which aggregates them in a rolling fashion. The ReduceFunction must be associative and commutative.
import org.apache.flink.api.common.functions.ReduceFunction

val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
// Use the first field as the partition key
val keyedStream: KeyedStream[(String, Int), Tuple] = dataStream.keyBy(0)
// Implementation 1: rolling sum of the second field
val reduceStream = keyedStream.reduce { (t1, t2) => (t1._1, t1._2 + t2._2) }
// Implementation 2: implement the ReduceFunction interface
val reduceStream1 = keyedStream.reduce(new ReduceFunction[(String, Int)] {
  override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) = (t1._1, t1._2 + t2._2)
})
The running result is (c,2) (c,7) (a,3) (d,4) (a,8). The output is not only the final sum for each key: a running sum is emitted after every input record.
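The rolling behaviour can be illustrated without Flink. The following is a minimal plain-Scala sketch, not Flink code: `RollingReduceModel` and `rollingReduce` are hypothetical names, and records are processed sequentially, so the output order follows the input order, whereas a parallel Flink job may interleave keys differently.

```scala
object RollingReduceModel {
  // Plain-Scala model of a keyed rolling reduce: for every input record,
  // emit the running aggregate of that record's key.
  def rollingReduce[K, V](input: Seq[(K, V)])(f: (V, V) => V): Seq[(K, V)] = {
    // Per-key running value, playing the role of the keyed state.
    val state = scala.collection.mutable.Map.empty[K, V]
    input.map { case (key, value) =>
      val updated = state.get(key).map(previous => f(previous, value)).getOrElse(value)
      state(key) = updated
      (key, updated)
    }
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
    rollingReduce(input)(_ + _).foreach(println)
  }
}
```

Each key produces one output per input record, which is why the result contains both (c,2) and (c,7) rather than only the final sum for c.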
Aggregations
The aggregation operators provided by DataStream aggregate on a specified field and emit a rolling series of aggregation results. They are wrappers around the Reduce operator and include sum, min, minBy, max, and maxBy, so users do not need to define their own Reduce functions.
val dataStream = env.fromElements((1, 5), (2, 2), (2, 4), (1, 3))
// Use the first field as the partition key
val keyedStream: KeyedStream[(Int, Int), Tuple] = dataStream.keyBy(0)
// Rolling sum of the second field
val sumStream: DataStream[(Int, Int)] = keyedStream.sum(1)
// Print the results
sumStream.print()
The field passed to an aggregation function must be numeric; otherwise an exception is thrown.
// Minimum of the second field for each key
val minStream: DataStream[(Int, Int)] = keyedStream.min(1)
// Maximum of the second field for each key
val maxStream: DataStream[(Int, Int)] = keyedStream.max(1)
// Minimum of the second field for each key, returning the element that holds the minimum
val minByStream: DataStream[(Int, Int)] = keyedStream.minBy(1)
// Maximum of the second field for each key, returning the element that holds the maximum
val maxByStream: DataStream[(Int, Int)] = keyedStream.maxBy(1)

MultiDataStream
Union
Union merges two or more input datasets into one. The input datasets and the output dataset must have the same type.
// Create the datasets
val dataStream1: DataStream[(String, Int)] = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
val dataStream2: DataStream[(String, Int)] = env.fromElements(("d", 1), ("s", 2), ("a", 4), ("e", 5), ("a", 6))
val dataStream3: DataStream[(String, Int)] = env.fromElements(("a", 2), ("d", 1), ("s", 2), ("c", 3), ("b", 1))
// Merge two datasets
val unionStream = dataStream1.union(dataStream2)
// Merge multiple datasets
val allUnionStream = dataStream1.union(dataStream2, dataStream3)

Connect, CoMap, CoFlatMap
Connect merges two datasets, which may have different types, and keeps the original data type of each after the merge. A connected stream also lets the two inputs share state, so the processing logic for one dataset can read and update state written by the other.
Example: dataStream1 is of tuple type (String, Int) and dataStream2 is of type Int. Connecting them produces a ConnectedStreams dataset whose elements are of the mixed type [(String, Int), Int], so the data types of both datasets are retained.
val dataStream1: DataStream[(String, Int)] = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
// Int elements (sample values)
val dataStream2: DataStream[Int] = env.fromElements(1, 2, 4, 5, 6)
// Connect the two datasets
val connectedStream: ConnectedStreams[(String, Int), Int] = dataStream1.connect(dataStream2)
Note: a ConnectedStreams dataset does not support operations such as print() directly; it must first be converted to a DataStream.
ConnectedStreams provides map() and flatMap() for this purpose. Each takes a CoMapFunction or CoFlatMapFunction that processes the two input datasets, or two plain functions, one for each dataset.
An example of map () is as follows:
import org.apache.flink.streaming.api.functions.co.CoMapFunction

val resultStream = connectedStream.map(new CoMapFunction[(String, Int), Int, (Int, String)] {
  // Processing logic for the first dataset; the input is an element of the first DataStream
  override def map1(in1: (String, Int)): (Int, String) = (in1._2, in1._1)
  // Processing logic for the second dataset
  override def map2(in2: Int): (Int, String) = (in2, "default")
})
In the example above, map1 and map2 are invoked alternately as elements arrive on the two inputs, and together they produce the target dataset of the declared output type.
flatMap() takes a CoFlatMapFunction. In the following example the two functions share the variable number:
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector

val resultStream2 = connectedStream.flatMap(new CoFlatMapFunction[(String, Int), Int, (String, Int, Int)] {
  // Variable shared by both functions
  var number = 0
  // Processing function for the first dataset
  override def flatMap1(in1: (String, Int), collector: Collector[(String, Int, Int)]): Unit = {
    collector.collect((in1._1, in1._2, number))
  }
  // Processing function for the second dataset
  override def flatMap2(in2: Int, collector: Collector[(String, Int, Int)]): Unit = {
    number = in2
  }
})
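How the shared variable behaves depends on the order in which elements from the two inputs arrive. The following is a minimal plain-Scala sketch of that behaviour, not Flink code: `CoFlatMapModel`, `Event`, and `run` are hypothetical names, and the two inputs are modeled as one already interleaved sequence, which a real job does not guarantee.

```scala
object CoFlatMapModel {
  // Left = element of the first stream, Right = element of the second stream.
  type Event = Either[(String, Int), Int]

  // Sequential model of a CoFlatMapFunction with a shared variable:
  // Right events update the variable, Left events are emitted with its current value.
  def run(events: Seq[Event]): Seq[(String, Int, Int)] = {
    var number = 0
    events.flatMap {
      case Left((name, value)) =>
        Seq((name, value, number))
      case Right(n) =>
        number = n
        Seq.empty[(String, Int, Int)]
    }
  }
}
```

For example, `CoFlatMapModel.run(Seq(Left(("a", 3)), Right(7), Left(("d", 4))))` emits the first record with the initial value 0 and the second with 7, because the update arrived in between.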
To join two datasets on a condition, use the keyBy function or broadcast. keyBy routes records with the same key to the same Operator instance. broadcast copies dataStream2 to every parallel Operator instance before the computation runs, and the datasets are then joined according to the condition. These two approaches are essentially the basic implementations of a distributed join.
// Connect the two datasets by the specified keys with keyBy
val keyedConnect: ConnectedStreams[(String, Int), Int] = dataStream1.connect(dataStream2).keyBy(1, 0)
// Connect the two datasets with broadcast
val broadcastConnect: BroadcastConnectedStream[(String, Int), Int] = dataStream1.connect(dataStream2.broadcast())

Split
Split divides a DataStream dataset into two datasets according to a condition; it is the inverse of union. Example: call the split function with a condition that tags each record by the parity of its second field, "even" for even values and "odd" for odd values. The tags are returned as a collection, and the result is a SplitStream dataset.
// Create a dataset
val dataStream1: DataStream[(String, Int)] = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
// Split the DataStream dataset by tagging each record
val splitedStream: SplitStream[(String, Int)] = dataStream1.split(t => if (t._2 % 2 == 0) Seq("even") else Seq("odd"))

Select
The split function just marks the data and does not split the data, so you need the select function to cut the data into different datasets based on the tag.
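The two-step nature of tagging and then selecting can be sketched in plain Scala. This is only an illustrative model with hypothetical names (`SplitSelectModel`, `split`, `select`), not the Flink implementation: the first step attaches tags to every record, and the second step picks records by tag.

```scala
object SplitSelectModel {
  // Step 1 (split): attach a collection of tags to every record; nothing is separated yet.
  def split[T](data: Seq[T])(tagger: T => Seq[String]): Seq[(T, Seq[String])] =
    data.map(record => (record, tagger(record)))

  // Step 2 (select): keep the records that carry at least one of the requested tags.
  def select[T](tagged: Seq[(T, Seq[String])], names: String*): Seq[T] =
    tagged.collect {
      case (record, tags) if tags.exists(tag => names.contains(tag)) => record
    }

  def main(args: Array[String]): Unit = {
    val data = Seq(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
    val tagged = split(data)(t => if (t._2 % 2 == 0) Seq("even") else Seq("odd"))
    select(tagged, "even").foreach(println)
  }
}
```

Selecting with several tag names returns every record that matches any of them, which mirrors selecting both "even" and "odd" to get the full dataset back.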
// Select the even dataset
val evenStream: DataStream[(String, Int)] = splitedStream.select("even")
// Select the odd dataset
val oddStream: DataStream[(String, Int)] = splitedStream.select("odd")
// Select both the even and the odd datasets
val allStream: DataStream[(String, Int)] = splitedStream.select("even", "odd")

Iterate
Iterate is suited to iterative computation: the result of each iteration is fed back as input to the next iteration.
// Create a dataset (sample values); the map rebalances the data across partitions at the default parallelism
val dataStream = env.fromElements(3, 1, 2, 1, 5).map { t: Int => t }
val iterated = dataStream.iterate((input: ConnectedStreams[Int, String]) => {
  // Define two map functions: the first handles incoming data, the second handles fed-back data
  val head = input.map(i => (i + 1).toString, s => s)
  // The first stream is fed back into the iteration, the second is sent downstream
  (head.filter(_ == "2"), head.filter(_ != "2"))
}, 1000) // terminate the iteration if no data arrives for more than 1000 ms

Physical partitioning
Physical partitioning redistributes data to Task instances on different nodes according to a specified partitioning strategy. The DataStream API itself provides operators for controlling how data is partitioned.
Random partitioning (Random Partitioning)
Random partitioning assigns the records of a dataset randomly to the partitions of the downstream operator. The advantage is that the data is relatively balanced; the disadvantage is that the partition structure of the original data is lost.
val shuffleStream = dataStream.shuffle

Balanced partitioning (Round-robin Partitioning)
Round-robin partitioning redistributes the records of a dataset in a cyclic fashion. It keeps the partitions as balanced as possible and is an effective remedy for data skew.
val shuffleStream = dataStream.rebalance

Rescaling partitioning
Rescaling partitioning also rebalances data in a cyclic fashion. Unlike Round-robin Partitioning, it only rebalances between directly connected upstream and downstream operator instances, and the routing is determined by the parallelism of the upstream and downstream operators. For example, if the upstream operator has parallelism 2 and the downstream operator has parallelism 4, the first upstream partition distributes its records evenly across two fixed downstream partitions, and the other upstream partition does the same with the remaining two.
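The routing in that example can be sketched as a small plain-Scala model. It is a simplification under stated assumptions, not Flink's implementation: `RescaleModel`, `downstreamTargets`, and `route` are hypothetical names, the downstream parallelism is assumed to be an exact multiple of the upstream parallelism, and the choice of contiguous downstream indices per group is made only for illustration.

```scala
object RescaleModel {
  // Each upstream partition owns a fixed group of downstream partitions.
  // Assumption: the group is a contiguous index range of equal size.
  def downstreamTargets(upstream: Int, upstreamParallelism: Int, downstreamParallelism: Int): Seq[Int] = {
    require(upstreamParallelism > 0 && downstreamParallelism % upstreamParallelism == 0,
      "this model assumes the downstream parallelism is a multiple of the upstream parallelism")
    val groupSize = downstreamParallelism / upstreamParallelism
    (upstream * groupSize) until ((upstream + 1) * groupSize)
  }

  // Downstream partition chosen for the n-th record (0-based) of an upstream partition:
  // the upstream partition cycles through its own group only.
  def route(upstream: Int, n: Int, upstreamParallelism: Int, downstreamParallelism: Int): Int = {
    val targets = downstreamTargets(upstream, upstreamParallelism, downstreamParallelism)
    targets(n % targets.size)
  }
}
```

With upstream parallelism 2 and downstream parallelism 4, records never cross between the two groups, which is what keeps this strategy cheaper than a full round-robin redistribution.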
// Apply Rescaling Partitioning by calling the rescale method of the DataStream API
val shuffleStream = dataStream.rescale

Broadcast
Broadcast copies the input dataset to every parallel Task instance of the downstream operator. The downstream Tasks can then read the broadcast dataset directly from local memory and no longer depend on network transfer.
This partitioning strategy is suitable for small datasets. For example, when a large dataset is joined with a small one, the small dataset can be distributed to every partition of the operator by broadcast.
// Apply broadcast partitioning by calling the broadcast method of the DataStream API
val shuffleStream = dataStream.broadcast

Custom partitioning
Implement a custom partitioner and call the partitionCustom() method of the DataStream API to apply it to the dataset.
In the following example, the custom partitioner sends records whose field contains the keyword "flink" to partition 0 and assigns all other records to a random partition. numPartitions is the parallelism parameter supplied by the system.
import org.apache.flink.api.common.functions.Partitioner

object customPartitioner extends Partitioner[String] {
  // Random number generator
  val r = scala.util.Random
  override def partition(key: String, numPartitions: Int): Int = {
    // Partitioning policy: keys containing "flink" go to partition 0;
    // all other keys get a random partition in [0, numPartitions)
    if (key.contains("flink")) 0 else r.nextInt(numPartitions)
  }
}
Once the custom partitioner is defined, apply it with the partitionCustom() method of the DataStream API. The second parameter specifies the field the partitioner uses; it can be given either by field name or by position index.
// Specify the partition field by field name
dataStream.partitionCustom(customPartitioner, "field_name")
// Specify the partition field by position index
dataStream.partitionCustom(customPartitioner, 0)