This article introduces the operators commonly used in Flink computation. Each operator comes with a short, ready-to-use example, so that theory and practice can be combined while you learn.
Flink, like Spark, is a one-stop processing framework: it supports both batch processing (DataSet) and real-time stream processing (DataStream).
Its operators therefore fall into two categories: DataSet operators and DataStream operators.
DataSet
I. Source operators
1. fromCollection
fromCollection: reads data from a local collection.
Example:
// implicit TypeInformation conversions required by the Scala DataSet API
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
val textDataSet: DataSet[String] = env.fromCollection(
  List("1,Zhang San", "2,Li Si", "3,Wang Wu", "4,Zhao Liu")
)
2. readTextFile
readTextFile: reads data from a file:
val textDataSet: DataSet[String] = env.readTextFile("/data/a.txt")
3. readTextFile: traversing a directory
readTextFile can also recursively read all files in a directory, including the files in all subdirectories:
val parameters = new Configuration
// recursive.file.enumeration enables recursive enumeration
parameters.setBoolean("recursive.file.enumeration", true)
val file = env.readTextFile("/data").withParameters(parameters)
4. readTextFile: reading compressed files
For the following compression types there is no need to specify any additional InputFormat: Flink recognizes the extension and decompresses automatically. Note, however, that compressed files are not read in parallel but sequentially, which may affect the scalability of the job.
Compression method | File extension(s) | Parallel readable
DEFLATE            | .deflate          | no
GZip               | .gz, .gzip        | no
Bzip2              | .bz2              | no
XZ                 | .xz               | no
val file = env.readTextFile("/data/file.gz")
II. Transform operators
Because Transform operators work on the output of Source operators, we first build the Flink execution environment and a Source operator; the Transform examples that follow all build on this:
val env = ExecutionEnvironment.getExecutionEnvironment
val textDataSet: DataSet[String] = env.fromCollection(
  List("Zhang San,1", "Li Si,2", "Wang Wu,3", "Zhang San,4")
)
1. map
Convert each element in the DataSet to another element:
// use map to convert each line into a Scala case class
case class User(name: String, id: String)
val userDataSet: DataSet[User] = textDataSet.map { text =>
  val fieldArr = text.split(",")
  User(fieldArr(0), fieldArr(1))
}
userDataSet.print()
2. flatMap
// use flatMap to turn each line into zero or more (name, count) tuples,
// then group by the first element and sum the second element
val result = textDataSet
  .flatMap(line => line.split(",") match {
    case Array(name, count) => Seq((name.trim, count.trim.toInt))
    case _                  => Seq.empty
  })
  .groupBy(0) // group by the first element
  .sum(1)     // sum the second element
result.print()
3. mapPartition
Transforms the elements of a whole partition in one function call:
// use mapPartition to convert the lines into Scala case classes
case class User(name: String, id: String)
val result: DataSet[User] = textDataSet.mapPartition(lines => {
  lines.map(line => {
    val fieldArr = line.split(",")
    User(fieldArr(0), fieldArr(1))
  })
})
result.print()
4. filter
Filters the elements, keeping only those for which the predicate returns true:
val source: DataSet[String] = env.fromElements("java", "scala", "java")
val filter: DataSet[String] = source.filter(line => line.contains("java")) // keep only the elements containing "java"
filter.print()
5. reduce
Aggregates a DataSet or a group into a single element:
// use fromElements to build the data source
val source = env.fromElements(("java", 1), ("scala", 1), ("java", 1))
// map to a DataSet of tuples
val mapData: DataSet[(String, Int)] = source.map(line => line)
// group by the first element
val groupData = mapData.groupBy(_._1)
// aggregate with reduce
val reduceData = groupData.reduce((x, y) => (x._1, x._2 + y._2))
// print for testing
reduceData.print()
6. reduceGroup
Aggregates a DataSet or a group into one or more elements.
reduceGroup is an optimization of reduce: it first reduces within each group and then performs the overall reduce, which reduces network IO:
// use fromElements to build the data source
val source: DataSet[(String, Int)] = env.fromElements(("java", 1), ("scala", 1), ("java", 1))
// group by the first element
val groupData = source.groupBy(_._1)
// aggregate with reduceGroup
val result: DataSet[(String, Int)] = groupData.reduceGroup {
  (in: Iterator[(String, Int)], out: Collector[(String, Int)]) =>
    val tuple = in.reduce((x, y) => (x._1, x._2 + y._2))
    out.collect(tuple)
}
// print for testing
result.print()
7. minBy and maxBy
Selects the element with the minimum or maximum value in a field:
// use minBy to find the minimum value for each person
// data: List("Zhang San,1", "Li Si,2", "Wang Wu,3", "Zhang San,4")
case class User(name: String, id: String)
// convert the lines into Scala case classes
val text: DataSet[User] = textDataSet.mapPartition(lines => {
  lines.map(line => {
    val fieldArr = line.split(",")
    User(fieldArr(0), fieldArr(1))
  })
})
val result = text
  .groupBy(0) // group by name
  .minBy(1)   // minimum value per person
8. aggregate
Performs built-in aggregations (such as maximum and minimum) on the dataset:
val data = new mutable.MutableList[(Int, String, Double)]
data.+=((1, "yuwen", 89.0))
data.+=((2, "shuxue", 92.2))
data.+=((3, "yuwen", 89.99))
// use fromCollection to build the data source
val input: DataSet[(Int, String, Double)] = env.fromCollection(data)
// group by the second field
val value = input.groupBy(1)
  // use aggregate to find the maximum of the third field
  .aggregate(Aggregations.MAX, 2)
// print for testing
value.print()
Note:
aggregate can only be applied to tuple DataSets, and the grouping must use a field position index such as groupBy(0); grouping with a KeySelector function raises the following error:
Exception in thread "main" java.lang.UnsupportedOperationException: Aggregate does not support grouping with KeySelector functions, yet.
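For the most common aggregations there are also shortcut methods on the grouped DataSet that delegate to aggregate. A minimal sketch, reusing the input DataSet from the example above (sum/min/max are the standard shortcuts of the Scala DataSet API):
// sum the third field (the score) per subject; equivalent to aggregate(Aggregations.SUM, 2)
val sumPerSubject = input.groupBy(1).sum(2)
sumPerSubject.print()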
9. distinct
Removes duplicate data:
// the data source is the same as in the previous example
// use distinct to remove duplicate tuples, deduplicating on the second field
val value: DataSet[(Int, String, Double)] = input.distinct(1)
value.print()
10. first
Takes the first N elements:
input.first(2) // take the first two elements
11. join
Joins two DataSets on a given condition to form a new DataSet:
// the data sets s1 and s2 both have the format:
// DataSet[(Int, String, String, Double)]
val joinData = s1.join(s2)  // join dataset s1 with dataset s2
  .where(0).equalTo(0) {    // join condition: the first field of s1 equals the first field of s2
    (s1, s2) => (s1._1, s1._2, s2._2, s1._3)
  }
12. leftOuterJoin
Left outer join: every element of the left DataSet is joined with the matching elements of the right DataSet; left elements without a match are kept.
There are also:
rightOuterJoin: right outer join, every element of the right DataSet is joined with the matching elements of the left DataSet
fullOuterJoin: full outer join, elements of both sides are joined and unmatched elements of either side are kept
Here is an example of leftOuterJoin:
val data1 = ListBuffer[Tuple2[Int, String]]()
data1.append((1, "zhangsan"))
data1.append((2, "lisi"))
data1.append((3, "wangwu"))
data1.append((4, "zhaoliu"))
val data2 = ListBuffer[Tuple2[Int, String]]()
data2.append((1, "beijing"))
data2.append((2, "shanghai"))
data2.append((4, "guangzhou"))
val text1 = env.fromCollection(data1)
val text2 = env.fromCollection(data2)
text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
  if (second == null) {
    (first._1, first._2, "null")
  } else {
    (first._1, first._2, second._2)
  }
}).print()
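Since rightOuterJoin and fullOuterJoin are only described above, here is a minimal fullOuterJoin sketch that reuses text1 and text2 from the leftOuterJoin example; in a full outer join either side of a pair may be null, so both sides must be checked:
text1.fullOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
  if (first == null) {
    (second._1, "null", second._2)  // no match on the left side
  } else if (second == null) {
    (first._1, first._2, "null")    // no match on the right side
  } else {
    (first._1, first._2, second._2) // matched on both sides
  }
}).print()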
13. cross
Cross creates a new dataset by forming the Cartesian product of this dataset and another dataset.
It is similar to join, but because it produces the full Cartesian product it is a memory-intensive operation on large datasets:
val cross = input1.cross(input2) {
  (input1, input2) => (input1._1, input1._2, input1._3, input2._2)
}
cross.print()
14. union
Union creates a new dataset containing the elements of this dataset and one or more other datasets; duplicates are not removed by union itself:
val unionData: DataSet[String] = elements1.union(elements2).union(elements3)
// remove duplicate data
val value = unionData.distinct(line => line)
15. rebalance
Flink, too, suffers from data skew. For example, with roughly 1 billion records to process, the data may end up very unevenly distributed across the parallel subtasks.
In that case a job whose overall data volume should take about 10 minutes is held up because the skewed task on machine 1 takes four hours, and the other three machines have to wait for machine 1 to finish. The better solution in practice is rebalance, which internally uses round-robin to redistribute the data evenly; it is a good choice when data is skewed.
// use rebalance to avoid data skew
val rebalance = filterData.rebalance()
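As a small self-contained sketch of where rebalance typically sits (the sequence source and the threshold are illustrative assumptions): a filter can leave partitions unevenly filled, and rebalance redistributes the remaining records round-robin before the next operation:
// generate 0..100, filter away most of the low values, then rebalance before mapping
val skewed = env.generateSequence(0, 100).filter(_ > 8)
val balanced = skewed.rebalance().map(_ * 2)
balanced.print()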
16. partitionByHash
Hash-partitions the dataset on the specified key:
val data = new mutable.MutableList[(Int, Long, String)]
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 2L, "Hello world"))
val collection = env.fromCollection(data)
val unique = collection.partitionByHash(1).mapPartition {
  line =>
    line.map(x => (x._1, x._2, x._3))
}
unique.writeAsText("hashPartition", WriteMode.NO_OVERWRITE)
env.execute()
17. partitionByRange
Range-partitions the dataset on the specified key:
val data = new mutable.MutableList[(Int, Long, String)]
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 2L, "Hello world"))
data.+=((4, 3L, "Hello world, how are you?"))
val collection = env.fromCollection(data)
val unique = collection.partitionByRange(x => x._1).mapPartition(line => line.map {
  x =>
    (x._1, x._2, x._3)
})
unique.writeAsText("rangePartition", WriteMode.OVERWRITE)
env.execute()
18. sortPartition
Sorts the records within each partition on the specified field:
val data = new mutable.MutableList[(Int, Long, String)]
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 2L, "Hello world"))
data.+=((4, 3L, "Hello world, how are you?"))
val ds = env.fromCollection(data)
val result = ds
  .map { x => x }.setParallelism(2)
  .sortPartition(1, Order.DESCENDING) // the first parameter is the field to sort on
  .mapPartition(line => line)
  .collect()
println(result)
III. Sink operators
1. collect
Collects the data into a local collection:
result.collect()
2. writeAsText
Writes the data out to files.
Flink supports files on a variety of storage systems, including the local file system and HDFS.
Flink also supports a variety of file formats, including plain text files and CSV files.
// write data to a local file
result.writeAsText("/data/a", WriteMode.OVERWRITE)
// write data to HDFS
result.writeAsText("hdfs://node01:9000/data/a", WriteMode.OVERWRITE)
DataStream
Like DataSet, DataStream offers a series of Transformation operations.
I. Source operators
Flink uses StreamExecutionEnvironment.addSource(source) to add data sources to a program.
Flink provides a number of ready-made source functions. We can also define a non-parallel source by implementing SourceFunction, or a parallel source by implementing the ParallelSourceFunction interface or extending RichParallelSourceFunction.
Flink's streaming sources are largely the same as its batch sources and fall roughly into four categories (a sketch of the first two categories follows this list):
Collection-based source: reads from a local collection.
File-based source: reads a text file, i.e. a file that conforms to the TextInputFormat specification, and returns its lines as strings.
Socket-based source: reads from a socket; elements can be split by a delimiter.
Custom source: a user-defined source added with addSource, for example a Kafka connector.
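As a minimal sketch of the first two categories (the standard Scala DataStream imports and the file path are assumptions), a collection-based and a file-based streaming source look like this:
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// collection-based source
val fromCollection: DataStream[String] = env.fromCollection(List("1,Zhang San", "2,Li Si"))
// file-based source (placeholder path)
val fromFile: DataStream[String] = env.readTextFile("/data/a.txt")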
The following example uses addSource to read data from Kafka into Flink. First add the Kafka connector dependency:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  <version>1.10.0</version>
</dependency>
Read Kafka data into Flink:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val source = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
Based on a network socket:
val source = env.socketTextStream("IP", PORT)
II. Transform operators
1. map
Converts each element of the DataStream into another element:
dataStream.map { x => x * 2 }
2. flatMap
Takes one element and produces zero, one or more elements. For example, a flatMap function that splits a sentence into words:
dataStream.flatMap { str => str.split(" ") }
3. filter
Evaluates a Boolean predicate for each element and keeps the elements for which it returns true. For example, a filter that removes zero values:
dataStream.filter { _ != 0 }
4. keyBy
Logically partitions the stream into disjoint partitions; all records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are several ways to specify the key.
This transformation returns a KeyedStream, which is required in order to use keyed state:
dataStream.keyBy(0)
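To show how the DataStream operators above chain together, here is a minimal streaming word-count sketch; the host, port and job name are illustrative assumptions:
import org.apache.flink.streaming.api.scala._

object WordCountSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // socket-based source; host and port are placeholders
    val lines: DataStream[String] = env.socketTextStream("localhost", 9999)
    val counts = lines
      .flatMap(_.split(" "))  // split every line into words
      .filter(_.nonEmpty)     // drop empty tokens
      .map(word => (word, 1)) // one (word, 1) tuple per word
      .keyBy(0)               // key by the word
      .sum(1)                 // running count per word
    counts.print()
    env.execute("streaming word count")
  }
}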
A "scrolling" Reduce on a data stream that is keyed. Merges the current data element with the value group of the last Reduce to issue a new value:
KeyedStream.reduce {_ + _} 6. Fold
A "scrolling" collapse on a Keys data stream with an initial value. Merge the current data element with the last collapsed value group to emit a new value:
Val result: DataStream [String] = keyedStream.fold ("start") ((str, I) = > {str + "-" + I})
/ / explanation: when the above code is applied to the sequence (1 start-1 2, 3 4), the output results are "start-1", "start-1-2", "start-1-2-3",... 7. Aggregations
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, while minBy returns the element that has the minimum value in the given field (max and maxBy behave analogously):
keyedStream.sum(0)
keyedStream.min(0)
keyedStream.max(0)
keyedStream.minBy(0)
keyedStream.maxBy(0)
8. window
Windows can be defined on an already-partitioned KeyedStream. A window groups the data of each key according to some characteristic, for example the data that arrived within the last 5 seconds. Windows are not explained in detail here; for a complete description see the article "Detailed parsing of Time and Window", which are extremely important in Flink.
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
9. windowAll
Windows can also be defined on a regular (non-keyed) DataStream. These windows group all stream events according to some characteristic, for example the data that arrived within the last 5 seconds.
Note: in many cases this is a non-parallel transformation; all records are collected in a single task of the windowAll operator.
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
10. Window Apply
Applies a general function to the window as a whole.
Note: if you are using a windowAll transformation, you need an AllWindowFunction instead.
The following applies a function that manually sums the elements of a window:
windowedStream.apply { WindowFunction }
allWindowedStream.apply { AllWindowFunction }
11. Window Reduce
Applies a reduce function to the window and returns the reduced value:
windowedStream.reduce { _ + _ }
12. Window Fold
Applies a fold function to the window and returns the folded value:
val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i })
// when applied to the sequence (1, 2, 3, 4, 5), the code above folds the window into the string "start-1-2-3-4-5"
13. union
Unions two or more data streams into a new stream containing all elements from all streams. Note: if you union a data stream with itself, each element appears twice in the resulting stream:
dataStream.union(otherStream1, otherStream2, ...)
14. Window Join
Joins two data streams on a given key over a common window:
dataStream.join(otherStream)
  .where().equalTo()  // key selectors for each stream
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .apply(new JoinFunction() { ... })
15. Interval Join
Joins two elements e1 and e2 of two keyed data streams that share a common key within a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound.
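As a rough sketch (assuming two keyed streams keyedStream1 and keyedStream2 of type KeyedStream[(String, Int), String]; the bounds are illustrative), an interval join in the Scala API looks roughly like this:
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

keyedStream1
  .intervalJoin(keyedStream2)
  .between(Time.milliseconds(-2), Time.milliseconds(2)) // lowerBound and upperBound
  .process(new ProcessJoinFunction[(String, Int), (String, Int), String] {
    override def processElement(left: (String, Int),
                                right: (String, Int),
                                ctx: ProcessJoinFunction[(String, Int), (String, Int), String]#Context,
                                out: Collector[String]): Unit = {
      // emit the matched pair as a string
      out.collect(left + "," + right)
    }
  })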
18. CoMap, CoFlatMap
Applies map/flatMap-style functions to the two sides of a connected stream:
connectedStreams.flatMap(
  (_: Int) => true,
  (_: String) => false
)
19. Split
Splits a stream into two or more streams according to some criterion:
val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)
20. Select
Selects one or more streams from a split stream:
val even = split.select("even")
val odd = split.select("odd")
val all = split.select("even", "odd")
III. Sink operators
The following sink targets are supported:
local files (as in the batch API)
local collections (as in the batch API)
HDFS (as in the batch API)
In addition, sinks to the following are also supported:
sink to Kafka
sink to MySQL
sink to Redis
Take sinking to Kafka as an example:
Val sinkTopic = "test"
/ / sample class
Case class Student (id: Int, name: String, addr: String, sex: String)
Val mapper: ObjectMapper = new ObjectMapper ()
/ / convert an object to a string
Def toJsonString (T: Object): String = {
Mapper.registerModule (DefaultScalaModule)
Mapper.writeValueAsString (T)
}
Def main (args: Array [String]): Unit = {
/ / 1. Create a flow execution environment
Val env = StreamExecutionEnvironment.getExecutionEnvironment
/ / 2. Prepare data
Val dataStream: DataStream [Student] = env.fromElements (
Student (8, "xiaoming", "beijing biejing", "female")
)
/ / convert student to string
Val studentStream: DataStream [String] = dataStream.map (student = >
ToJsonString (student) / / one of the SerializerFeature needs to be displayed here, otherwise an error matching both methods will be reported.
)
/ / studentStream.print ()
Val prop = new Properties ()
Prop.setProperty ("bootstrap.servers", "node01:9092")
Val myProducer = new FlinkKafkaProducer011 [String] (sinkTopic, new KeyedSerializationSchemaWrapper [String] (new SimpleStringSchema ()), prop)
StudentStream.addSink (myProducer)
StudentStream.print ()
Env.execute ("Flink add sink")
This concludes the overview of the operators commonly used in Flink computation. Combining the theory above with the examples and trying them out yourself is the best way to learn.