

How to sink data to kafka


This article introduces how to sink data to kafka. The content is quite detailed; interested readers can use it for reference, and I hope it will be helpful to you.

Let's walk through the operation of sinking data to kafka.

Version and environment preparation

The environment and versions used in this hands-on exercise are as follows:

JDK:1.8.0_211

Flink:1.9.2

Maven:3.6.0

Operating system: macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)

IDEA:2018.3.5 (Ultimate Edition)

Kafka:2.4.0

Zookeeper:3.5.5

Please make sure that the above environment and services are ready.

Source code download

If you don't want to write the code yourself, the source code for the entire series can be downloaded from GitHub (https://github.com/zq2599/blog_demos). The addresses and link information are listed below:

Project home page: https://github.com/zq2599/blog_demos (the project's home page on GitHub)
Git repository address (https): https://github.com/zq2599/blog_demos.git (repository of the project source code, https protocol)
Git repository address (ssh): git@github.com:zq2599/blog_demos.git (repository of the project source code, ssh protocol)

There are multiple folders in this git project; the application for this chapter is under the flinksinkdemo folder. Once everything is ready, start the development.

Preparatory work

Before coding, check the relevant information on the official website to understand the basics:

Address: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html

The kafka used here is version 2.4.0; look up the corresponding connector library and classes in the official documentation.

Kafka preparation

Create a topic named test006 with four partitions; refer to this command:

./kafka-topics.sh \
--create \
--bootstrap-server 127.0.0.1:9092 \
--replication-factor 1 \
--partitions 4 \
--topic test006
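To double-check that the topic really was created with four partitions, you can describe it with the same standard kafka CLI (this verification step is an addition, not part of the original walkthrough):

./kafka-topics.sh \
--describe \
--bootstrap-server 127.0.0.1:9092 \
--topic test006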

To consume test006 messages in the console, refer to this command:

./kafka-console-consumer.sh \
--bootstrap-server 127.0.0.1:9092 \
--topic test006

At this point, if a message comes in from the topic, it will be output on the console.

Next, start coding.

Create a project

Create a flink project with the maven command:

mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2

At the prompts, enter com.bolingcavalry for the groupId and flinksinkdemo for the artifactId to create the maven project.

Add the kafka connector dependency to pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.0</version>
</dependency>

After the project is created, start writing the code for the flink task.

Sink that sends string messages

First try to send a message of type string:

Create an implementation class for the KafkaSerializationSchema interface, which will be used as a parameter for creating a sink object:

package com.bolingcavalry.addsink;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String> {

    private String topic;

    public ProducerStringSerializationSchema(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        // serialize the string as UTF-8 bytes and wrap it in a kafka record for the target topic
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}

Next, create the task class KafkaStrSink. Pay attention to the parameters of the FlinkKafkaProducer constructor: FlinkKafkaProducer.Semantic.EXACTLY_ONCE means exactly-once delivery:

package com.bolingcavalry.addsink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaStrSink {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parallelism is 1
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");

        String topic = "test006";
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(topic,
                new ProducerStringSerializationSchema(topic),
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        // create a List with several string elements
        List<String> list = new ArrayList<>();
        list.add("aaa");
        list.add("bbb");
        list.add("ccc");
        list.add("ddd");
        list.add("eee");
        list.add("fff");
        list.add("aaa");

        // send every element of the collection to kafka
        env.fromCollection(list)
           .addSink(producer)
           .setParallelism(4);

        env.execute("sink demo: kafka str");
    }
}
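One caveat worth knowing about EXACTLY_ONCE: it is implemented with kafka transactions that only commit on flink checkpoints, and kafka brokers cap the transaction timeout at transaction.max.timeout.ms (15 minutes by default) while the flink producer defaults to one hour. A minimal sketch of the extra lines this typically requires inside main(), with illustrative values that are not part of the original demo:

// enable checkpointing so the transactional producer can commit (5s interval is an illustrative assumption)
env.enableCheckpointing(5000);
// keep the producer's transaction timeout within the broker's 15-minute default cap
properties.setProperty("transaction.timeout.ms", "900000");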

Compile and build with the mvn command; this produces the file flinksinkdemo-1.0-SNAPSHOT.jar in the target directory.
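A typical build invocation, assuming the standard quickstart pom generated above, looks like this:

mvn clean package -DskipTests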

Submit flinksinkdemo-1.0-SNAPSHOT.jar on the flink web UI, and specify the entry class com.bolingcavalry.addsink.KafkaStrSink.
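If you prefer the command line to the web UI, the flink CLI can submit the same jar; a sketch, assuming it is run from the flink installation directory and that the jar path is adjusted to wherever your build put it:

./bin/flink run -c com.bolingcavalry.addsink.KafkaStrSink flinksinkdemo-1.0-SNAPSHOT.jar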

After a successful submission, if flink has four available slots, the task executes immediately, and the messages arrive at the terminal that is consuming the kafka topic.

The task's execution status can be checked on the flink web UI.

Sink that sends object messages

Next, let's try sending messages of an object type, using the commonly used Tuple2 as the example:

Create an implementation class of the KafkaSerializationSchema interface, which will later be passed as a parameter to the sink object. Note the comment in the exception-handling code: be careful with printStackTrace()!

package com.bolingcavalry.addsink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

public class ObjSerializationSchema implements KafkaSerializationSchema<Tuple2<String, Integer>> {

    private String topic;
    private ObjectMapper mapper;

    public ObjSerializationSchema(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, Integer> stringIntegerTuple2, @Nullable Long timestamp) {
        byte[] b = null;
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            b = mapper.writeValueAsBytes(stringIntegerTuple2);
        } catch (JsonProcessingException e) {
            // Note: this is a very dangerous operation in a production environment,
            // because frequent stack traces printed here will seriously hurt performance.
            // Please adjust the handling according to your production environment.
            e.printStackTrace();
        }
        return new ProducerRecord<>(topic, b);
    }
}
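As a sketch of what "adjust according to your production environment" might look like: log through SLF4J (whose API flink already bundles) instead of printing the stack trace, so failures go to the taskmanager log with proper level control. The class and method below are hypothetical illustrations, not part of the original demo:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SerializationErrorHandler {

    private static final Logger LOG = LoggerFactory.getLogger(SerializationErrorHandler.class);

    // hypothetical replacement for the e.printStackTrace() call in the catch block above
    void onSerializationFailure(Object record, Exception e) {
        LOG.error("failed to serialize record {} to JSON", record, e);
    }
}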

Create a flink task class:

package com.bolingcavalry.addsink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaObjSink {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parallelism is 1
        env.setParallelism(1);

        Properties properties = new Properties();
        // broker address of kafka
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");

        String topic = "test006";
        FlinkKafkaProducer<Tuple2<String, Integer>> producer = new FlinkKafkaProducer<>(topic,
                new ObjSerializationSchema(topic),
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        // create a List with several Tuple2 elements
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2<>("aaa", 1));
        list.add(new Tuple2<>("bbb", 1));
        list.add(new Tuple2<>("ccc", 1));
        list.add(new Tuple2<>("ddd", 1));
        list.add(new Tuple2<>("eee", 1));
        list.add(new Tuple2<>("fff", 1));
        list.add(new Tuple2<>("aaa", 1));

        // count the occurrences of each word, then sink the results to kafka
        env.fromCollection(list)
           .keyBy(0)
           .sum(1)
           .addSink(producer)
           .setParallelism(4);

        env.execute("sink demo: kafka obj");
    }
}

Compile and build as with the previous task, submit the jar to flink, and specify the entry class com.bolingcavalry.addsink.KafkaObjSink.

The console of the kafka consumer then shows the computed results.
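Since ObjSerializationSchema writes each Tuple2 as JSON via Jackson, the consumer terminal should print lines along these lines (illustrative output reconstructed from the code, not captured from a real run; ordering across the four sink subtasks may differ):

{"f0":"aaa","f1":1}
{"f0":"bbb","f1":1}
{"f0":"ccc","f1":1}
{"f0":"ddd","f1":1}
{"f0":"eee","f1":1}
{"f0":"fff","f1":1}
{"f0":"aaa","f1":2}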

The execution status can also be checked on the flink web UI. At this point, the exercise of having flink send its computation results as kafka messages is complete. I hope it can serve as a reference for you. In the following chapters, we will continue to explore the official sink capabilities.

That's all for how to sink data to kafka. I hope the content above is of some help and lets you learn something new. If you found the article useful, feel free to share it so more people can see it.
