2025-01-16 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
Many people who are new to Kafka are unsure how Kafka stores its data in ZooKeeper. This article walks through the znodes that Kafka creates in ZooKeeper and describes what each of them holds.
1. Structure diagram of Kafka's storage in ZooKeeper
2. Analysis
2.1 Topic registration information
/brokers/topics/[topic]:
Stores the partition assignment information for a topic.
[zk: localhost:2181(CONNECTED) 1] get /brokers/topics/topic2
Schema:
{
  "version": version number, currently fixed at 1,
  "partitions": {
    "partitionId": [brokerId list of the replicas assigned to this partition],
    "partitionId": [brokerId list of the replicas assigned to this partition],
    ...
  }
}
Example:
{"version": 1, "partitions": {"2": [1, 2, 3], "1": [0, 1, 2], "0": [3, 0, 1]}}
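The value of this znode is plain JSON, so it can be read with any JSON parser. The following is a minimal Python sketch that turns the example value into a map from partition id to broker list. The function name `parse_topic_assignment` is made up for illustration, and the sketch assumes the value has already been fetched from ZooKeeper as a string.

```python
import json


def parse_topic_assignment(raw):
    """Return {partition_id: [broker ids]} from the znode's JSON value."""
    data = json.loads(raw)
    # Partition ids are JSON object keys, so they arrive as strings.
    return {int(pid): replicas for pid, replicas in data["partitions"].items()}


# Example value copied from the article.
raw = '{"version": 1, "partitions": {"2": [1, 2, 3], "1": [0, 1, 2], "0": [3, 0, 1]}}'
assignment = parse_topic_assignment(raw)
for pid in sorted(assignment):
    print(pid, assignment[pid])
```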
2.2 Partition state information
/brokers/topics/[topic]/partitions/[partitionId]/state
Schema:
{
  "controller_epoch": number of controller elections held in the Kafka cluster,
  "leader": brokerId of the elected leader of this partition,
  "version": version number, 1 by default,
  "leader_epoch": number of leader elections held for this partition,
  "isr": [brokerId list of the in-sync replicas]
}
Example:
{"controller_epoch": 1, "leader": 3, "version": 1, "leader_epoch": 0, "isr": [3, 0, 1]}
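As a rough illustration of how this state can be used, the Python sketch below checks two things: that the leader is a member of the ISR, and whether the ISR is smaller than the replica list for the partition. The helper names and the `replicas` argument (the broker list taken from the topic registration znode) are assumptions made for this example, not part of Kafka.

```python
import json


def leader_in_isr(state):
    """True if the partition leader is one of the in-sync replicas."""
    return state["leader"] in state["isr"]


def is_under_replicated(state, replicas):
    """True if fewer replicas are in sync than are assigned to the partition."""
    return len(state["isr"]) < len(replicas)


# Example value copied from the article.
state = json.loads(
    '{"controller_epoch": 1, "leader": 3, "version": 1, "leader_epoch": 0, "isr": [3, 0, 1]}'
)
print(leader_in_isr(state), is_under_replicated(state, [3, 0, 1]))
```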
2.3 Broker registration information
/brokers/ids/[0...N]
Schema:
{
  "jmx_port": JMX port number,
  "timestamp": timestamp of when the Kafka broker was started,
  "host": host name or IP address,
  "version": version number, 1 by default,
  "port": service port of the Kafka broker, set by the port parameter in server.properties
}
Example:
{"jmx_port": -1, "timestamp": "1525741823119", "version": 1, "host": "hadoop1", "port": 9092}
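The sketch below shows one way to turn a broker registration into a `host:port` endpoint and a readable start time. It assumes the `timestamp` field is in milliseconds since the Unix epoch, which is what the example value suggests. The function name `broker_endpoint` is illustrative.

```python
import json
from datetime import datetime, timezone


def broker_endpoint(raw):
    """Return ("host:port", start time in UTC) for a broker registration value."""
    info = json.loads(raw)
    # Assumption: the timestamp is a string holding epoch milliseconds.
    started = datetime.fromtimestamp(int(info["timestamp"]) / 1000, tz=timezone.utc)
    return "%s:%d" % (info["host"], info["port"]), started


# Example value copied from the article.
raw = '{"jmx_port": -1, "timestamp": "1525741823119", "version": 1, "host": "hadoop1", "port": 9092}'
endpoint, started = broker_endpoint(raw)
print(endpoint, started.isoformat())
```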
2.5 Controller registration information
/controller -> int (broker id of the controller). Stores information about the Kafka broker on which the central controller runs.
Schema:
{
  "version": version number, 1 by default,
  "brokerid": unique id of the broker in the Kafka cluster,
  "timestamp": timestamp of when the central controller changed to this broker
}
Example:
{"version": 1, "brokerid": 0, "timestamp": "1525741822769"}
2.6 Consumers and consumer groups
a. When a consumer client is created, it registers its own information with ZooKeeper.
b. This registration exists mainly for load balancing.
c. Among the consumers in the same consumer group, Kafka delivers each message of the subscribed topic to only one consumer.
d. Each consumer in a consumer group reads one or more partitions of the topic, and it is the only consumer in the group reading those partitions.
e. All threads of the consumers in a consumer group consume the partitions of a topic in order. If the total number of consumer threads in the group is greater than the number of partitions, some threads will be idle.
Examples are as follows:
Create a topic named report-log in the Kafka cluster with 4 partitions, indexed 0, 1, 2, 3.
Suppose there are currently three consumer nodes. Note: a consumer thread in a consumer can consume one or more partitions.
If each consumer creates one consumer thread, the nodes consume as follows: node1 consumes partitions 0 and 1, node2 consumes partition 2, and node3 consumes partition 3.
If each consumer creates 2 consumer threads, the nodes consume as follows (depending on the order in which the consumer nodes start): node1 consumes partitions 0 and 1, node2 consumes partitions 2 and 3, and node3 is idle.
Summary:
As the example shows, the consumers in a consumer group take the partitions of a topic in turn, following the order in which the consumers were started.
If the total number of threads across all consumers in the consumer group is greater than the number of partitions, some consumer threads, or entire consumers, may be idle.
2.7 Consumer rebalancing algorithm
When a consumer joins or leaves a group, a partition rebalance is triggered. The ultimate goal of rebalancing is to improve the concurrent consumption capacity of the topic.
1) Suppose topic1 has the following partitions: P0, P1, P2, P3.
2) The following consumers join the group: C0, C1.
3) First, sort the partitions by partition index: P0, P1, P2, P3.
4) Sort the consumers by (consumer.id + '-' + thread number): C0, C1.
5) Compute the multiple: M = [P0, P1, P2, P3].size / [C0, C1].size, rounded up. In this example M = 2.
6) Then assign the partitions: C0 = [P0, P1], C1 = [P2, P3], that is, Ci = [P(i * M), ..., P((i + 1) * M - 1)].
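The steps above can be sketched in a few lines of Python. This follows the formula exactly as written in this section. It is not a copy of Kafka's own assignor, which may distribute a remainder differently. The function name `assign_partitions` is made up for illustration, and partitions are represented by their integer index.

```python
import math


def assign_partitions(partitions, consumers):
    """Assign sorted partitions to sorted consumers in blocks of M = ceil(P / C)."""
    parts = sorted(partitions)      # step 3: sort partitions by index
    members = sorted(consumers)     # step 4: sort consumer thread ids
    m = math.ceil(len(parts) / len(members))  # step 5: multiple M, rounded up
    # step 6: Ci gets P(i*M) .. P((i+1)*M - 1); slices past the end are empty.
    return {c: parts[i * m:(i + 1) * m] for i, c in enumerate(members)}


print(assign_partitions(range(4), ["C1", "C0"]))
print(assign_partitions(range(4), ["C0", "C1", "C2"]))
```

With three consumer threads and four partitions, M is still 2, so under this formula the third thread receives an empty list and stays idle.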
2.8 Consumer registration information
Each consumer has a unique id (the consumerId can be set in the configuration file or generated by the system). This id is used to identify the consumer's information.
/consumers/[groupId]/ids/[consumerIdString]
This is an ephemeral znode. Its name is built according to the consumerIdString generation rules below, and its value describes the topics, and therefore the partitions, currently consumed by this consumer.
ConsumerId generation rules:
String consumerUuid;
if (config.consumerId != null) {
    // Use the id supplied in the configuration.
    consumerUuid = config.consumerId;
} else {
    // Otherwise build one from host name, current time, and a random UUID prefix.
    UUID uuid = UUID.randomUUID();
    consumerUuid = String.format("%s-%d-%s",
        InetAddress.getLocalHost().getHostName(), System.currentTimeMillis(),
        Long.toHexString(uuid.getMostSignificantBits()).substring(0, 8));
}
String consumerIdString = config.groupId + "_" + consumerUuid;
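For readers who want to experiment with the naming rule outside the JVM, here is a hedged Python equivalent. It is a sketch of the same rule, not Kafka code. The function name `make_consumer_id_string` is hypothetical, and the hex string is zero-padded to 16 digits so that taking the first 8 characters always succeeds.

```python
import socket
import time
import uuid


def make_consumer_id_string(group_id, consumer_id=None):
    """Build groupId + "_" + consumerUuid following the rule described above."""
    if consumer_id is not None:
        consumer_uuid = consumer_id
    else:
        # Upper 64 bits of a random UUID, rendered as zero-padded hex.
        msb = uuid.uuid4().int >> 64
        consumer_uuid = "%s-%d-%s" % (
            socket.gethostname(),
            int(time.time() * 1000),      # current time in milliseconds
            format(msb, "016x")[:8],      # first 8 hex characters
        )
    return group_id + "_" + consumer_uuid


print(make_consumer_id_string("console-consumer-2304"))
print(make_consumer_id_string("console-consumer-2304", "my-consumer"))
```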
[zk: localhost:2181(CONNECTED) 11] get /consumers/console-consumer-2304/ids/console-consumer-2304_hadoop2-1525747915241-6b48ff32
Schema:
{
  "version": version number, 1 by default,
  "subscription": {
    // list of subscribed topics
    "topic name": number of consumer threads this consumer uses for the topic
  },
  "pattern": "static",
  "timestamp": "timestamp of when the consumer started"
}
Example:
{"version": 1, "subscription": {"topic2": 1}, "pattern": "white_list", "timestamp": "1525747915336"}
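To connect the znode name with its value, the sketch below splits an auto-generated consumerIdString back into its parts and reads the subscription map. It assumes the id was generated by the system (host-timestamp-uuid suffix) and that the host name contains no underscore. Ids supplied through configuration will not follow this shape. The function name `parse_registration` is illustrative.

```python
import json


def parse_registration(znode_name, raw):
    """Split an auto-generated consumerIdString and read the subscription."""
    # Assumption: name looks like <groupId>_<host>-<epoch millis>-<8 hex chars>.
    prefix, created_ms, uuid_part = znode_name.rsplit("-", 2)
    group_id, host = prefix.rsplit("_", 1)
    value = json.loads(raw)
    return {
        "group": group_id,
        "host": host,
        "created_ms": int(created_ms),
        "uuid": uuid_part,
        "threads": value["subscription"],   # topic name -> thread count
    }


# Name and value copied from the article's example.
name = "console-consumer-2304_hadoop2-1525747915241-6b48ff32"
raw = '{"version": 1, "subscription": {"topic2": 1}, "pattern": "white_list", "timestamp": "1525747915336"}'
info = parse_registration(name, raw)
print(info)
```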
2.10 Consumer offset
/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)
Used to track the largest offset consumed so far in each partition by the consumer group.
This znode is persistent. As the path shows, the offset is tied to the group id, so when a consumer in the consumer group fails and a rebalance is triggered, the other consumers in the group can continue consuming from the recorded offset.
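A small Python sketch of the path layout follows. The helper names are made up, and the sketch assumes the znode value holds the offset as a decimal string in UTF-8 bytes. Verify that against your own cluster before relying on it.

```python
def offset_path(group_id, topic, partition):
    """Build the znode path that holds the offset for one partition."""
    return "/consumers/%s/offsets/%s/%d" % (group_id, topic, partition)


def parse_offset(value):
    """Decode a znode value (bytes) into an integer offset.

    Assumption: the offset is stored as a decimal string.
    """
    return int(value.decode("utf-8"))


print(offset_path("console-consumer-2304", "topic2", 0))
print(parse_offset(b"42"))
```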
2.11 Re-assign partitions
/admin/reassign_partitions
Schema:
{
  "fields": [
    {"name": "version", "type": "int", "doc": "version id"},
    {"name": "partitions",
     "type": {
       "type": "array",
       "items": {
         "fields": [
           {"name": "topic", "type": "string", "doc": "topic of the partition to be reassigned"},
           {"name": "partition", "type": "int", "doc": "the partition to be reassigned"},
           {"name": "replicas", "type": "array", "items": "int", "doc": "a list of replica ids"}
         ]
       },
       "doc": "an array of partitions to be reassigned to new replicas"
     }
    }
  ]
}
Example:
{"version": 1, "partitions": [{"topic": "Foo", "partition": 1, "replicas": [0, 1, 3]}]}
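A payload of this shape can be assembled programmatically. The following Python sketch builds the JSON from a list of (topic, partition, replicas) tuples and rejects empty or duplicated replica lists. Those checks are an assumption about what a sensible reassignment looks like, not rules taken from this article. The function name `build_reassignment` is illustrative.

```python
import json


def build_reassignment(moves):
    """Build the reassignment JSON from (topic, partition, replicas) tuples."""
    partitions = []
    for topic, partition, replicas in moves:
        replicas = list(replicas)
        # Illustrative sanity checks (assumptions, not Kafka's own validation).
        if not replicas or len(set(replicas)) != len(replicas):
            raise ValueError("replicas must be a non-empty list without duplicates")
        partitions.append({"topic": topic, "partition": partition, "replicas": replicas})
    return json.dumps({"version": 1, "partitions": partitions})


payload = build_reassignment([("Foo", 1, [0, 1, 3])])
print(payload)
```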
2.12 Preferred replica election
/admin/preferred_replica_election
Schema:
{
  "fields": [
    {"name": "version", "type": "int", "doc": "version id"},
    {"name": "partitions",
     "type": {
       "type": "array",
       "items": {
         "fields": [
           {"name": "topic", "type": "string", "doc": "topic of the partition for which preferred replica election should be triggered"},
           {"name": "partition", "type": "int", "doc": "the partition for which preferred replica election should be triggered"}
         ]
       },
       "doc": "an array of partitions for which preferred replica election should be triggered"
     }
    }
  ]
}
Example:
{"version": 1, "partitions": [{"topic": "Foo", "partition": 1}, {"topic": "Bar", "partition": 0}]}
2.13 Delete topics
/admin/delete_topics
Schema:
{
  "fields": [
    {"name": "version", "type": "int", "doc": "version id"},
    {"name": "topics",
     "type": {"type": "array", "items": "string", "doc": "an array of topics to be deleted"}
    }
  ]
}
Example:
{"version": 1, "topics": ["foo", "bar"]}
2.14 Topic configuration
/config/topics/[topic_name]