How to import data from Kafka into Elasticsearch

2025-02-22 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/01 Report--

This article shows how to import data from Kafka into Elasticsearch. It is short and practical, walking through each step in a test environment.

Elasticsearch is a mainstream full-text search engine. Beyond strong full-text search and high scalability, it is compatible with a wide range of data sources. That compatibility comes mainly from Logstash, one of its core companion components, which implements input from and output to many data sources as plug-ins. Kafka is a high-throughput distributed publish-subscribe messaging system. It is a common data source and one of the many inputs and outputs that Logstash supports. Taking a practical approach, this article walks through using the Logstash Kafka input plug-in to import data from Kafka into Elasticsearch.

Use the Logstash Kafka plug-in to connect Kafka and Elasticsearch

1 Introduction to the Logstash Kafka input plug-in

The Logstash Kafka input plug-in uses the Kafka API to read data from Kafka topics. When using it, make sure the plug-in version is compatible with your Kafka version. The plug-in supports connecting to Kafka over SSL and Kerberos SASL. It also provides consumer group management and uses the default offset management strategy when consuming Kafka topics.

By default, Logstash subscribes to Kafka messages as a single consumer group, and each Logstash Kafka consumer can run multiple threads to increase throughput. You can also give several Logstash instances the same group_id so that they share the load. For best performance, the total number of consumer threads should equal the number of partitions in the topic; threads beyond that number sit idle.
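The reason for matching consumer threads to partitions can be seen with a small model. The sketch below is a simplified round-robin assignment written for illustration only; the function name assign_partitions is hypothetical, and Kafka's real assignors are more involved. It only relies on the fact that, within one consumer group, each partition is read by at most one consumer.

```python
def assign_partitions(num_partitions, num_consumers):
    """Distribute partition ids over consumer ids in round-robin order.

    Illustrative model only: each partition goes to exactly one consumer,
    so any consumer beyond the partition count ends up with nothing to read.
    """
    assignment = {consumer: [] for consumer in range(num_consumers)}
    for partition in range(num_partitions):
        assignment[partition % num_consumers].append(partition)
    return assignment


def idle_consumers(assignment):
    """Return the ids of consumers that received no partition."""
    return [consumer for consumer, parts in assignment.items() if not parts]
```

With 3 partitions and 5 consumer threads, two threads receive no partition, which is why adding threads past the partition count does not raise throughput.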

2 Prepare the test environment

2.1 Create an Elasticsearch cluster

To simplify setup, this article uses the Tencent Cloud Elasticsearch Service. Besides building Elasticsearch clusters quickly, it provides built-in Kibana, cluster monitoring, dedicated master nodes, the IK word segmentation plug-in and other features, which greatly simplifies creating and managing Elasticsearch clusters.

2.2 Create a Kafka service

Tencent Cloud CKafka is used to provide the Kafka service. Like the Elasticsearch Service, CKafka lets you create a Kafka service quickly, and it is 100% compatible with the open source Kafka API (version 0.9).

2.3 Server

In addition to Elasticsearch and Kafka, you also need a server that runs Logstash and connects to both. This article uses a Tencent Cloud CVM server.

2.4 Considerations

1) Elasticsearch, Kafka and the server must be created in the same network so that they can reach each other. Since this article uses Tencent Cloud services, you only need to create the Elasticsearch service, CKafka and the CVM in the same VPC.

2) Note the private network addresses and ports of the Elasticsearch service, CKafka and the CVM so that later steps can use them

In this test:

Service                 IP              Port
Elasticsearch service   192.168.0.8     9200
CKafka                  192.168.13.10   9092
CVM                     192.168.0.13    -
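Before going further, it can help to confirm from the CVM that both services are reachable over the private network. The following is a minimal sketch using only the Python standard library; the helper names can_connect and check_endpoints are hypothetical, and the addresses are the ones from this test environment, so replace them with your own.

```python
import socket

# Addresses and ports from the test environment in this article.
ENDPOINTS = {
    "Elasticsearch service": ("192.168.0.8", 9200),
    "CKafka": ("192.168.13.10", 9092),
}


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, unreachable hosts and timeouts.
        return False


def check_endpoints(endpoints, timeout=3.0):
    """Map each service name to whether its TCP port is reachable."""
    return {
        name: can_connect(host, port, timeout)
        for name, (host, port) in endpoints.items()
    }
```

Calling check_endpoints(ENDPOINTS) on the CVM returns a dictionary of service names to True or False. A False value usually points to a VPC or security group problem rather than to Logstash itself. Note that this only tests that the TCP port is open, not that the service behind it is healthy.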

3 Use Logstash to connect Elasticsearch and Kafka

3.1 Kafka preparation

Please refer to [introduction to CKafka]

Following that tutorial:

1) Create a topic named kafka_es_test

2) Install the JDK

3) Install the Kafka toolkit

4) Create a producer and a consumer to verify that Kafka works

3.2 Install Logstash

For the installation and use of Logstash, please see [an article on getting started with Logstash]

3.3 Configure the Logstash Kafka input plug-in

Create the kafka_test_pipeline.conf file as follows:

input {
  kafka {
    bootstrap_servers => "192.168.13.10:9092"
    topics => ["kafka_es_test"]
    group_id => "logstash_kafka_test"
  }
}
output {
  elasticsearch {
    hosts => ["192.168.0.8:9200"]
  }
}

This defines a kafka input and an elasticsearch output.

The three parameters above (bootstrap_servers, topics and group_id) are the essential settings for the Kafka input plug-in. In addition, other parameters adjust the behavior of the plug-in, such as:

auto_commit_interval_ms sets the interval, in milliseconds, at which the consumer commits offsets to Kafka

consumer_threads sets the number of consumer threads. The default is 1. In practice, it should match the number of partitions in the Kafka topic.

fetch_max_wait_ms sets the maximum time, in milliseconds, that a fetch request waits for fetch_min_bytes of data to become available

fetch_min_bytes sets the minimum amount of data that a consumer fetch request should return

topics_pattern subscribes to the set of topics whose names match a regular expression

For more parameters, please see [Kafka Input Configuration Options]
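The interaction between fetch_min_bytes and fetch_max_wait_ms is easier to see with a small model. The sketch below is a simplification written for illustration, not Kafka's implementation: the hypothetical function fetch_response assumes a fetch request sent at time 0 and a list of data arrivals, and reports when the response would be sent and how many bytes it would carry.

```python
def fetch_response(arrivals, fetch_min_bytes, fetch_max_wait_ms):
    """Model when a fetch request is answered.

    arrivals: list of (time_ms, size_bytes) tuples, sorted by time, describing
    data that becomes available after the request is sent at time 0.
    Returns (response_time_ms, bytes_returned).

    The request is answered as soon as fetch_min_bytes have accumulated,
    or when fetch_max_wait_ms elapses, whichever comes first.
    """
    total = 0
    for time_ms, size_bytes in arrivals:
        if time_ms > fetch_max_wait_ms:
            break  # this data arrives after the wait limit has expired
        total += size_bytes
        if total >= fetch_min_bytes:
            return time_ms, total
    # The wait limit was reached first: return whatever has accumulated.
    return fetch_max_wait_ms, total
```

A larger fetch_min_bytes yields fewer, bigger responses at the cost of latency, and fetch_max_wait_ms caps that latency when traffic is light.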

3.4 Start Logstash

The following operations are done in the Logstash root directory

1) Verify the configuration

./bin/logstash -f kafka_test_pipeline.conf --config.test_and_exit

If there are any errors, follow the prompts to fix the configuration file. If the configuration is correct, you will see output like the following:

Sending Logstash's logs to /root/logstash-5.6.13/logs which is now configured via log4j2.properties
[2018-11-11T15:24:01,598][INFO][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.13/modules/netflow/configuration"}
[2018-11-11T15:24:01,603][INFO][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.13/modules/fb_apache/configuration"}
Configuration OK
[2018-11-11T15:24:01,746][INFO][logstash.runner] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

2) Start Logstash

./bin/logstash -f kafka_test_pipeline.conf --config.reload.automatic

Watch the log for errors and fix any that appear.

3.5 Start Kafka Producer

The following operations are done in the root directory of the Kafka toolkit

./bin/kafka-console-producer.sh --broker-list 192.168.13.10:9092 --topic kafka_es_test

Write test data

This is a message

3.6 Verify the results in Kibana

Log in to the Kibana instance for the Elasticsearch cluster and run the following in Dev Tools

1) View the indices

GET _cat/indices

You can see that an index named logstash-xxxx.xx.xx has been created successfully

green open .kibana             QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kb
green open logstash-2018.11.11 DejRdNJVQ1e1MwbyJjJjLw 5 1 1 0 8.7kb 4.3kb
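The index name follows from the event time. With the Logstash 5.6 release used here, and no index option set in the elasticsearch output, events go to a daily index named from the event's @timestamp in UTC. The sketch below reproduces that naming in Python for illustration; the function daily_index_name is hypothetical, and newer Logstash versions with index lifecycle management may name indices differently.

```python
from datetime import datetime, timezone


def daily_index_name(timestamp_iso, prefix="logstash"):
    """Build a daily index name such as logstash-2018.11.11.

    timestamp_iso is an @timestamp value in the UTC form Logstash writes,
    for example "2018-11-11T07:33:09.079Z".
    """
    ts = datetime.strptime(timestamp_iso, "%Y-%m-%dT%H:%M:%S.%fZ")
    ts = ts.replace(tzinfo=timezone.utc)  # the trailing Z means UTC
    return f"{prefix}-{ts:%Y.%m.%d}"
```

Because the date is taken in UTC, an event produced shortly after midnight local time in a zone ahead of UTC can land in the previous day's index.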

2) View the written data

GET logstash-2018.11.11/_search

You can see that the data has been successfully written

{
  "took": 0,
  "timed_out": false,
  "_shards": {"total": 5, "successful": 5, "skipped": 0, "failed": 0},
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "logstash-2018.11.11",
        "_type": "logs",
        "_id": "AWcBsEegMu-Dkjm1ap3H",
        "_score": 1,
        "_source": {
          "message": "This is a message",
          "@version": "1",
          "@timestamp": "2018-11-11T07:33:09.079Z"
        }
      }
    ]
  }
}
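If you want to check the result from a script instead of reading the JSON by eye, the response can be parsed directly. This is a minimal sketch: the function extract_messages is hypothetical, and SAMPLE_RESPONSE is the search response from this test. The handling of hits.total as an object is an allowance for newer Elasticsearch versions, which report the total as {"value": ..., "relation": ...} rather than a bare number.

```python
import json

# The search response from this test, as returned by GET logstash-2018.11.11/_search.
SAMPLE_RESPONSE = """
{"took": 0, "timed_out": false,
 "_shards": {"total": 5, "successful": 5, "skipped": 0, "failed": 0},
 "hits": {"total": 1, "max_score": 1, "hits": [
   {"_index": "logstash-2018.11.11", "_type": "logs",
    "_id": "AWcBsEegMu-Dkjm1ap3H", "_score": 1,
    "_source": {"message": "This is a message", "@version": "1",
                "@timestamp": "2018-11-11T07:33:09.079Z"}}]}}
"""


def extract_messages(response):
    """Return (total_hits, list of message fields) from a search response dict."""
    hits = response["hits"]
    total = hits["total"]
    if isinstance(total, dict):
        # Newer Elasticsearch versions wrap the count in an object.
        total = total["value"]
    messages = [hit["_source"]["message"] for hit in hits["hits"]]
    return total, messages


total, messages = extract_messages(json.loads(SAMPLE_RESPONSE))
print(total, messages)
```

The message field carries the line typed into the Kafka console producer, so finding it here confirms the full path from Kafka through Logstash to Elasticsearch.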

As the core data collection and processing component of the Elastic Stack, Logstash gives Elasticsearch broad data source compatibility. The test above shows that connecting Kafka to Elasticsearch with Logstash is simple and convenient. In addition, Logstash's data processing capabilities give systems built on this architecture a natural advantage in data mapping and processing.

