Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How does Kafka handle hundreds of billions of logs every day?

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

How does Kafka deal with hundreds of billions of logs every day? I believe many inexperienced people are at a loss about it. Therefore, this paper summarizes the causes and solutions of the problem. Through this article, I hope you can solve this problem.

The following mainly focuses on the following content sharing:

Message queue selection

The present situation of the commercialization of Kafka

Kafka Client framework

High availability of data

Load balancing

Authentication, Authorization and ACL Scheme

Quota mechanism

Data synchronization across IDC

Monitoring alarm

Online problems and Solutions

Message queue selection

At that time, the following dimensions were mainly considered:

Community activity

Client support

Throughput

Comparing several systems, I think Kafka is more in line with our requirements. Now that there is a new open source system Pulsar, I think we can give it a try.

The highlights of Kafka design are as follows:

Kafka performance and throughput are very high, through Sendfile and Pagecache to implement the Zero Copy mechanism, sequential read and write characteristics make it possible to use ordinary disks to achieve a large throughput, relatively cost-effective.

Kafka ensures the high availability of data through Replica and ISR mechanisms.

The Kafka cluster has two administrative roles:

Controller is mainly for cluster management.

Coordinator mainly does business-level management.

Both roles are played by a Broker in Kafka, so Failover is simple, and you only need to choose a Broker instead.

From this point of view, Kafka has a decentralized design idea, but Controller itself is also a bottleneck, which can be compared to Hadoop's Namenode.

CAP theory, I believe we all know, distributed system implementation is either CP, or AP.

The implementation of Kafka is relatively flexible, and different services can configure the Topic level to CP or AP according to their own business characteristics.

Support independent repeated consumption between businesses, and playback can be done.

This is the brief structure of Kafka, which is mainly divided into:

Production end

Broker end

Consumer end

There are three levels of logging:

The first level Topic

The second level Partition (each Partition is a degree of parallelism)

The third level, Replica (Replica represents the number of copies of Partition)

The present situation of the commercialization of Kafka

Currently, there are hundreds of billions of data in the cluster, more than 10 gigabytes of machines, the maximum peak of a single Topic is 600000 QPS, and the peak of the cluster is about 5 million QPS.

Our physical machine is configured with 24Core/10G Nic / 128g memory / 4T*12 HDD. It is worth mentioning that we have adopted the configuration of 10 Gigabit Nic and ordinary disk 4T*12, which can match disk throughput and network throughput.

Furthermore, considering the large amount of data we have, the SSD disk is not particularly large and the cost is relatively high.

The organizational structure of the disk we use JBOD,RAID10 is also a good solution (disk cost will double).

Our current Kafka version is 1.1.1. It is recommended that you deploy a version above 0.11. This version makes a lot of optimizations to the protocol and is compatible with subsequent 2.x versions.

This is our Kafka upstream and downstream related components, the production side is mainly a variety of Kafka Clients/ real-time services / Flume/Logstash.

The consumer end is divided into three parts: real-time, offline (ETL) and monitoring. There are mainstream frameworks such as Spark/Flink/Storm in real time. In the offline part, we have developed a unified landing framework Hamal based on Flink. After consuming data from Kafka, you can land on multiple downstream systems (HDFS, Hbase, Redis, etc.) to avoid repeated consumption.

There is also the need for monitoring. We type the logs related to ES/InfluxDB to Kafka, and then consume them and display them through Grafana, but now we have cut them to Prometheus.

Kafka Client framework

Why do you want to make this framework? Before that, many business units used naked API to implement the logic of Kafka Client.

But there will be a lot of problems, there are some exceptions will be Catch incomplete, we built this framework is to mask all the details, and then expose enough simple interface.

This can reduce the possibility of business errors. We need to ensure the availability in extreme cases, such as network or cluster exception. If the network or cluster is not available, the data will fall locally first, and then recover from the local disk to the Kafka when the recovery occurs.

We have implemented two frameworks:

LogProducer, which supports at least once.

LogConsumer supports both at least once and exactly once semantics, in which exactly once requires services to implement Rollback interfaces.

