Chapter II Data Processing Using the DataStream API

2025-01-14 Update From: SLTechnology News&Howtos

Reprint should be marked with the source mythmoon@163.com.

Real-time analytics is currently an important issue. Many different domains need to process data in real time, and multiple technologies have tried to provide this capability. Technologies such as Storm and Spark have been on the market for a long time now. Applications derived from the Internet of Things (IoT) need data to be stored, processed, and analyzed in real or near real time. To cater for such needs, Flink provides a streaming data processing API called the DataStream API.

In this chapter, we are going to look at the details relating to the DataStream API, covering the following topics:

- Execution environment

- Data sources

- Transformations

- Data sinks

- Connectors

- Use case: sensor data analytics

Any Flink program works on a certain defined anatomy. We will be looking at each step and how we can use the DataStream API with this anatomy.

Execution environment

In order to start writing a Flink program, we first need to get an existing execution environment or create one. Depending upon what you are trying to do, Flink supports:

- Getting an already existing Flink environment

- Creating a local environment

- Creating a remote environment

Typically, you only need to use getExecutionEnvironment(). This will do the right thing based on your context. If you are executing in a local environment in an IDE, it starts a local execution environment. Otherwise, if you are executing the JAR, the Flink cluster manager executes the program in a distributed manner.

If you want to create a local or remote environment on your own, you can do so by using methods such as createLocalEnvironment() and createRemoteEnvironment(String host, int port, String... jarFiles).

Data sources

Sources are places where the Flink program expects to get its data from. This is the second step in the Flink program's anatomy. Flink supports a number of pre-implemented data source functions. It also supports writing custom data source functions, so anything that is not supported can be programmed easily. First, let's try to understand the built-in source functions.

Socket-based

The DataStream API supports reading data from a socket. You just need to specify the host and port to read the data from, and it will do the work:

    socketTextStream(hostName, port);

You can also choose to specify the delimiter:

    socketTextStream(hostName, port, delimiter);

You can also specify the maximum number of times the API should try to fetch the data:

    socketTextStream(hostName, port, delimiter, maxRetry);

File-based

You can also choose to stream data from a file source using the file-based source functions in Flink. You can use readTextFile(String path) to stream data from the file specified in the path. By default, it uses TextInputFormat and reads strings line by line.

If the file format is other than text, you can specify the input format using this function:

    readFile(FileInputFormat inputFormat, String path)

Flink also supports reading file streams as they are produced, using the readFileStream() function:

    readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType)

You just need to specify the file path, the interval at which the file path should be polled, and the watch type. There are three watch types:

- FileMonitoringFunction.WatchType.ONLY_NEW_FILES is used when the system should process only new files

- FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED is used when the system should process only the appended contents of files

- FileMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED is used when the system should re-process not only the appended contents of files but also the previous content in the file

If the file is not a text file, we also have the option of using the following function, which lets us define the file input format:

    readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

Internally, it divides the file-reading task into two sub-tasks. One sub-task only monitors the file path based on the given WatchType. The second sub-task does the actual file reading in parallel. The sub-task that monitors the file path is non-parallel. Its job is to keep scanning the file path based on the polling interval, report the files to be processed, split the files, and assign the splits to the respective downstream threads.

Transformations

Data transformations transform the data stream from one form into another. The input can be one or more data streams, and the output can be zero, one, or more data streams. Now let's try to understand each transformation one by one.

Map

This is one of the simplest transformations, where the input is one data stream and the output is also one data stream.

In Java:

    inputStream.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) throws Exception {
            return 5 * value;
        }
    });

In Scala:

    inputStream.map { x => x * 5 }

FlatMap

FlatMap takes one record and outputs zero, one, or more than one record.

In Java:

    inputStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    });

In Scala:

    inputStream.flatMap { str => str.split(" ") }

Filter

A filter function evaluates a condition and emits the record only if the condition is true. A filter function can therefore output zero records.

In Java:

    inputStream.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value != 1;
        }
    });

In Scala:

    inputStream.filter { _ != 1 }

KeyBy

KeyBy logically partitions the stream based on the key. Internally, it uses hash functions to partition the stream. It returns a KeyedDataStream (the class is named KeyedStream in the Flink API).

In Java:

    inputStream.keyBy("someKey");

In Scala:

    inputStream.keyBy("someKey")
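To make the idea of hash-based partitioning concrete, here is a minimal plain-Java sketch. It is not Flink's implementation (Flink's internal key-to-subtask assignment is more involved); the class KeyPartitionSketch and the method partitionFor are hypothetical names used only for illustration. The point it shows is that records with the same key always land in the same partition.

```java
class KeyPartitionSketch {
    // Hypothetical helper: map a key to one of numPartitions partitions by hashing it.
    static int partitionFor(Object key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String[] keys = {"sensor-1", "sensor-2", "sensor-1", "sensor-3"};
        for (String key : keys) {
            // Both "sensor-1" records are printed with the same partition number.
            System.out.println(key + " -> partition " + partitionFor(key, 4));
        }
    }
}
```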

Reduce

Reduce rolls up the KeyedDataStream by combining the last reduced value with the current value. The following code performs a sum reduce on a KeyedDataStream.

In Java:

    keyedInputStream.reduce(new ReduceFunction<Integer>() {
        @Override
        public Integer reduce(Integer value1, Integer value2) throws Exception {
            return value1 + value2;
        }
    });

In Scala:

    keyedInputStream.reduce { _ + _ }
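The rolling behaviour is easier to see outside Flink. The following plain-Java sketch (the class RollingReduceSketch and its method are hypothetical, and this is only a model of the semantics, not Flink code) keeps the last reduced value per key and emits a new result for every incoming record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class RollingReduceSketch {
    // For each (key, value) record, combine the last reduced value of that key
    // with the new value and emit the result.
    static List<Integer> rollingReduce(List<Map.Entry<String, Integer>> records,
                                       BinaryOperator<Integer> reducer) {
        Map<String, Integer> lastReduced = new HashMap<>();
        List<Integer> emitted = new ArrayList<>();
        for (Map.Entry<String, Integer> record : records) {
            // merge stores the value for a new key, or applies the reducer to (old, new).
            Integer reduced = lastReduced.merge(record.getKey(), record.getValue(), reducer);
            emitted.add(reduced);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = List.of(
                Map.entry("a", 1), Map.entry("b", 10), Map.entry("a", 2), Map.entry("a", 3));
        System.out.println(rollingReduce(records, Integer::sum)); // prints [1, 10, 3, 6]
    }
}
```

Note that key "b" keeps its own running sum, independent of key "a".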

Fold

Fold rolls up the KeyedDataStream by combining the last folded value with the current record. It emits a data stream back.

In Java:

    keyedInputStream.fold("Start", new FoldFunction<Integer, String>() {
        @Override
        public String fold(String current, Integer value) {
            return current + "=" + value;
        }
    });

In Scala:

    keyedInputStream.fold("Start")((str, i) => { str + "=" + i })

When applied to a stream of (1, 2, 3, 4, 5), the preceding function emits the rolling sequence Start=1, Start=1=2, Start=1=2=3, and so on, ending with Start=1=2=3=4=5.
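As a rough model of what a rolling fold does for a single key, consider this plain-Java sketch. It is an illustration only, not Flink code, and the class RollingFoldSketch is a hypothetical name. It starts from the initial value, combines it with each record in turn, and emits every intermediate result.

```java
import java.util.ArrayList;
import java.util.List;

class RollingFoldSketch {
    // Fold the values of one key, emitting the accumulated string after every record.
    static List<String> rollingFold(String initial, List<Integer> values) {
        List<String> emitted = new ArrayList<>();
        String current = initial;
        for (Integer value : values) {
            current = current + "=" + value;
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints [Start=1, Start=1=2, Start=1=2=3]
        System.out.println(rollingFold("Start", List.of(1, 2, 3)));
    }
}
```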

Aggregations

The DataStream API supports various aggregations such as min, max, sum, and so on. These functions can be applied on a KeyedDataStream in order to get rolling aggregations.

In Java:

    keyedInputStream.sum(0);
    keyedInputStream.sum("key");
    keyedInputStream.min(0);
    keyedInputStream.min("key");
    keyedInputStream.max(0);
    keyedInputStream.max("key");
    keyedInputStream.minBy(0);
    keyedInputStream.minBy("key");
    keyedInputStream.maxBy(0);
    keyedInputStream.maxBy("key");

In Scala:

    keyedInputStream.sum(0)
    keyedInputStream.sum("key")
    keyedInputStream.min(0)
    keyedInputStream.min("key")
    keyedInputStream.max(0)
    keyedInputStream.max("key")
    keyedInputStream.minBy(0)
    keyedInputStream.minBy("key")
    keyedInputStream.maxBy(0)
    keyedInputStream.maxBy("key")

The difference between max and maxBy is that max returns the maximum value in a stream, whereas maxBy returns the element that has the maximum value in the given field. The same applies to min and minBy.
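The distinction can be sketched in plain Java. This is a simplified model under the assumption that a max-style aggregation only guarantees the aggregated field, while a maxBy-style aggregation hands back the whole record; the class MaxVsMaxBySketch and the record layout {sensorId, reading} are hypothetical and not taken from Flink.

```java
import java.util.Arrays;

class MaxVsMaxBySketch {
    // max-style step: keep the earlier record but overwrite the aggregated field.
    static int[] maxStep(int[] previous, int[] current, int field) {
        int[] result = previous.clone();
        result[field] = Math.max(previous[field], current[field]);
        return result;
    }

    // maxBy-style step: keep whichever whole record has the larger field value.
    static int[] maxByStep(int[] previous, int[] current, int field) {
        return current[field] > previous[field] ? current : previous;
    }

    public static void main(String[] args) {
        int[] first = {1, 5};   // sensorId 1, reading 5
        int[] second = {2, 9};  // sensorId 2, reading 9
        System.out.println(Arrays.toString(maxStep(first, second, 1)));   // prints [1, 9]
        System.out.println(Arrays.toString(maxByStep(first, second, 1))); // prints [2, 9]
    }
}
```

In the max-style result the sensorId no longer tells us which sensor produced the maximum reading, which is why maxBy is the safer choice when the other fields matter.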

Window

The window function allows the grouping of existing KeyedDataStreams by time or other conditions. The following transformation emits groups of records by a time window of 10 seconds.

In Java:

    inputStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10)));

In Scala:

    inputStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10)))

Flink defines slices of data in order to process (potentially) infinite data streams. These slices are called windows. This slicing helps to process data in chunks by applying transformations. To do windowing on a stream, we need to assign a key on which the distribution can be made and a function that describes what transformations to perform on the windowed stream.

To slice streams into windows, we can use the pre-implemented Flink window assigners. We have options such as tumbling windows, sliding windows, global windows, and session windows. Flink also allows you to write custom window assigners by extending the WindowAssigner class. Let's try to understand how these various assigners work.

Global windows

Global windows are never-ending windows unless specified by a trigger. Generally, in this case, each element is assigned to one single per-key global window. If we don't specify any trigger, no computation will ever get triggered.

Tumbling windows

Tumbling windows are created based on fixed time intervals. They are fixed-length, non-overlapping windows. Tumbling windows are useful when you need to compute over the elements that fall within a specific time span. For example, a tumbling window of 10 minutes can be used to compute over the group of events occurring in each 10-minute period.
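Because tumbling windows are fixed-length and do not overlap, each timestamp belongs to exactly one window, and its start can be found with simple arithmetic. The following plain-Java sketch shows the idea; the class TumblingWindowSketch is a hypothetical name, and the sketch assumes windows aligned to the epoch with no offset.

```java
class TumblingWindowSketch {
    // Start of the fixed-size, non-overlapping window that contains the timestamp.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        // An event 25 minutes after the epoch falls into the window starting at minute 20.
        System.out.println(windowStart(25 * 60 * 1000L, tenMinutes)); // prints 1200000
    }
}
```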

Sliding windows

Sliding windows are like tumbling windows, but they overlap. They are fixed-length windows that overlap the previous ones by a user-given window slide parameter. This type of windowing is useful when you want to compute something out of a group of events occurring in a certain time frame.
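Since sliding windows overlap, a single event can belong to several windows at once. The plain-Java sketch below lists the start times of every window containing a given timestamp. It is an illustration under the assumption that windows are half-open intervals [start, start + size) aligned to the epoch; the class SlidingWindowSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {
    // Start times of all sliding windows [start, start + size) that contain the timestamp.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Window size 10 and slide 5: timestamp 12 is in [10, 20) and [5, 15).
        System.out.println(windowStarts(12L, 10L, 5L)); // prints [10, 5]
    }
}
```

When the slide equals the size, the loop yields exactly one window, which is the tumbling case.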

Session windows

Session windows are useful when window boundaries need to be decided based on the input data. Session windows allow flexibility in window start time and window size. We can also provide a session gap configuration parameter, which indicates how long to wait before considering the session closed.
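The role of the session gap can be sketched in plain Java. This is a simplified model for one key with already sorted timestamps, not Flink's window-merging logic; the class SessionWindowSketch is a hypothetical name, and treating events exactly one gap apart as part of the same session is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

class SessionWindowSketch {
    // Group sorted event timestamps into sessions: a new session starts whenever
    // the distance to the previous event is larger than the gap.
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long gap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gap) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // With a gap of 5, the silences before 10 and before 30 close the sessions.
        System.out.println(sessions(List.of(1L, 2L, 3L, 10L, 11L, 30L), 5L));
        // prints [[1, 2, 3], [10, 11], [30]]
    }
}
```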

WindowAll

The windowAll function allows the grouping of regular data streams. Generally, this is a non-parallel data transformation, as it runs on non-partitioned streams of data.

In Java:

    inputStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)));

In Scala:

    inputStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))

Similar to regular data stream functions, we have window data stream functions as well. The only difference is that they work on windowed data streams. So window reduce works like the Reduce function, window fold works like the Fold function, and there are window aggregations as well.

Union

The Union function performs the union of two or more data streams. The data streams are combined in parallel. If we combine one stream with itself, it outputs each record twice.

In Java:

    inputStream.union(inputStream1, inputStream2, ...);

In Scala:

    inputStream.union(inputStream1, inputStream2, ...)

Window join

We can also join two data streams by some keys in a common window. The following example shows the joining of two streams in a window of 5 seconds, where the join condition is that the first attribute of the first stream equals the second attribute of the other stream.

In Java:

    inputStream.join(inputStream1)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new JoinFunction() {...});

In Scala:

    inputStream.join(inputStream1)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply {...}

Split

This function splits the stream into two or more streams based on a given criterion. This can be used when you get a mixed stream and want to process each kind of data separately.

In Java:

    SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            List<String> output = new ArrayList<String>();
            if (value % 2 == 0) {
                output.add("even");
            } else {
                output.add("odd");
            }
            return output;
        }
    });

In Scala:

    val split = inputStream.split(
        (num: Int) =>
            (num % 2) match {
                case 0 => List("even")
                case 1 => List("odd")
            }
    )

Select

This function allows you to select a specific stream from the split stream.

In Java:

    SplitStream<Integer> split;
    DataStream<Integer> even = split.select("even");
    DataStream<Integer> odd = split.select("odd");
    DataStream<Integer> all = split.select("even", "odd");

In Scala:

    val even = split select "even"
    val odd = split select "odd"
    val all = split.select("even", "odd")

Project

The Project function allows you to select a subset of attributes from the event stream and sends only the selected elements to the next processing stream.

In Java:

    DataStream<Tuple4<Integer, Double, String, String>> in = // [...]
    DataStream<Tuple2<String, String>> out = in.project(3, 2);

In Scala:

    val in: DataStream[(Int, Double, String, String)] = // [...]
    val out = in.project(3, 2)

The preceding function selects the attributes at indices 3 and 2 (zero-based), in that order, from the given records. The following are sample input and output records:

    (1, 10.0, A, B) => (B, A)
    (2, 20.0, C, D) => (D, C)

Physical partitioning

Flink allows us to perform physical partitioning of the stream data. You also have the option of providing custom partitioning. Let us have a look at the different types of partitioning.

Custom partitioning

As mentioned earlier, you can provide a custom implementation of a partitioner.

In Java:

    inputStream.partitionCustom(partitioner, "someKey");
    inputStream.partitionCustom(partitioner, 0);

In Scala:

    inputStream.partitionCustom(partitioner, "someKey")
    inputStream.partitionCustom(partitioner, 0)

While writing a custom partitioner, you need to make sure you implement an efficient hash function.

Random partitioning

Random partitioning randomly distributes the records of a data stream evenly across partitions.

In Java:

    inputStream.shuffle();

In Scala:

    inputStream.shuffle()

Rebalancing partitioning

This type of partitioning helps distribute the data evenly. It uses a round-robin method for distribution. This type of partitioning is good when the data is skewed.

In Java:

    inputStream.rebalance();

In Scala:

    inputStream.rebalance()
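Round-robin distribution is simple enough to sketch in a few lines of plain Java. The class RoundRobinSketch below is a hypothetical illustration of the technique, not Flink's partitioner: it hands out partition numbers in a cycle, so every partition receives the same share of records regardless of their content.

```java
class RoundRobinSketch {
    private final int numPartitions;
    private int next = 0;

    RoundRobinSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Returns the partition for the next record, cycling 0, 1, ..., n-1, 0, ...
    int nextPartition() {
        int partition = next;
        next = (next + 1) % numPartitions;
        return partition;
    }

    public static void main(String[] args) {
        RoundRobinSketch roundRobin = new RoundRobinSketch(3);
        for (int i = 0; i < 5; i++) {
            System.out.print(roundRobin.nextPartition() + " "); // prints 0 1 2 0 1
        }
        System.out.println();
    }
}
```

Because the record's key is never consulted, a skewed key distribution cannot overload a single partition.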

Rescaling

Rescaling is used to distribute the data across operations, perform transformations on subsets of the data, and combine them together. This rebalancing happens over a single node only, hence it does not require any data transfer across the network.

The following diagram shows the distribution:

In Java:

    inputStream.rescale();

In Scala:

    inputStream.rescale()

Broadcasting

Broadcasting distributes all records to each partition. This fans out each and every element to all partitions.

In Java:

    inputStream.broadcast();

In Scala:

    inputStream.broadcast()

Data sinks

After the data transformations are done, we need to save the results somewhere. The following are some of the options Flink provides for saving results:

writeAsText(): Writes records one line at a time as strings.

writeAsCsv(): Writes tuples as comma-separated value files. Row and field delimiters can also be configured.

print()/printErr(): Writes records to the standard output. You can also choose to write to the standard error.

writeUsingOutputFormat(): You can also choose to provide a custom output format. While defining the custom format, you need to extend OutputFormat, which takes care of serialization and deserialization.

writeToSocket(): Flink supports writing data to a specific socket as well. You need to define a SerializationSchema for proper serialization and formatting.

Event time and watermarks

The Flink Streaming API takes inspiration from the Google Dataflow model. It supports different concepts of time for its streaming API. In general, there are three places where we can capture time in a streaming environment. They are as follows:

Event time

The time at which an event occurred on its producing device. For example, in an IoT project, this is the time at which a sensor captures a reading. Generally, these event times need to be embedded in the records before they enter Flink. At processing time, these timestamps are extracted and considered for windowing. Event time processing can handle out-of-order events.

Processing time

Processing time is the time of the machine executing the stream of data processing. Processing time windowing considers only the timestamps at which events are being processed.

Processing time is the simplest way of stream processing, as it does not require any synchronization between processing machines and producing machines. In a distributed asynchronous environment, processing time does not provide determinism, as it depends on the speed at which records flow through the system.

Ingestion time

This is the time at which a particular event enters Flink. All time-based operations refer to this timestamp. Ingestion time is more expensive than processing time, but it gives more predictable results. Ingestion time programs cannot handle out-of-order events, because the timestamp is assigned only after the event has entered the Flink system.

Here is an example which shows how to set event time and watermarks. In the case of ingestion time and processing time, we just need to set the time characteristic, and watermark generation is taken care of automatically. The following is a code snippet for this.

In Java:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    // or
    // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

In Scala:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    // or
    // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

In the case of event time stream programs, we need to specify the way to assign watermarks and timestamps. There are two ways of assigning watermarks and timestamps:

- Directly from a data source attribute

- Using a timestamp assigner

To work with event time streams, we need to set the time characteristic as follows:

In Java:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

In Scala:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

It is always best to store the event time while storing the record in the source. Flink also supports some pre-defined timestamp extractors and watermark generators. Refer to https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamp_extractors.html.
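To give a feel for what a watermark generator does, here is a plain-Java sketch of one common strategy: tolerate a bounded amount of out-of-orderness by trailing the highest timestamp seen so far. This is an illustration of the idea only, not one of Flink's classes; the class BoundedOutOfOrdernessSketch, its methods, and the rule that an event at or below the watermark counts as late are assumptions of this sketch.

```java
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // Record the event time extracted from an incoming record.
    void onEvent(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
    }

    // The watermark trails the highest timestamp seen by the allowed out-of-orderness.
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events seen yet
        }
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }

    // An event is treated as late here if its timestamp is not after the watermark.
    boolean isLate(long eventTimestampMillis) {
        return eventTimestampMillis <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(3000L);
        generator.onEvent(10_000L);
        generator.onEvent(8_000L); // out of order, but within the 3-second bound
        System.out.println(generator.currentWatermark()); // prints 7000
        System.out.println(generator.isLate(6_000L));     // prints true
    }
}
```

The larger the allowed out-of-orderness, the fewer events are considered late, at the cost of windows being evaluated later.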

Connectors

Apache Flink supports various connectors that allow data readIwrites across various technologies. Let's learn more about this. Apache Flink supports various connectors that allow data to be read and written across a variety of technologies. Let's learn more about this.

Kafka connector

Kafka is a publish-subscribe, distributed, message queuing system that allows users to publish messages to a certain topic; this is then distributed to the subscribers of the topic. Flink provides options to define a Kafka consumer as a data source in Flink Streaming. In order to use the Flink Kafka connector, we need to use a specific JAR file.

Kafka is a publish-subscribe, distributed, message queuing system that allows users to publish messages to a topic and then distribute it to subscribers to that topic. Flink provides the option to define Kafka users as data sources in Flink Streaming. To use the Flink Kafka connector, we need to use a specific JAR file.

The following diagram shows how the Flink Kafka connector works: the following figure shows how the Flink Kafka connector works:

We need the following Maven dependency to use the connector. I have been using Kafka version 0.9, so I will add the following dependency to pom.xml:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
  <version>1.1.4</version>
</dependency>

Now let's try to understand how to use the Kafka consumer as a source:

In Java:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

DataStream<String> input = env.addSource(
    new FlinkKafkaConsumer09<String>("mytopic", new SimpleStringSchema(), properties));

In Scala:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val stream = env
  .addSource(new FlinkKafkaConsumer09[String]("mytopic", new SimpleStringSchema(), properties))
  .print()

In the preceding code, we first set the properties for the Kafka host and, in the Scala version, the ZooKeeper host and port (the ZooKeeper setting is required only for Kafka 0.8). Next we specify the topic name, in this case mytopic. Any messages published to the mytopic topic will then be processed by the Flink stream.

If you get data in a different format, you can also specify a custom schema for deserialization. By default, Flink supports string and JSON deserializers.
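As a rough illustration of what a custom schema has to do, the following plain-Java sketch shows only the parsing step, without any Flink classes. It assumes, purely for the example, that each message is a UTF-8 string of the form timestamp,temperature,sensorId; the names `SensorReadingParser` and `SensorReading` are hypothetical. In a Flink job, logic like this would typically sit inside the `deserialize` method of a `DeserializationSchema` implementation.

```java
import java.nio.charset.StandardCharsets;

/** Illustration only: the parsing step a custom deserialization schema might perform. */
public class SensorReadingParser {

    /** Hypothetical value type for one sensor message. */
    public static final class SensorReading {
        public final long timestamp;
        public final double temperature;
        public final String sensorId;

        public SensorReading(long timestamp, double temperature, String sensorId) {
            this.timestamp = timestamp;
            this.temperature = temperature;
            this.sensorId = sensorId;
        }
    }

    /** Decodes a UTF-8 message of the assumed form "timestamp,temperature,sensorId". */
    public static SensorReading parse(byte[] message) {
        String line = new String(message, StandardCharsets.UTF_8).trim();
        String[] parts = line.split(",");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected 3 comma-separated fields: " + line);
        }
        return new SensorReading(
                Long.parseLong(parts[0].trim()),
                Double.parseDouble(parts[1].trim()),
                parts[2].trim());
    }

    public static void main(String[] args) {
        byte[] raw = "1483228800000,21.5,sensor-1".getBytes(StandardCharsets.UTF_8);
        SensorReading reading = parse(raw);
        System.out.println(reading.sensorId + " reported " + reading.temperature);
    }
}
```

Rejecting malformed messages with an exception is one possible choice; a real job might prefer to skip or log bad records instead.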

To enable fault tolerance, we need to enable checkpointing in Flink. Flink takes periodic snapshots of the state. In the case of failure, it restores the state from the last checkpoint and then restarts processing from there.
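The snapshot-and-restore idea can be sketched in plain Java, independent of Flink's actual checkpointing mechanism. The class `CheckpointedCounter` below is hypothetical: it keeps a running count as its state, copies that state on each checkpoint, and falls back to the copy after a simulated failure.

```java
/** Illustration only: a running count that can be snapshotted and restored. */
public class CheckpointedCounter {
    private long count;          // live state, updated for every record
    private long lastCheckpoint; // most recent snapshot of that state

    /** Processes one record by incrementing the live count. */
    public void process(String record) {
        count++;
    }

    /** Takes a snapshot of the current state. */
    public void checkpoint() {
        lastCheckpoint = count;
    }

    /** Simulates recovery: the live state is replaced by the last snapshot. */
    public void restore() {
        count = lastCheckpoint;
    }

    public long getCount() {
        return count;
    }

    public static void main(String[] args) {
        CheckpointedCounter counter = new CheckpointedCounter();
        counter.process("a");
        counter.process("b");
        counter.checkpoint();  // snapshot holds 2
        counter.process("c");  // processed after the snapshot
        counter.restore();     // failure: roll back to the snapshot
        System.out.println("count after restore = " + counter.getCount());
    }
}
```

After the restore, the record processed since the last snapshot is no longer reflected in the state, so it has to be read again from the source once processing restarts.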

We can also define the Kafka producer as a sink. This will write the data to a Kafka topic. The following is a way to write data to a Kafka topic:

In Java:

stream.addSink(new FlinkKafkaProducer09<String>("localhost:9092", "mytopic", new SimpleStringSchema()));

In Scala:

stream.addSink(new FlinkKafkaProducer09[String]("localhost:9092", "mytopic", new SimpleStringSchema()))

Use case-sensor data analytics

Now that we have looked at various aspects of the DataStream API, let's try to use these concepts to solve a real-world use case. Consider a machine that has sensors installed on it. We wish to collect data from these sensors and calculate the average temperature per sensor every five minutes.

The architecture would be as follows:

In this scenario, we assume that the sensors send information to a Kafka topic called temp, with each message in the form (timestamp, temperature, sensor-ID). Now we need to write code to read data from the Kafka topic and process it using Flink transformations.

The important thing to consider here is that, since timestamp values already come from the sensors, we can use event time for the time-based computations. This means we can handle events correctly even if they arrive out of order.

We start with a simple streaming execution environment that reads data from Kafka. Since the events carry timestamps, we will write a custom timestamp and watermark extractor to read the timestamp values and do window processing based on them. Here is the code snippet for this:

// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer09<String> myConsumer =
    new FlinkKafkaConsumer09<>("temp", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

Here we assume that we receive events in Kafka topics as strings and in the format:

Timestamp,Temperature,Sensor-Id

The following is example code to extract the timestamp from a record:

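import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String> {

    private static final long serialVersionUID = 1L;

    // The event timestamp is the first comma-separated field of the record.
    @Override
    public long extractTimestamp(String arg0, long arg1) {
        if (null != arg0 && arg0.contains(",")) {
            String[] parts = arg0.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0;
    }

    // Emit a watermark for every well-formed record; null means no new watermark.
    @Override
    public Watermark checkAndGetNextWatermark(String arg0, long arg1) {
        if (null != arg0 && arg0.contains(",")) {
            String[] parts = arg0.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        return null;
    }
}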

Now we simply create a keyed data stream and calculate the average of the temperature values, as shown in the following code snippet:

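// The generic type parameters were lost in this copy of the text. They are reconstructed here
// on the assumption that Splitter (not shown) emits (sensor ID, temperature) pairs.
DataStream<Tuple2<String, Double>> keyedStream = env.addSource(myConsumer)
    .flatMap(new Splitter())
    .keyBy(0)
    .timeWindow(Time.seconds(300))
    .apply(new WindowFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple, TimeWindow>() {

        @Override
        public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, Double>> input,
                Collector<Tuple2<String, Double>> out) throws Exception {
            double sum = 0.0;
            int count = 0;
            for (Tuple2<String, Double> record : input) {
                sum += record.f1;
                count++;
            }
            // Reuse the first record of the window to carry the key and the average.
            Tuple2<String, Double> result = input.iterator().next();
            result.f1 = (sum / count);
            out.collect(result);
        }
    });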

When we execute the preceding code, and if proper sensor events are published on the Kafka topic, we will get the average temperature per sensor every five minutes.
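To see what result to expect without running Kafka or Flink, the same calculation can be sketched in plain Java. This is only an approximation of the pipeline under stated assumptions: records are strings in the Timestamp,Temperature,Sensor-Id format, timestamps are non-negative milliseconds, and windows are aligned to multiples of five minutes. The class `WindowedAverage` and the `sensorId@windowStart` key format are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustration only: per-sensor averages over five-minute event-time windows, without Flink. */
public class WindowedAverage {
    public static final long WINDOW_MS = 5 * 60 * 1000L;

    /** Returns the start of the tumbling window containing a non-negative timestamp. */
    public static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    /** Maps "sensorId@windowStart" to the average temperature in that window. */
    public static Map<String, Double> average(String[] records) {
        Map<String, double[]> sumAndCount = new TreeMap<>();
        for (String record : records) {
            String[] parts = record.split(",");
            long timestamp = Long.parseLong(parts[0]);
            double temperature = Double.parseDouble(parts[1]);
            String key = parts[2] + "@" + windowStart(timestamp);
            double[] acc = sumAndCount.computeIfAbsent(key, k -> new double[2]);
            acc[0] += temperature; // running sum
            acc[1] += 1;           // running count
        }
        Map<String, Double> averages = new TreeMap<>();
        for (Map.Entry<String, double[]> entry : sumAndCount.entrySet()) {
            averages.put(entry.getKey(), entry.getValue()[0] / entry.getValue()[1]);
        }
        return averages;
    }

    public static void main(String[] args) {
        String[] records = {
            "0,20.0,s1",
            "120000,22.0,s1",
            "60000,24.0,s1",   // arrives out of order, still lands in the first window
            "300000,30.0,s1",  // first record of the second window
            "10000,10.0,s2"
        };
        average(records).forEach((key, avg) -> System.out.println(key + " -> " + avg));
    }
}
```

Because each record is assigned to a window by its own timestamp rather than by arrival order, the out-of-order reading still contributes to the correct five-minute average.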

The complete code is available on GitHub at https://github.com/deshpandetanmay/mastering-flink/tree/master/chapter02/flink-streaming.

Summary

In this chapter, we started with Flink's most powerful API: DataStream API. We looked at how data sources, transformations, and sinks work together. Then we looked at various technology connectors such as ElasticSearch, Cassandra, Kafka, RabbitMQ, and so on.

At the end, we also tried to apply our learning to solve a real-world sensor data analytics use case.

In the next chapter, we are going to learn about another very important API in Flink's ecosystem: the DataSet API.

