How to Use Flink's DataStream and DataSet
This article mainly introduces how to use Flink's DataStream and DataSet. Many people have doubts about them in daily use, so the editor has consulted various materials and sorted out simple, easy-to-use operation methods below. I hope this helps answer those doubts; please follow along and study!
Flink is mainly used to process data streams, so from an abstract point of view a Flink program is the processing of a data flow. As mentioned in the earlier article on Flink architecture and runtime architecture, writing a Flink program really means writing a DataSource, Transformations, and a Sink.

DataSource is the data-source input of the program. You can add a data source to the program through StreamExecutionEnvironment.addSource(sourceFunction).

Transformation is a concrete operator that computes over one or more input data streams, such as Map, FlatMap, and Filter.

Sink is the output of the program; it writes the data processed by the Transformations to a specified storage medium. A minimal sketch of this shape follows.
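To make the Source, Transformation, Sink shape concrete, here is a minimal hedged sketch (not from the original article); the element values and job name are placeholders:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSkeleton {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("hello", "flink")   // Source
           .map(String::toUpperCase)         // Transformation
           .print();                         // Sink

        env.execute("job-skeleton");
    }
}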
The three stream-processing APIs of DataStream: DataSource
Flink provides a number of pre-implemented data sources for DataStream, which can be summarized into the following four categories:
File-based

readTextFile(path): reads a text file line by line according to the TextInputFormat rules and returns each line as a string. A short sketch follows.
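A hedged sketch of reading a text file (the path is a placeholder):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> lines = env.readTextFile("file:///path/to/input.txt"); // placeholder path
lines.print();
env.execute("read-text-file");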
Socket-based

socketTextStream: reads data from a socket; elements can be separated by a delimiter, as sketched below.
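A hedged sketch, assuming env is a StreamExecutionEnvironment as above; the host, port, and newline delimiter are assumptions (for testing you can feed the socket with nc -lk 9999):

DataStream<String> socketStream = env.socketTextStream("localhost", 9999, "\n");
socketStream.print();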
Collection-based

fromCollection(Collection): creates a data stream from a Java Collection. All elements in the collection must be of the same type. Note that if the elements in the collection are to be recognized as POJOs, the following conditions must be met (see the sketch after this list):

The class has a public no-argument constructor.

The class is public and standalone (not a non-static inner class).

All properties in the class (and its parent classes) that are not modified by static or transient are either public (and not final) or have public getter and setter methods that follow the JavaBean naming convention.

In summary, these requirements exist so that Flink can easily serialize and deserialize these objects into and out of the data stream.
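A minimal sketch of a collection source whose element type satisfies the POJO conditions above; the Person class and its fields are hypothetical:

import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectionSourceDemo {

    // Satisfies the POJO rules: public standalone class (a static nested
    // class is allowed), public no-arg constructor, public non-final fields.
    public static class Person {
        public String name;
        public int age;

        public Person() {}

        public Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Person> people = env.fromCollection(
                Arrays.asList(new Person("alice", 30), new Person("bob", 25)));
        people.print();
        env.execute("collection-source");
    }
}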
Custom Source

Use StreamExecutionEnvironment.addSource(sourceFunction) to add a custom streaming data source to the program. The sourceFunction either implements SourceFunction (a non-parallel source), implements the ParallelSourceFunction interface, or extends RichParallelSourceFunction (parallel sources). A hedged sketch of a non-parallel source follows.
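For illustration (not from the original article), here is a minimal non-parallel custom source that implements SourceFunction; the counter logic and one-second emit interval are assumptions:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CounterSource implements SourceFunction<Long> {
    private volatile boolean running = true;
    private long count = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            ctx.collect(count++);   // emit the next value
            Thread.sleep(1000L);    // throttle: one element per second
        }
    }

    @Override
    public void cancel() {
        running = false;            // called by Flink to stop the source
    }
}

// Usage: env.addSource(new CounterSource()).print();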
For custom Sources and Sinks, Flink has the following built-in Connectors:

Connector                 Source support?    Sink support?
Apache Kafka              yes                yes
HDFS                      no                 yes
Twitter Streaming API     yes                no
Using a Source is actually relatively simple. Here is an example of a commonly used custom Source, a Kafka source:
package com.hoult.stream;

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class SourceFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String topic = "animalN";
        Properties props = new Properties();
        props.put("bootstrap.servers", "linux121:9092");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
        DataStreamSource<String> data = env.addSource(consumer);

        // Parse each "key,value" line into a Tuple2<Long, Long>.
        SingleOutputStreamOperator<Tuple2<Long, Long>> maped =
                data.map(new MapFunction<String, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> map(String value) throws Exception {
                        System.out.println(value);
                        Tuple2<Long, Long> t = new Tuple2<>(0L, 0L);
                        String[] split = value.split(",");
                        try {
                            t = new Tuple2<>(Long.valueOf(split[0]), Long.valueOf(split[1]));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        return t;
                    }
                });

        // Group by key, then apply stateful processing to the keyed stream.
        KeyedStream<Tuple2<Long, Long>, Long> keyed = maped.keyBy(value -> value.f0);

        SingleOutputStreamOperator<Tuple2<Long, Long>> flatMaped =
                keyed.flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    ValueState<Tuple2<Long, Long>> sumState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Create the state in the open() method.
                        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                                new ValueStateDescriptor<>(
                                        "average",
                                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
                                        Tuple2.of(0L, 0L));
                        sumState = getRuntimeContext().getState(descriptor);
                    }

                    @Override
                    public void flatMap(Tuple2<Long, Long> value,
                                        Collector<Tuple2<Long, Long>> out) throws Exception {
                        // Update the state in the flatMap() method.
                        Tuple2<Long, Long> currentSum = sumState.value();
                        currentSum.f0 += 1;
                        currentSum.f1 += value.f1;
                        sumState.update(currentSum);
                        out.collect(currentSum);
                        /*
                        if (currentSum.f0 == 2) {
                            long average = currentSum.f1 / currentSum.f0;
                            out.collect(new Tuple2<>(value.f0, average));
                            sumState.clear();
                        }
                        */
                    }
                });

        flatMaped.print();
        env.execute();
    }
}

Transformation
For Transformation, Flink provides a number of operators
Map
DataStream → DataStream Takes one element and produces one element. A map function that doubles the values of the input stream:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
FlatMap
DataStream → DataStream Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
Filter
DataStream → DataStream Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
KeyBy
DataStream → KeyedStream Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy () is implemented with hash partitioning. There are different ways to specify keys. This transformation returns a KeyedStream, which is, among other things, required to use keyed state.
Note that a type cannot be a key if it is a POJO type that does not override the hashCode() method (and relies on Object.hashCode()), or if it is an array of any type.

dataStream.keyBy(value -> value.getSomeKey()); // Key by field "someKey"
dataStream.keyBy(value -> value.f0); // Key by the first element of a Tuple

Further operators include Fold, Aggregation, and the window family: window / windowAll / window.apply / window.reduce / window.fold / window.aggregation (see the sketch below).
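As a hedged illustration of the window operators (not from the original article), here is a minimal sketch of a keyed tumbling processing-time window with a reduce; the Tuple2<Long, Long> stream and the five-second window size are assumptions:

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Assumes dataStream is a DataStream<Tuple2<Long, Long>>.
dataStream
    .keyBy(value -> value.f0)                                  // partition by key
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5s tumbling windows
    .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));        // sum f1 per key and window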
More operators can be found in the official documentation, which is well written: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/
Sink
Flink provides a large number of implemented data destinations (Sink) for DataStream, as shown below
writeAsText(): writes elements line by line as strings, which are obtained by calling each element's toString() method.

print() / printToErr(): prints the toString() value of each element to the standard output or standard error stream.

Custom output: addSink() can output data to third-party storage media. Flink provides a number of built-in Connectors, some of which offer corresponding Sink support.
Here is a common example: sinking a stream to Kafka.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class StreamToKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> data = env.socketTextStream("teacher2", 7777);

        String brokerList = "teacher2:9092";
        String topic = "mytopic2";
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema());

        data.addSink(producer);
        env.execute();
    }
}

Common APIs of DataSet: DataSource
For DataSet batch processing, the most frequent operation is reading file data from HDFS, so only two DataSource components are introduced here:

Collection-based: mainly used for testing, similar to DataStream.

File-based: readTextFile(path), etc. (see the sketch below).
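A minimal hedged sketch (not from the original article) of both DataSet sources; the HDFS path is a placeholder:

import java.util.Arrays;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DataSetSources {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Collection-based: handy for tests.
        DataSet<Long> nums = env.fromCollection(Arrays.asList(1L, 2L, 3L));

        // File-based: the HDFS path below is a placeholder.
        DataSet<String> lines = env.readTextFile("hdfs://namenode:9000/path/to/input");

        nums.print(); // print() triggers execution in the DataSet API
    }
}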
Transformation
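DataSet transformations largely mirror the DataStream operators shown above (map, flatMap, filter, plus grouped aggregations). As a hedged illustration, here is a minimal word-count style sketch; the sample sentences are placeholders:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class DataSetWordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> counts = env
                .fromElements("hello flink", "hello dataset")
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT)) // type hint for the lambda
                .groupBy(0)   // group by the word field
                .sum(1);      // sum the counts

        counts.print();
    }
}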
Sink
Flink provides a number of implemented data destinations (Sinks) for DataSet as well, as shown below:
writeAsText(): writes elements line by line as strings, which are obtained by calling each element's toString() method.

writeAsCsv(): writes tuples to a file as comma-separated values; the delimiters between rows and between fields are configurable, and the value of each field comes from the object's toString() method.

print() / printToErr(): prints the toString() value of each element to the standard output or standard error stream. Flink provides a number of built-in Connectors, some of which offer corresponding Sink support. A minimal sketch of the file sinks follows.
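A hedged sketch (not from the original article) of the DataSet file sinks; the output paths and delimiters are assumptions:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DataSetSinks {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> data =
                env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));

        // Each element is rendered via toString(), one line per element.
        data.writeAsText("file:///tmp/out-text");

        // Row and field delimiters are configurable.
        data.writeAsCsv("file:///tmp/out-csv", "\n", ",");

        env.execute("dataset-sinks");
    }
}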
At this point, the study of how to use Flink's DataStream and DataSet is over. I hope it has helped resolve your doubts; pairing theory with practice is the best way to learn, so go and try it out!