Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Kafka of message queue (cluster building and shell operation)

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

1.kafka cluster building

Download address of   kafka installation package:

  official website: http://kafka.apache.org/quickstart

  Chinese official website: http://kafka.apachecn.org/quickstart.html

  is on the windows platform, downloaded from the official website: http://mirrors.hust.edu.cn/apache/kafka/1.1.0/

  on centos platform: wget http://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz

(1) basic environment preparation for cluster deployment:

  A: install JDK 1.8

  A: install a zookeeper cluster (you can also use your own ZooKeeper, but not recommended)

(2) Collective construction:

  version: kafka_2.11-1.1.0

  cluster planning: hadoop01, hadoop02, hadoop03 (three nodes)

① unpack the installation package to the corresponding directory

  tar zxvfkafka_2.11-1.1.0.tgz-C / application/

② modify configuration file

  [hadoop@hadoop01 ~] $cd / application/kafka_2.11-1.1.0/config/

  [hadoop@hadoop01 ~] $vim server.properties

   broker.id=5 # # A unique number for each broker node in the current cluster, each node is different

   listeners=PLAINTEXT://:9092

   listeners=PLAINTEXT://hadoop01:9092

Each node of the    host.name=hadoop01## is specified as the current hostname, as well as above

   log.dirs=/home/hadoop/data/kafka-logs # # kafkabroke worker node data storage directory

Default number of partitions for the topic of    num.partitions=1 # # kafka

Maximum retention time of    log.retention.hours=168 # # logs

   zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181 # # zookeeper address

③ batch sending

[hadoop@hadoop01 application] $scp-r / application/kafka_2.11-1.1.0 / hadoop02:$PWD

[hadoop@hadoop01 application] $scp-r / application/kafka_2.11-1.1.0 / hadoop03:$PWD

Note: modify the information of the corresponding broker node in the $KAFKA_HOME/config/server.properties file

Broker.id=your broker id host.name=your broker hostname advertised.listeners=PLAINTEXT:// your broker hostname:9092

④ configuration environment variables

[hadoop@hadoop01 application] $sudo etc/profile

Export KAFKA_HOME=/application/kafka_2.11-1.1.0

[hadoop@hadoop01 application] $source/etc/profile

⑤ starts the cluster for verification (each node should be started)

Nohup kafka-server-start.sh\ / application/kafka_2.11-1.1.0/config/server.properties\ 1 > ~ / logs/kafka_std.log\ 2 > ~ / logs/kafka_err.log & 2.kafka related shell

(1) start the process of each node in the cluster

Nohup kafka-server-start.sh\ / home/hadoop/apps/kafka_2.11-1.1.0/config/server.properties\ 1 > ~ / logs/kafka_std.log\ 2 > ~ / logs/kafka_err.log &

(2) create topic

Kafka-topics.sh\-- create\-- zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181\-- replication-factor 1\-- partitions 1\-- topic kafka_test** parameter introduction * *-- create creates kafka topic--zookeeper specified kafka zookeeper address-- number of partitions specified partitions-- replication-factor specifies the number of copies for each partition

(3) View all kafka topic that have been created

Kafka-topics.sh\-zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181\-list\

(4) View the details of a specified kafka topic

Kafka-topics.sh\-zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181\-describe\ # View details-topic kafka_test # specify the topic to be viewed

The name of the Topic:topic

Partition number of the Partition:topic

Leader: handles messages and reads and writes. Leader is randomly selected from all nodes.

Replicas: lists all replica nodes, regardless of whether they are in service or not.

Isr: the node in service.

(5) enable producer simulation to generate data:

Kafka-console-producer.sh\-broker-list hadoop01:9092\ # Node list of broker-topic kafka_test

(6) enable consumer simulation consumption data:

Kafka-console-consumer.sh\-- zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181\-- from-beginning\ # where to start consumption-- topic kafka_test

(7) View the maximum and minimum offset values of a topic partition

Kafka-run-class.sh\ kafka.tools.GetOffsetShell\-topic kafka_test\-time-1\-broker-list hadoop01:9092\-partitions 1

(8) increase the number of topic partitions (this operation is not allowed)

Kafka-topics.sh\-alter\-zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181\-topic kafka_test\-partitions 5 /-replication-factor 2

(9) Delete Topic

