2025-01-17 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article shows how to connect to Kafka from Flink using the DataStream connectors, with one complete consumer example and one complete producer example.
The Flink DataStream Connectors are used to connect to a Kafka topic and to provide data stream input (reading from the topic) and output (writing to the topic) operations.
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Data stream input
DataStreamSource.java
package com.flink.examples.kafka;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * @Description consumes data from Kafka
 */
public class DataStreamSource {

    /**
     * official document: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism (how many CPU cores to use)
        env.setParallelism(1);
        // trigger a checkpoint every 2000 ms
        env.enableCheckpointing(2000);
        // set the mode to exactly-once
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // ensure a pause of at least 500 ms between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // 1. The consumer client connects to Kafka
        Properties props = new Properties();
        // NOTE: the port was garbled in the source text; 9092 (Kafka's default) is an assumption
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-45");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props);
        // setStartFromEarliest(): start from the earliest record, ignoring stored offsets
        // consumer.setStartFromEarliest();
        // setStartFromTimestamp(): start from the given timestamp; earlier records are ignored
        // consumer.setStartFromTimestamp(1559801580000L);
        // setStartFromLatest(): start from the latest record in the topic
        // consumer.setStartFromLatest();
        // setStartFromGroupOffsets(): start from where the group last stopped, so group.id must be configured
        // consumer.setStartFromGroupOffsets();

        // 2. Process the records in the operators
        DataStream<TUser> sourceStream = env.addSource(consumer)
                .filter((FilterFunction<String>) value -> StringUtils.isNotBlank(value))
                .map((MapFunction<String, TUser>) value -> {
                    System.out.println("print:" + value);
                    // Note: because checkpointing is enabled, an error thrown inside an operator
                    // rolls the job back to the latest checkpoint, and the Kafka messages are
                    // consumed again from the offsets stored in that checkpoint.
                    // A record that always fails would therefore be re-consumed and fail again
                    // in an endless loop. Catch the error with try/catch and return normally.
                    Gson gson = new Gson();
                    try {
                        TUser user = gson.fromJson(value, TUser.class);
                        return user;
                    } catch (Exception e) {
                        System.out.println("error:" + e.getMessage());
                    }
                    return new TUser();
                })
                .returns(TUser.class);
        sourceStream.print();

        // 3. Execute
        env.execute("flink kafka source");
    }
}
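The note inside the map operator describes a general pattern: catch the parse error inside the operator, so that one malformed record does not fail the job and get replayed from the last checkpoint over and over. Below is a minimal stand-alone sketch of that pattern without Flink or Gson. The class name SafeParseSketch, the helpers parseOrEmpty and parseAll, and the use of Integer.parseInt as a stand-in for gson.fromJson are all assumptions made for illustration; none of them come from the article's project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-alone sketch; not part of the article's project.
public class SafeParseSketch {

    // Stand-in for gson.fromJson(...): returns an empty Optional instead of
    // letting the exception escape when the input cannot be parsed.
    public static Optional<Integer> parseOrEmpty(String raw) {
        try {
            return Optional.of(Integer.parseInt(raw.trim()));
        } catch (NumberFormatException e) {
            System.out.println("error:" + e.getMessage());
            return Optional.empty();
        }
    }

    // Parses every record and silently skips the ones that fail,
    // so a single bad record never aborts the whole run.
    public static List<Integer> parseAll(List<String> records) {
        List<Integer> parsed = new ArrayList<>();
        for (String record : records) {
            parseOrEmpty(record).ifPresent(parsed::add);
        }
        return parsed;
    }

    public static void main(String[] args) {
        // "oops" is reported and skipped; the two valid records are kept.
        System.out.println(parseAll(Arrays.asList("1", "oops", "3")));
    }
}
```

The article's code returns an empty TUser when parsing fails. Dropping the record, as in this sketch, is an alternative design choice that keeps placeholder objects out of downstream operators; which one fits depends on whether later steps need to see that a record was rejected.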
Data stream output
DataStreamSink.java
package com.flink.examples.kafka;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * @Description writes producer data to Kafka
 */
public class DataStreamSink {

    /**
     * official document: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the parallelism must be set, otherwise nothing is output
        env.setParallelism(1);
        // trigger a checkpoint every 2000 ms
        env.enableCheckpointing(2000);
        // set the mode to exactly-once
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // ensure a pause of at least 500 ms between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // a checkpoint must complete within one minute, or it is discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // allow only one checkpoint to be in progress at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // 1. Connect to Kafka
        Properties props = new Properties();
        // NOTE: the port was garbled in the source text; 9092 (Kafka's default) is an assumption
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092");
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>("test", new SimpleStringSchema(), props);

        // 2. Create the data and write it to the stream
        TUser user = new TUser();
        user.setId(8);
        user.setName("liu3");
        user.setAge(22);
        user.setSex(1);
        user.setAddress("CN");
        user.setCreateTimeSeries(1598889600000L);
        DataStream<String> sourceStream = env.fromElements(user)
                .map((MapFunction<TUser, String>) value -> new Gson().toJson(value));

        // 3. Write the data stream to Kafka
        sourceStream.addSink(producer);
        sourceStream.print();
        env.execute("flink kafka sink");
    }
}
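Both examples import com.flink.examples.TUser, but the article does not show that class. The following is a hypothetical sketch of what it might look like, with the field names and types inferred only from the setter calls in DataStreamSink.java (the wrapper types, the Serializable marker, and toString are assumptions). The package declaration is left out so the sketch compiles on its own; in the article's project it would sit in com.flink.examples.

```java
import java.io.Serializable;

// Hypothetical sketch of TUser; the real class is not shown in the article.
// A public class with a public no-argument constructor and getters/setters
// can be handled by both Gson and Flink as a plain Java bean.
public class TUser implements Serializable {
    private Integer id;
    private String name;
    private Integer age;
    private Integer sex;
    private String address;
    private Long createTimeSeries;

    public TUser() {
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }

    public Integer getSex() { return sex; }
    public void setSex(Integer sex) { this.sex = sex; }

    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }

    public Long getCreateTimeSeries() { return createTimeSeries; }
    public void setCreateTimeSeries(Long createTimeSeries) { this.createTimeSeries = createTimeSeries; }

    @Override
    public String toString() {
        return "TUser{id=" + id + ", name=" + name + ", age=" + age + ", sex=" + sex
                + ", address=" + address + ", createTimeSeries=" + createTimeSeries + "}";
    }
}
```

With wrapper types, a TUser created by the no-argument constructor has every field set to null, which makes the fallback object returned after a failed parse easy to recognise in the printed output.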
Create a topic named test on kafka
First start DataStreamSource.java so that the consumer is listening on the topic, then start DataStreamSink.java to write a record to it.
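A likely reason for this order (an inference, not something the article states): the consumer sets auto.offset.reset to latest, so a consumer group with no committed offsets starts at the end of the topic and does not see records written before it started. The toy in-memory model below illustrates that behaviour. ToyLog and its methods are hypothetical names for illustration and are not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of a single topic partition; not a Kafka API.
public class ToyLog {
    private final List<String> records = new ArrayList<>();

    // Appends one record to the end of the log.
    public void append(String record) {
        records.add(record);
    }

    // Start position under "latest": the current end of the log.
    public int latestOffset() {
        return records.size();
    }

    // Start position under "earliest": the beginning of the log.
    public int earliestOffset() {
        return 0;
    }

    // Returns every record from startOffset up to the current end.
    public List<String> readFrom(int startOffset) {
        return new ArrayList<>(records.subList(startOffset, records.size()));
    }

    public static void main(String[] args) {
        ToyLog log = new ToyLog();
        log.append("written before the consumer started");
        int start = log.latestOffset(); // the consumer joins here
        log.append("written after the consumer started");
        // Only the second record is visible to a consumer that started at "latest".
        System.out.println(log.readFrom(start));
    }
}
```

Starting the consumer first sidesteps the problem. The commented-out consumer.setStartFromEarliest() call in DataStreamSource.java is the other option: it reads the topic from the beginning regardless of start order.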
Data presentation