How to write data in batches to hdfs using kafka connect

Shulou (Shulou.com), 05/31. Updated 2025-01-15. From: SLTechnology News&Howtos > Servers

This article shows how to use Kafka Connect to write data to HDFS in batches. It is shared here as a reference for readers who are not yet familiar with the topic.

In this article, Kafka Connect runs in single-node (standalone) mode.

1. First, a brief introduction to Kafka and Kafka Connect.

Kafka: Kafka is a high-throughput distributed publish-subscribe messaging system that can handle all of the user activity stream data of a consumer-scale website. More intuitively, it has producers and consumers. Think of Kafka as a data container: a producer sends data into it, while a consumer takes data out of the container and processes it, for example by storing it in HDFS.

Kafka Connect: Kafka Connect is a tool for scalable and reliable streaming of data between Kafka and other systems. It makes it easy to quickly define connectors that move large sets of data into and out of Kafka, which makes it well suited to batch import and export.
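The batch idea can be illustrated without any Kafka or Hadoop dependency. The following is a minimal sketch, not the article's implementation: `BatchWriter` and `writeBatch` are hypothetical names, and a plain `OutputStream` stands in for the HDFS output stream. It collects a whole batch of records into one buffer and writes it with a single call, which is roughly what a sink does when the framework hands it a collection of records at once.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical helper: the OutputStream is a stand-in for an HDFS file stream.
final class BatchWriter {
    private final OutputStream out;

    BatchWriter(OutputStream out) {
        this.out = out;
    }

    // Joins the whole batch into one buffer (one record per line) and
    // writes it with a single write call. Returns the number of records written.
    int writeBatch(List<String> records) {
        if (records.isEmpty()) {
            return 0; // nothing to write for an empty batch
        }
        StringBuilder buffer = new StringBuilder();
        for (String record : records) {
            buffer.append(record).append('\n');
        }
        try {
            out.write(buffer.toString().getBytes(StandardCharsets.UTF_8));
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records.size();
    }
}
```

Writing one buffer per batch rather than one write per record keeps the number of calls against the target file small, which matters when the target is a remote file system.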

2. Here is how to write data to HDFS using Kafka Connect, including explanations of some problems you may encounter along the way.

Start Kafka Connect first:

bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties

The command takes two arguments.

The first is the worker configuration file for the chosen startup mode. There are two modes, distributed and single node (standalone); standalone is used here. Kafka ships this file in its config directory.

The second points to a file that describes the properties of a connector. Several such files can be passed; here only one connector is used, to write to HDFS. You need to create this file yourself.

Let's take a look at the contents of connector1.properties.

# The name of the connector.
name=test

# Package the code written against the Connect interfaces and put the jar in the kafka/libs directory, then reference the connector class according to the project structure.
connector.class=hdfs.HdfsSinkConnector

# A task is the concrete implementation of the import or export. tasks.max specifies how many tasks run the job in parallel, implemented with multiple threads. Since a file in HDFS can only be written by one writer at a time, this can only be 1.
tasks.max=1

# The topic to read data from. The value is read in the code of the connector or task.
topics=test

# How the key is converted. This must be consistent with the serialization used by the producer.
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

# Same as above, for the value.
value.converter=org.apache.kafka.connect.json.JsonConverter

# The URL of HDFS. It is read in the Connector.
hdfs.url=hdfs://127.0.0.1:9000

# The HDFS file path. It is also read in the Connector.
hdfs.path=/test/file

# Introduced later. It can be true or false and affects the transmission format.
key.converter.schemas.enable=true

# Introduced later. Either true or false.
value.converter.schemas.enable=true
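One detail of this file is easy to get wrong: in the standard Java properties format, `#` starts a comment only at the beginning of a line, and quotes are not stripped. The sketch below assumes the connector file is parsed with `java.util.Properties` rules; the class name `ConnectorPropsCheck` is hypothetical. It shows what a trailing comment or a quoted value turns into.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper that parses properties text the way java.util.Properties does.
final class ConnectorPropsCheck {
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Comment on the same line: it becomes part of the value.
        Properties inline = parse("hdfs.path=/test/file # hdfs file path\n");
        // Comment on its own line: the value is clean.
        Properties ownLine = parse("# hdfs file path\nhdfs.path=/test/file\n");
        // Quoted value: the quotes are kept as part of the value.
        Properties quoted = parse("name= \"test\"\n");

        System.out.println("[" + inline.getProperty("hdfs.path") + "]");
        System.out.println("[" + ownLine.getProperty("hdfs.path") + "]");
        System.out.println("[" + quoted.getProperty("name") + "]");
    }
}
```

Under these rules the first lookup yields `/test/file # hdfs file path`, the second yields `/test/file`, and the third yields `"test"` including the quotes. Keeping each comment on its own line and leaving values unquoted avoids both problems.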

3. Next, look at the code. Kafka Connect revolves around two concepts, import and export: an import is a Source and an export is a Sink. Only a Sink is used here, but Source and Sink are implemented in much the same way.

Implementing a Sink is not difficult. Implement the corresponding base classes, SinkConnector and SinkTask, and package them into the kafka/libs directory. There is only one SinkConnector, and the Connector determines how many Tasks are started.
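The relationship between one connector and its tasks can be sketched without the Kafka libraries. In the Connect API a connector returns one configuration map per task from `taskConfigs(int maxTasks)`; the class below is a dependency-free illustration of that shape, not the article's code. `TaskConfigSketch` is a hypothetical name, and passing the HDFS settings as method parameters is an assumption made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the taskConfigs(int maxTasks) method of a connector.
final class TaskConfigSketch {
    // Builds one configuration map per task. Every task receives the same
    // HDFS settings; with tasks.max=1 the list has exactly one entry.
    static List<Map<String, String>> taskConfigs(String hdfsUrl, String hdfsPath, int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> config = new HashMap<>();
            config.put("hdfs.url", hdfsUrl);
            config.put("hdfs.path", hdfsPath);
            configs.add(config);
        }
        return configs;
    }
}
```

Calling `TaskConfigSketch.taskConfigs("hdfs://127.0.0.1:9000", "/test/file", 1)` returns a list with a single map, matching the tasks.max=1 setting used in this article.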

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.sink.SinkConnector;

public class HdfsSinkConnector extends SinkConnector {

    // These two items are the configuration keys for the HDFS URL and path.
    // They must be specified in connector1.properties.
    public static final String HDFS_URL = "hdfs.url";
    public static final String HDFS_PATH = "hdfs.path";

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(HDFS_URL, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs url")
            .define(HDFS_PATH, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs path");

    private String hdfsUrl;
    private String hdfsPath;

    @Override
    public String version() {
        return AppInfoParser.getVersion();
    }

    // The start method is executed once at initialization and is mainly used to read the configuration.
    @Override
    public void start(Map<String, String> props) {
        hdfsUrl = props.get(HDFS_URL);
        hdfsPath = props.get(HDFS_PATH);
    }

    // The class of the Task is specified here.
    @Override
    public Class