The general idea of the LogProducer framework is to send logs to Kafka through memory queues. When Kafka or network is not available, the local disk will be written, and a thread will check the availability of Kafka or network in real time. If restored, disk logs will be loaded and sent to Kafka.

We also support a shared memory strategy instead of memory, which is used to reduce the number of logs lost during the restart.

The framework implementation of LogConsumer decouples Consumer threads from Worker threads through Blocking Queue, because the reality is that the consumption logic is simple, but the processing logic is complex.

In this way, Consumer threads and Worker threads can be configured differently, and the reverse pressure mechanism can be implemented through Blocking Queue.

For example, if the Worker cannot handle it, the Blocking Queue will be full and the Consumer thread will stop consuming.

At the same time, we will provide an interface in the Worker thread interface for users to submit to global offsetmap.

As shown in the figure above, we provide three composite interfaces. If the business-side Rollback logic is implemented in business processing and Commit, it is exactly once semantics, and the default is at least once semantics.

High availability of data

As mentioned earlier, Kafka itself provides a mechanism for Replica+ISR to ensure the high availability of data, but we think this may not be enough, so we also support Rack Aware.

For example, in the case of Replica=3, make sure that three copies are on different physical Rack, so that we can tolerate problems with two physical racks at the same time while the data is still available. Our Rack Aware solution is implemented together with the load balancing solution, which will be discussed later.

It is worth noting that Kafka officially supports Rack Aware, which can be achieved by configuring the broker.rack parameter on the Broker side.

However, there is a limitation that the same number of Brokers must be assigned to each Rack, otherwise the Replica allocation will be skewed. In fact, the Rack of IDC is large, and the distribution of physical machines assigned may be very random.

A solution that can be used for reference is to adopt the concept of virtual Rack Group, such as maintaining three virtual Rack Group, adding the applied physical machines to the three Group, and ensuring that the number of physical machines allocated among the Rack Group is the same.

Of course, physical machines between Rack Group should not have the same physical Rack.

Load balancing

The load balancing feature of Kafka is only supported in the commercial version of Confluent. In essence, load balancing is a problem of uniform distribution of Replica.

At first, we want to solve the problem through classic consistency Hash, as shown in the following figure:

Then we find that the classic one-time Hash can not meet our needs. For example, to add a node node5, we can only share part of the load of the node node2 and cannot balance the load of the global node.

So we implement a scheme based on the one-time Hash algorithm of virtual nodes, as shown in the figure: the same color corresponds to the same physical machine, and all the virtual nodes on the Hash ring.

There are four physical nodes here, of which node4 is our new node. The load of the physical node can be distributed evenly enough through the virtual node, so when I add the node4 to the Hash ring, I share the load of all the physical machines.

The implementation steps of the algorithm are divided into two major steps:

① New hash circle: make a Hash of MD5 through vnode_str (such as hostname-v0), get the vnode_key of virtual node, then use ring dictionary to save the mapping from virtual node to physical node, and add vnode_key to the list of sorted_keys.

② allocates Replica in Hash ring: use (topic_name+partition_num+replica_num) as Key to get replica_key using the same MD5 Hash algorithm.

Then the Position of the replica_key in sorted_keys is found in dichotomy, and finally the Ring dictionary is used to map to the physical machine Node, so the Replica allocation is completed.

We solve three problems based on this algorithm:

Adding a physical node requires only a small amount of data migration.

Set weights for different configurations of physical machines to support the deployment of heterogeneous clusters.

To implement the Rack Aware of Replica, there will be Rack information on the physical node, and the assigned Rack information will be recorded when the physical node is assigned to the Replica.

If there is a repetition, the vnode_key will find the location of the Position + 1 to the next physical node, and we will make sure that the physical Rack of the three Replica must be different (if Replica=3).

Leader Balance: this is a fast and low-cost load Balance method. Because only Leader provides read and write in Kafka, the effect of load switching can be achieved through Leader switching. Because only Leader switching does not involve data synchronization, this cost is relatively small.

