What are the common operators of Flink stream computing


This article mainly introduces the common operators of Flink stream computing. Many people have questions about them in daily work, so the editor has consulted various materials and put together a simple, practical walkthrough. I hope it helps clear up your doubts about the common operators of Flink stream computing. Now, follow along and let's study it together!

Flink, like Spark, is a one-stop processing framework: it supports both batch processing (DataSet) and real-time stream processing (DataStream).

Accordingly, Flink operators fall into two categories: DataSet operators and DataStream operators.
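As a minimal sketch (using the standard Flink Scala API classes), the two entry points are created like this:

// batch entry point (DataSet API)
import org.apache.flink.api.scala.ExecutionEnvironment
val batchEnv = ExecutionEnvironment.getExecutionEnvironment

// streaming entry point (DataStream API)
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment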

DataSet
I. Source operators
1. fromCollection

fromCollection: reads data from a local collection.

Example:

val env = ExecutionEnvironment.getExecutionEnvironment
val textDataSet: DataSet[String] = env.fromCollection(
  List("1, Zhang San", "2, Li Si", "3, Wang Wu", "4, Zhao Liu")
)

2. readTextFile

readTextFile: reads data from a file:

val textDataSet: DataSet[String] = env.readTextFile("/data/a.txt")

3. readTextFile: traversing a directory

readTextFile can recursively read all files in a directory, including the files in all subdirectories:

val parameters = new Configuration
// recursive.file.enumeration enables recursive traversal
parameters.setBoolean("recursive.file.enumeration", true)
val file = env.readTextFile("/data").withParameters(parameters)

4. readTextFile: reading compressed files

For the following compression types, there is no need to specify any additional InputFormat; Flink recognizes and decompresses them automatically. However, compressed files cannot be read in parallel, only sequentially, which may affect the scalability of the job.

Compression method | File extension | Parallel readable
DEFLATE            | .deflate       | no
GZip               | .gz, .gzip     | no
Bzip2              | .bz2           | no
XZ                 | .xz            | no

val file = env.readTextFile("/data/file.gz")

II. Transform operators

Because Transform operators are applied to the output of a Source operator, we first construct the Flink execution environment and a Source operator; the Transform examples that follow all build on them:

val env = ExecutionEnvironment.getExecutionEnvironment
val textDataSet: DataSet[String] = env.fromCollection(
  List("Zhang San, 1", "Li Si, 2", "Wang Wu, 3", "Zhang San, 4")
)

1. map

Convert each element in the DataSet to another element:

// use map to convert each element of the List into a Scala case class
case class User(name: String, id: String)

val userDataSet: DataSet[User] = textDataSet.map {
  text =>
    val fieldArr = text.split(",")
    User(fieldArr(0), fieldArr(1))
}
userDataSet.print()

2. flatMap

Take one element and produce zero, one or more elements:

// use flatMap to transform the data in the collection:
// group by the first element
// aggregate on the second element
val result = textDataSet.flatMap(line => line)
  .groupBy(0) // group by the first element
  .sum(1)     // sum the second element
result.print()

3. mapPartition

Convert the elements of one partition into other elements:

// use mapPartition to convert the List into Scala case class instances
case class User(name: String, id: String)

val result: DataSet[User] = textDataSet.mapPartition(line => {
  line.map(index => User(index._1, index._2))
})
result.print()

4. filter

Filters the elements with a predicate and keeps only those for which the function returns true:

val source: DataSet[String] = env.fromElements("java", "scala", "java")
val filter: DataSet[String] = source.filter(line => line.contains("java")) // keep only the elements containing "java"
filter.print()

5. reduce

Aggregates a dataset or a group down to a single element:

// use fromElements to build the data source
val source = env.fromElements(("java", 1), ("scala", 1), ("java", 1))
// use map to convert to a DataSet of tuples
val mapData: DataSet[(String, Int)] = source.map(line => line)
// group by the first element
val groupData = mapData.groupBy(_._1)
// aggregate with reduce
val reduceData = groupData.reduce((x, y) => (x._1, x._2 + y._2))
// print for testing
reduceData.print()

6. reduceGroup

Aggregates a dataset or a group into one or more elements.

reduceGroup is an optimization of reduce: it first reduces within each group and then performs the overall reduce, which has the advantage of reducing network IO:

// use fromElements to build the data source
val source: DataSet[(String, Int)] = env.fromElements(("java", 1), ("scala", 1), ("java", 1))
// group by the first element
val groupData = source.groupBy(_._1)
// aggregate with reduceGroup
val result: DataSet[(String, Int)] = groupData.reduceGroup {
  (in: Iterator[(String, Int)], out: Collector[(String, Int)]) =>
    val tuple = in.reduce((x, y) => (x._1, x._2 + y._2))
    out.collect(tuple)
}
// print for testing
result.print()

7. minBy and maxBy

Select the element with the minimum or maximum value:

// use minBy to find the minimum value for each person in the List
// List("Zhang San, 1", "Li Si, 2", "Wang Wu, 3", "Zhang San, 4")
case class User(name: String, id: String)
// convert the List into Scala case class instances
val text: DataSet[User] = textDataSet.mapPartition(line => {
  line.map(index => User(index._1, index._2))
})

val result = text
  .groupBy(0) // group by name
  .minBy(1)   // minimum value per person

8. aggregate

Performs built-in aggregations (such as maximum and minimum) on the dataset:

val data = new mutable.MutableList[(Int, String, Double)]
data.+=((1, "yuwen", 89.0))
data.+=((2, "shuxue", 92.2))
data.+=((3, "yuwen", 89.99))
// use fromCollection to build the data source
val input: DataSet[(Int, String, Double)] = env.fromCollection(data)
// use groupBy to perform the grouping
val value = input.groupBy(1)
  // use aggregate to find the maximum element
  .aggregate(Aggregations.MAX, 2)
// print for testing
value.print()

aggregate can only act on tuples.

Note:

When using aggregate, you can only group by field index, e.g. groupBy(0); grouping any other way reports the following error:

Exception in thread "main" java.lang.UnsupportedOperationException: Aggregate does not support grouping with KeySelector functions, yet.
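For instance, reusing the input DataSet from the example above, grouping by field index works with aggregate, while grouping with a key selector function triggers the exception (a sketch; Aggregations comes from org.apache.flink.api.java.aggregation):

import org.apache.flink.api.java.aggregation.Aggregations

// works: groupBy with a field index
input.groupBy(1).aggregate(Aggregations.MAX, 2).print()

// throws the UnsupportedOperationException shown above: groupBy with a KeySelector function
// input.groupBy(_._2).aggregate(Aggregations.MAX, 2)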

9. distinct

Remove duplicate data:

// the data source is the same as in the previous example
// use distinct to remove duplicate tuples from the collection according to field 1 (the subject)
val value: DataSet[(Int, String, Double)] = input.distinct(1)
value.print()

10. first

Take the first N elements:

input.first(2) // take the first two elements

11. join

Joins two DataSets on a given condition to form a new DataSet:

// the data sets s1 and s2 both have the format:
// DataSet[(Int, String, String, Double)]

val joinData = s1.join(s2) // join dataset s1 with dataset s2
  .where(0).equalTo(0) { // join condition
    (s1, s2) => (s1._1, s1._2, s2._2, s1._3)
  }

12. leftOuterJoin

Left outer join: every element of the left DataSet is joined with the matching elements of the right DataSet.

There are also:

rightOuterJoin: right outer join, every element of the right DataSet is joined with the matching elements of the left DataSet

fullOuterJoin: full outer join, the elements of both sides are all joined

Here is an example of leftOuterJoin:

val data1 = ListBuffer[Tuple2[Int, String]]()
data1.append((1, "zhangsan"))
data1.append((2, "lisi"))
data1.append((3, "wangwu"))
data1.append((4, "zhaoliu"))

val data2 = ListBuffer[Tuple2[Int, String]]()
data2.append((1, "beijing"))
data2.append((2, "shanghai"))
data2.append((4, "guangzhou"))

val text1 = env.fromCollection(data1)
val text2 = env.fromCollection(data2)

text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
  if (second == null) {
    (first._1, first._2, "null")
  } else {
    (first._1, first._2, second._2)
  }
}).print()
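rightOuterJoin and fullOuterJoin follow the same where/equalTo/apply pattern. As a sketch, a fullOuterJoin reusing text1 and text2 from above, where either side may be null, could look like this:

text1.fullOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
  // with a full outer join, either first or second may be null
  val name = if (first == null) "null" else first._2
  val city = if (second == null) "null" else second._2
  (name, city)
}).print()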

13. cross

Creates a new dataset by forming the Cartesian product of this dataset and another dataset.

It is similar to join, but because it produces a Cartesian product it is a memory-intensive operation when the data is large:

val cross = input1.cross(input2) {
  (input1, input2) => (input1._1, input1._2, input1._3, input2._2)
}
cross.print()

14. union

A union operation creates a new dataset containing the elements of this dataset and other datasets; it does not deduplicate:

val unionData: DataSet[String] = elements1.union(elements2).union(elements3)
// remove duplicate data
val value = unionData.distinct(line => line)

15. rebalance

Flink can also suffer from data skew. For example, with roughly one billion records to process, the load can become very unevenly distributed across machines during processing.

A job whose overall data volume would only need 10 minutes then has to wait for the skewed task: the task on machine 1 may take four hours, and the other three machines must wait for machine 1 before the job as a whole can finish. In practice, the better solution to this situation is rebalance, which internally uses round-robin to spread the data out evenly; it is a good choice when data is skewed.

// use rebalance to avoid data skew
val rebalance = filterData.rebalance()
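A slightly fuller sketch (the generated sequence, filter and map are illustrative, and env is the batch ExecutionEnvironment used above): a skewed DataSet is rebalanced before the downstream operation, so each parallel instance receives roughly the same amount of data:

val skewed: DataSet[Long] = env.generateSequence(0, 1000000).filter(_ > 10)
val evenlySpread = skewed
  .rebalance()   // round-robin redistribution over the parallel instances
  .map(_ * 2)    // the downstream work is now evenly distributed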

16. partitionByHash

Hash-partitions the dataset by the specified key:

val data = new mutable.MutableList[(Int, Long, String)]
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 2L, "Hello world"))

val collection = env.fromCollection(data)
val unique = collection.partitionByHash(1).mapPartition {
  line =>
    line.map(x => (x._1, x._2, x._3))
}
unique.writeAsText("hashPartition", WriteMode.NO_OVERWRITE)
env.execute()

17. partitionByRange

Range-partitions the dataset by the specified key:

val data = new mutable.MutableList[(Int, Long, String)]
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 2L, "Hello world"))
data.+=((4, 3L, "Hello world, how are you?"))

val collection = env.fromCollection(data)
val unique = collection.partitionByRange(x => x._1).mapPartition(line => line.map {
  x =>
    (x._1, x._2, x._3)
})
unique.writeAsText("rangePartition", WriteMode.OVERWRITE)
env.execute()

18. sortPartition

Sorts partitions according to the specified field values:

val data = new mutable.MutableList[(Int, Long, String)]
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 2L, "Hello world"))
data.+=((4, 3L, "Hello world, how are you?"))

val ds = env.fromCollection(data)
val result = ds
  .map { x => x }.setParallelism(2)
  .sortPartition(1, Order.DESCENDING) // the first parameter is the field to sort by
  .mapPartition(line => line)
  .collect()
println(result)

III. Sink operators
1. collect

Export the data to a local collection:

result.collect()

2. writeAsText

Export data to a file

Flink supports files on a variety of storage systems, including local files, HDFS files, etc.

Flink supports a variety of file storage formats, including text files, CSV files, etc.
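For example, a DataSet of tuples or case classes can also be written as CSV with writeAsCsv (a sketch; the output path is illustrative):

// write a DataSet of tuples as CSV, one field per column
result.writeAsCsv("/data/b.csv")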

// write data to a local file
result.writeAsText("/data/a", WriteMode.OVERWRITE)
// write data to HDFS
result.writeAsText("hdfs://node01:9000/data/a", WriteMode.OVERWRITE)

DataStream

Like DataSet, DataStream includes a series of Transformation operations.

I. Source operators

Flink can use StreamExecutionEnvironment.addSource(source) to add data sources to a program.

Flink already provides several well-implemented source functions. We can also write a custom non-parallel source by implementing SourceFunction, or a custom parallel source by implementing the ParallelSourceFunction interface or extending RichParallelSourceFunction.
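As an illustration of a custom non-parallel source (a sketch; the class name and the emission logic are made up), implementing SourceFunction only requires run and cancel:

import org.apache.flink.streaming.api.functions.source.SourceFunction

class CounterSource extends SourceFunction[Long] {
  @volatile private var running = true
  private var count = 0L

  // emit one increasing number per second until cancelled
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (running) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}

// val counterStream = env.addSource(new CounterSource)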

Flink's streaming sources are basically the same as its batch sources. There are roughly four categories:

- Collection-based source: based on a local collection
- File-based source: reads a text file, i.e. a file that conforms to the TextInputFormat specification, and returns its content line by line as strings
- Socket-based source: reads from a network socket; elements can be split by a delimiter
- Custom source: a user-defined source
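A short sketch of the first three categories (the path, host and port are illustrative):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// collection-based source
val fromList: DataStream[String] = env.fromCollection(List("a", "b", "c"))

// file-based source
val fromFile: DataStream[String] = env.readTextFile("/data/a.txt")

// socket-based source, elements separated by newline
val fromSocket: DataStream[String] = env.socketTextStream("localhost", 9999, '\n')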

The following uses addSource to read Kafka data into Flink as an example.

To connect an external data source you can use addSource. For example, to read Kafka data into Flink, first add the dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

Read Kafka data into Flink:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

val source = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

Based on network sockets:

val source = env.socketTextStream("IP", PORT)

II. Transform operators
1. map

Convert each element in the DataStream into another element:

dataStream.map { x => x * 2 }

2. flatMap

Take one data element and generate zero, one or more data elements. A flatmap function that divides a sentence into words:

dataStream.flatMap { str => str.split(" ") }

3. filter

Evaluates a Boolean function for each data element and keeps the elements for which the function returns true. A filter that filters out zero values:

dataStream.filter { _ != 0 }

4. keyBy

Logically partitions the stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify the key.

This transformation returns a KeyedStream, which is required in order to use keyed state:

dataStream.keyBy(0)
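The key can be specified in several ways, for example (a sketch; the Sensor case class is made up, and env is the streaming environment from above):

case class Sensor(id: String, temperature: Double)
val sensorStream: DataStream[Sensor] = env.fromElements(Sensor("s1", 35.2), Sensor("s2", 20.1))

sensorStream.keyBy("id")   // by field name of a case class / POJO
sensorStream.keyBy(_.id)   // by key selector function
dataStream.keyBy(0)        // by tuple field index, as above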

A "scrolling" Reduce on a data stream that is keyed. Merges the current data element with the value group of the last Reduce to issue a new value:

KeyedStream.reduce {_ + _} 6. Fold

A "scrolling" collapse on a Keys data stream with an initial value. Merge the current data element with the last collapsed value group to emit a new value:

Val result: DataStream [String] = keyedStream.fold ("start") ((str, I) = > {str + "-" + I})

/ / explanation: when the above code is applied to the sequence (1 start-1 2, 3 4), the output results are "start-1", "start-1-2", "start-1-2-3",... 7. Aggregations

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, while minBy returns the data element that holds the minimum value in the given field (max and maxBy work the same way):

keyedStream.sum(0)
keyedStream.min(0)
keyedStream.max(0)
keyedStream.minBy(0)
keyedStream.maxBy(0)
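A small sketch of the min / minBy difference (the data is illustrative, and env is the streaming environment from above): on a stream of (key, value) pairs keyed by the first field,

val readings: DataStream[(String, Int)] = env.fromElements(("a", 5), ("a", 3), ("a", 4))
val keyed = readings.keyBy(0)

keyed.min(1)    // tracks the minimum of field 1; the other fields keep the values of the first record seen
keyed.minBy(1)  // emits the entire record that currently holds the minimum value in field 1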

8. Window

A window can be defined on an already partitioned KeyedStream. Windows group the data of each key according to some characteristic, for example the data that arrived within the last 5 seconds. Windows are not explained in detail here; for a complete description see the article "Detailed parsing of Time and Window, which are extremely important in Flink".

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))

9. WindowAll

Windows can also be defined on a regular DataStream. They group all stream events according to some characteristic, such as the data that arrived within the last 5 seconds.

Note: in many cases, this is a non-parallel transformation. All records will be collected in one task of the windowAll operator.

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))

10. Window Apply

Apply a general function to the entire window.

Note: if you are using windowAll transformations, you need to use AllWindowFunction.

The following is a function for manually summing window data elements:

windowedStream.apply { WindowFunction }
allWindowedStream.apply { AllWindowFunction }
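A concrete sketch of such a manual sum (assuming a DataStream[(String, Int)] named dataStream keyed by its first field; the window size is illustrative):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val summed: DataStream[Int] = dataStream
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .apply { (key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[Int]) =>
    // manually sum the second field of every element in the window
    out.collect(input.map(_._2).sum)
  }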

11. Window Reduce

Applies a reduce function to the window and returns the reduced value:

windowedStream.reduce { _ + _ }

12. Window Fold

Applies a fold function to the window and returns the folded value:

val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i })
// applied to the sequence (1, 2, 3, 4, 5), the code above folds the sequence into the string "start-1-2-3-4-5"

13. Union

Unions two or more data streams to create a new stream containing all the elements of all the streams. Note: if you union a data stream with itself, each element will appear twice in the resulting stream:

dataStream.union(otherStream1, otherStream2, ...)

14. Window Join

Joins two data streams on a given key and a common window:

dataStream.join(otherStream)
  .where().equalTo()
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .apply(new JoinFunction() { ... })

15. Interval Join

Joins two data elements e1 and e2 of two keyed data streams over a common key within a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound.
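A sketch of the corresponding call (keyedStream1 and keyedStream2 are assumed to be KeyedStream[(String, Int), String]; the bounds are illustrative):

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

keyedStream1
  .intervalJoin(keyedStream2)
  .between(Time.milliseconds(-2), Time.milliseconds(2)) // lowerBound and upperBound
  .process(new ProcessJoinFunction[(String, Int), (String, Int), String] {
    override def processElement(left: (String, Int),
                                right: (String, Int),
                                ctx: ProcessJoinFunction[(String, Int), (String, Int), String]#Context,
                                out: Collector[String]): Unit = {
      // emit the joined pair
      out.collect(left + " , " + right)
    }
  })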

18. CoMap, CoFlatMap

Applies a map or a flatMap to a ConnectedStreams (two streams combined with connect), providing one function per input stream:

connectedStreams.map(
  (_: Int) => true,
  (_: String) => false
)
connectedStreams.flatMap(
  (_: Int) => true,
  (_: String) => false
)

19. Split

Split a stream into two or more streams according to some criteria:

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    })

20. Select

Select one or more streams from the split stream:

SplitStream split;
DataStream even = split.select("even");
DataStream odd = split.select("odd");
DataStream all = split.select("even", "odd");

III. Sink operators

Data can be exported to:

- Local files (see the batch section)
- Local collections (see the batch section)
- HDFS (see the batch section)

In addition, the following are also supported:

- Sink to Kafka
- Sink to MySQL
- Sink to Redis

Take sink to Kafka as an example:

val sinkTopic = "test"

// case class
case class Student(id: Int, name: String, addr: String, sex: String)

val mapper: ObjectMapper = new ObjectMapper()

// convert an object to a JSON string
def toJsonString(T: Object): String = {
  mapper.registerModule(DefaultScalaModule)
  mapper.writeValueAsString(T)
}

def main(args: Array[String]): Unit = {
  // 1. create the stream execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // 2. prepare the data
  val dataStream: DataStream[Student] = env.fromElements(
    Student(8, "xiaoming", "beijing biejing", "female")
  )
  // convert the Student objects to strings
  val studentStream: DataStream[String] = dataStream.map(student =>
    toJsonString(student) // a SerializerFeature must be specified explicitly here, otherwise an error about matching two methods is reported
  )
  // studentStream.print()
  val prop = new Properties()
  prop.setProperty("bootstrap.servers", "node01:9092")

  val myProducer = new FlinkKafkaProducer011[String](sinkTopic, new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), prop)
  studentStream.addSink(myProducer)
  studentStream.print()
  env.execute("Flink add sink")
}

This concludes the study of the common operators of Flink stream computing. I hope it has cleared up your doubts. Pairing theory with practice is the best way to learn, so go and try these operators out. If you want to continue learning more related knowledge, stay tuned for more practical articles.