Kafka-topics.sh\-- delete\-- zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181\-- topic kafka_test\ 3.kafka consumer group displacement offset reset

   this section, coincidentally, recently received the project is quasi-real-time (OOG+kafka+stream), and colleagues also talked to me about the blog suggestions, generally speaking, the blog is too simple, although easy to understand, but the real practical application is not much, it is suggested that I can add some complex and often used things; to put it bluntly, I am lazy to find information, want to see the editor ready-made I can't help it, so add some more commonly used ones.

   first introduces the editor. After kafka consumes topic data through group, how to customize the location of kafka data consumption? the previous operation is "--from-beginning", and each time it is consumed from scratch. If the consumption statement is accurate once, how should it be done?

   here arbitrarily sets the displacement of the consumer group (consumer group) by using the kafka-consumer-groups.sh script that comes with Kafka. It is important to emphasize that this is a new feature provided by versions 2.11-0.11.0.0 and is only applicable to the new version of consumer. Before the new version, if you want to adjust the displacement for the existing consumer group, you have to manually write Java programs to call the KafkaConsumer#seek method, which is time-consuming and easy to make mistakes. Version 0.11.0.0 enriches the functionality of the kafka-consumer-groups script, which can be used directly to easily reset the displacement of an existing consumer group, but only if the consumer group must be inactive, that is, it cannot be in a working state.

  , here are the three steps to reset the displacement:

   -determine the scope of consumer group under topic

      →-- all-topics: adjust the displacement for all partitions of all topic under consumer group

      →-- topic T1-- topic T2: adjust the displacement for all partitions of a specified number of topic

      →-- topic T1 → 0Magiol 1 Magi 2: adjust the displacement for the specified topic partition

   -determine the displacement reset strategy

      →-- to-earliest: adjusts the displacement to the current minimum displacement of the partition

      →-- to-latest: adjust the displacement to the current latest displacement of the partition

      →-- to-current: adjust the displacement to the current displacement of the zone

      →-- to-offset: adjust the displacement to the specified displacement

      →-- shift-by N: adjust the displacement to the current displacement + N, note that N can be negative, indicating moving forward

      →-- to-datetime: adjust the displacement to the earliest displacement greater than a given time. The datetime format is yyyy-MM-ddTHH:mm:ss.xxx, such as 2017-08-04T00:00:00.000.

      →-- by-duration: adjusts the displacement to the displacement at the specified interval at the current time. The duration format is PnDTnHnMnS, such as PT0H5M0S.

   -determine the implementation plan

      → does not add any parameters: just print out the displacement adjustment plan, without specific implementation

      →-- execute: perform true displacement adjustment

      →-export: print the displacement adjustment scheme according to CSV format, which is convenient for users to turn into csv files for subsequent direct use.

   specific case demonstration:

# create topic and produce data to it

① create topickafka-topics.sh\-- create\-- zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181\-- replication-factor 1\-- partitions 3\-- topic 'test-group' ② create producer Production data kafka-producer-perf-test.sh\-- topic 'test-group'\-- num-records 500\-- throughput-1\-- record-size 100\-- producer-props bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092 acks=-1 ③ starts a console consumer program The group name is set to test-group:kafka-console-consumer.sh\-- bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092\-- topic 'test-group'\-- from-beginning\-- consumer-property group.id=test-group ④ to view the details of topic consumed by the current consumer group kafka-consumer-groups.sh\-- bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092\-- group test-group\-- describe

As can be seen from the figure, current consumers consume all the data in topic. LAG represents the remaining unconsumed message.

# case demonstration

# #-- to-earliest: set the offset to the beginning of partition kafka-consumer-groups.sh\-- bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092\-- group test-group\-- reset-offsets\-- all-topics\-- to-earliest\-- execute##-- to-latest: adjust the displacement to the latest displacement of the partition kafka-consumer-groups.sh\-- bootstrap-server hadoop01:9092,hadoop02:9092 Hadoop03:9092\-- group test-group\-- reset-offsets\-- all-topics\-- to-latest\-- execute##-- to-offset: adjust the displacement to the specified displacement kafka-consumer-groups.sh\-- bootstrap-server hadoop01:9092,hadoop02:9092 Hadoop03:9092\-- group test-group\-- reset-offsets\-- all-topics\-- to-offset 100\-- execute # #-to-current: adjust the displacement to the current displacement of the partition kafka-consumer-groups.sh\-- bootstrap-server hadoop01:9092,hadoop02:9092 Hadoop03:9092\-- group test-group\-- reset-offsets\-- all-topics\-- to-current\-- execute #-- shift-by N adjusts the displacement to the current displacement + N Note that N can be negative. Indicates moving forward kafka-consumer-groups.sh\-- bootstrap-server hadoop01:9092,hadoop02:9092 Hadoop03:9092\-- group test-group\-- reset-offsets\-- all-topics\-- shift-by-100\-- execute # #-- to-datetime: adjust the offset to the earliest shift greater than the XX date out of kafka-consumer-groups.sh\-- bootstrap-server hadoop01:9092,hadoop02:9092 Hadoop03:9092\-- group test-group\-- reset-offsets\-- all-topics\-- to-datetime 2019-07-31T03:40:33.000 # #-- by-duration adjusts the displacement to kafka-consumer-groups.sh\-- bootstrap-server hadoop01:9092,hadoop02:9092 from the displacement at the specified interval at the current time Hadoop03:9092\-group test-group\-reset-offsets\-all-topics\-by-duration PT0H20M0S

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report