Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

If Kafka does not stop, how to migrate ZooKeeper clusters without awareness?

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

Kafka is widely used in Yelp. Yelp sends billions of messages through various clusters every day. Behind this, Kafka uses Zookeeper to complete a variety of distributed coordination tasks.

Because Yelp relies heavily on Kafka, the question is whether it can switch Zookeeper clusters without attracting the attention of Kafka and other Zookeeper users. This article will reveal the answer.

Kafka is widely used in Yelp. In fact, we send billions of messages every day through various clusters. Behind this, Kafka uses Zookeeper for various distributed coordination tasks, such as deciding which Kafka broker is responsible for assigning partition leaders and storing metadata about topics in broker.

The successful application of Kafka in Yelp shows that our cluster has experienced significant growth since its first deployment of Kafka. At the same time, the size of other Zookeeper heavy users (such as Smartstack and PaasTA) is also growing, adding a lot of burden to our shared Zookeeper cluster. To alleviate this situation, we decided to use a dedicated Zookeeper cluster for our Kafka cluster.

Because we rely heavily on Kafka, any downtime caused by maintenance can lead to a chain reaction, such as delays in the dashboard displayed to the business owner and log accumulation on the server. Then the question arises: can we switch Zookeeper clusters without attracting the attention of Kafka and other Zookeeper users?

Zookeeper mitosis

After several rounds of discussion and brainstorming between teams about Kafka and Zookeeper, we found a way to achieve our goal: to have the Kafka cluster use a dedicated Zookeeper cluster without causing Kafka downtime.

Our proposed scheme can be compared to natural cell mitosis: we copy Zookeeper hosts (i.e. DNA), and then use firewall rules (i.e. cell walls) to divide the replicated hosts into two separate clusters.

The main event in mitosis, the division of chromosomes in the nucleus.

Let's delve into the details step by step. In this article, we will use the source cluster and the target cluster, where the source cluster represents the existing cluster and the target cluster represents the new cluster to which Kafka will migrate. The example we are going to use is a Zookeeper cluster with three nodes, but the process itself can be used for any number of nodes.

Our example will use the following IP address for the Zookeeper node:

Source 192.168.1.1-3

Target 192.168.1.4-6

Stage 1: DNA replication

First, we need to start a new Zookeeper cluster. The target cluster must be empty because during the migration, the content in the target cluster will be deleted.

Then, we combine two nodes in the target cluster and three nodes in the source cluster to get a Zookeeper cluster with five nodes. The reason for this is that we want the data (originally saved by Kafka in the source Zookeeper cluster) to be replicated to the target cluster. Zookeeper's replication mechanism automates the replication process.

Combine nodes from source and target clusters

The zoo.cfg file for each node now looks like this, containing all nodes in the source cluster and two nodes in the target cluster:

Server.1=192.168.1.1:2888:3888

Server.2=192.168.1.2:2888:3888

Server.3=192.168.1.3:2888:3888

Server.4=192.168.1.4:2888:3888

Server.5=192.168.1.5:2888:3888

Note that a node from the target cluster (192.168.1.6 in the above example) remains dormant during this process and is not part of the federated cluster, and Zookeeper is not running on it, in order to maintain the quorum of the source cluster.

At this point, the federated cluster must be restarted. Be sure to perform a rolling restart (one node at a time, at least 10 seconds apart), starting with two nodes from the target cluster. This order ensures that the quorum of the source cluster is not lost and that it is available to other clients, such as Kafka, when new nodes join the cluster.

After a rolling restart of the Zookeeper node, Kafka knows nothing about the new node in the federated cluster because its Zookeeper connection string is only the IP address of the original source cluster:

Zookeeper.connect=192.168.1.1192.168.1.2192.168.1.3/kafka

The data sent to Zookeeper is now copied to the new node, and Kafka doesn't even notice.

Now that the data is synchronized between the source cluster and the target cluster, we can update the connection string of Kafka to point to the target cluster:

Zookeeper.connect=192.168.1.4192.168.1.5192.168.1.6/kafka

A rolling Kafka restart is required to obtain a new connection, but do not perform an overall downtime.

Stage 2: mitosis

The first step in splitting the federated cluster is to restore the configuration files (zoo.cfg) of the original source Zookeeper and the target Zookeeper, as they reflect the final state required by the cluster. Note that the Zookeeper service should not be restarted at this time.

We use firewall rules to perform mitosis, dividing our federated cluster into different source and target clusters, each with its own leader. In our example, we use iptables to do this, but the firewall system that can be enforced between two Zookeeper cluster hosts should be possible.

For each target node, we run the following command to add iptables rules:

$source_node_list = 192.168.1.1192.168.1.2192.168.1.3

Sudo / sbin/iptables-v-An INPUT-p tcp-d $source_node_list-j REJECT

Sudo / sbin/iptables-v-An OUTPUT-p tcp-d $source_node_list-j REJECT

This denies any incoming or outgoing TCP traffic from the target node to the source node, thus separating the two clusters.

Separate the source and target clusters by firewall rules, and then restart

Separation means that the two target nodes are now separate from the other nodes. Because they think they belong to a five-node cluster and cannot communicate with most of the nodes in the cluster, they cannot conduct a leader election.

At this point, we also restart the Zookeeper of each node in the target cluster, including the dormant node that does not belong to the federated cluster. This way the Zookeeper process will use the new configuration provided in step 2 and will force leader elections in the target cluster so that each cluster will have its own leader.

From the perspective of Kafka, the target cluster is unavailable from the moment the network partition occurs and is not available until after the leader election. For Kafka, this is the only time period in which Zookeeper is not available throughout the process. From now on, we have two different Zookeeper clusters.

Now all we have to do is clean up. The source cluster still thinks it has two additional nodes, and we need to clean up some firewall rules.

Next, we restart the source cluster so that the zoo.cfg configuration that contains only the original source cluster nodes takes effect. We can now safely delete firewall rules because clusters no longer need to communicate with each other. The following command is used to delete the iptables rule:

$source_node_list = 192.168.1.1192.168.1.2192.168.1.3

Sudo / sbin/iptables-v-D INPUT-p tcp-d $source_node_list-j REJECT

Sudo / sbin/iptables-v-D OUTPUT-p tcp-d $source_node_list-j REJECT

Build confidence in distributed stress testing

The main method we use to test the correctness of the migration process is distributed stress testing. During the migration, we used scripts to run dozens of Kafka producer and consumer instances on multiple machines. When the traffic generation is complete, all consumed data payloads are aggregated on a single host to detect whether data loss has occurred.

Distributed stress testing works by creating a set of Kafka containers for Docker producers and consumers and running them in parallel on multiple hosts. All generated messages contain a sequence number that can be used to detect whether a message has been lost.

Temporary cluster

In order to prove the correctness of the migration, we need to build some clusters dedicated to testing. Instead of manually creating Kafka clusters and then shutting them down after testing, we have built a tool that automatically generates and shuts down clusters on our infrastructure so that we can script the entire testing process.

This tool connects to AWS EC2 API and activates multiple hosts with specific EC2 instance tags, allowing our puppet code to configure hosts and install Kafka (through External Node Classifiers

This temporary cluster script was later used to create a temporary Elasticsearch cluster for integration testing, which proved to be a very useful tool.

Zk-smoketest

We found that phunt's Zookeeper smoketest script Zookeeper cluster status. At each stage of the migration, we run smoketest in the background to ensure that the Zookeeper cluster behaves as expected.

Zkcopy

Our first plan for migration involves shutting down Kafka, replicating a subset of Zookeeper data to a new cluster, and restarting Kafka using an updated Zookeeper connection. A more refined version of the migration process-- what we call "block & copy"-- is used to migrate Zookeeper clients to a cluster with data, because the "mitosis" process requires a blank target Zookeeper cluster. The tool for copying a subset of Zookeeper data is zkcopy, which replicates a subtree of a Zookeeper cluster to another cluster.

We have also added transaction support to allow us to manage Zookeeper operations in bulk and minimize the network overhead of creating transactions for each znode. This increases the speed of using zkcopy by about 10 times.

Another core feature that accelerates the migration process is "mtime" support, which allows us to skip replicating nodes that are earlier than a given modification time. As a result, we avoided most of the work required to keep the Zookeeper cluster synchronized with the second "catch-up" replication. The downtime of Zookeeper has been reduced from 25 minutes to less than 2 minutes.

Experience and lessons

Zookeeper clusters are lightweight, and if possible, try not to share them between different services, as they can cause performance problems for Zookeeper that are difficult to debug and usually require downtime to fix.

We can get Kafka to use the new Zookeeper cluster without Kafka downtime, but this is certainly not a small thing.

It would be much easier to allow Kafka downtime during Zookeeper migration.

Welcome to learn from Java and big data to join the java architecture exchange: 855835163

Add group link: https://jq.qq.com/?_wv=1027&k=5dPqXGI

Free architecture materials are also available in the group: Java engineering, high-performance and distributed, high-performance, simple and simple. High architecture. Free live lectures on performance tuning, Spring,MyBatis,Netty source code analysis and big data and other advanced practical information can come in and learn together.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report