Disk Rebalance: this Feature needs to be supported after the Kafka1.1.0 version. Kafka provides some scripts and API to do Balance operations, and its essence is to generate Replica Plan and then do Reassign.

Authentication, Authorization and ACL Scheme

If it is a new cluster, the SASL-based SCRAM scheme is recommended, which is relatively simple to implement.

If the old cluster wants to implement the authentication and authorization mechanism midway, it will be difficult, and each service needs to be pushed to modify the configuration. At the same time, problems can easily occur in the switching process.

The following describes a whitelist mechanism we have implemented to solve the problem of the old cluster. First, add the old business to the whitelist, let the new business apply for Topics and Consumers resource permissions through the ticket process and add them to the whitelist, and regularly monitor illegal (no ticket walking) Topics,Consumers resources.

Deny all these resources at the same time, so that the access to read and write of Topics and Consumer will be tightened, and the original business will not be affected.

Quota mechanism

The main purpose of Quota is to solve the problem of resource preemption among multiple services. There are two types of Quota:

One is to limit network bandwidth.

One is to limit the request rate (limit CPU).

We have made three priority settings for services: high, medium, low priority, no restrictions on high priority, medium priority can tolerate lag, and low priority can be stopped in extreme cases. All services with a certain priority can be restricted in batches through tools to ensure the security of high priority services and clusters.

Data synchronization across IDC

Why do we do cross-IDC data synchronization in the first place? Before doing this synchronization, the business may not have an IDC concept of reading and writing data, so it is easy to have cross-IDC reading and writing, and multiple businesses may have duplicate Consume and Produce.

This results in a great waste of cross-IDC networks, coupled with the instability of cross-IDC networks, sometimes there are some exceptions, and the business may not be handled well.

In order to solve the above problems, we have implemented a cross-IDC data synchronization service. First of all, we agreed that the business can only read and write to this IDC, not cross-IDC. If you have cross-IDC data requirements, you need to apply to us to synchronize a copy through Mirrormaker.

This has two benefits:

First, it shields the impact of exceptions on the business.

Second, it saves the bandwidth between IDC (we can ensure that only one copy of this data is transmitted through the synchronization mechanism).

We also Pass the service based on Marathon/Mesos to improve the SLA of the service.

Monitoring alarm

Our monitoring and warning platform is as follows:

To make a chart display based on Jmx exporter+Promehteus+Grafana, deploying Jmx exporter,Prometheus on each Broker will Pull the data, and finally display it through Grafana.

Monitor the transient index based on Kafka Manager.

Monitor Consumer lag based on Burrow.

Based on Wonder to do alarm, this is a component of the internal implementation of 360. similar to Zabbix.

Online problems and Solutions

Disk failure: we use Smartctl to monitor, first of all, the status is Passed, and then we will determine that the attribute value of 197Current_Pending_Sector cannot be greater than 100. if it is greater than 100, this disk may have read and write performance problems.

Bootstrap.servers performance bottleneck: this parameter can be configured with multiple Broker, which provide Lookup services for Kafka Clients as the role of Proxy.

If the size of the cluster is very large and there is a lot of Clients, the Broker load of these Proxy roles will be very heavy. To solve this problem, we have configured the bootstrap.servers parameter VIP.

Each VIP can be bound to as many Brokers as possible, so that the Proxy can be dynamically expanded and reduced without the need for the client to modify the configuration.

Consumer restart does not consume: business feedback consumption stops and restart will not solve the problem. Later, the location found that it was earlier than the version of Bug before 0.11:

Https://issues.apache.org/jira/browse/KAFKA-5413

The reason is that the log cleaner thread hangs and causes the Compact to stop. The amount of _ _ consumer_offsets Topic is very large, and the broker reload time is very long, during which the service is stopped.

There are two solutions:

One is to upgrade to Kafka version 0.11 +.

The second is to migrate the Offset to the new Consumer Group to solve the problem (avoid the problematic Coordinator).

After reading the above, have you mastered how Kafka handles hundreds of billions of logs every day? If you want to learn more skills or want to know more about it, you are welcome to follow the industry information channel, thank you for reading!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report