In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-02 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/03 Report--
This article mainly introduces "how to use Kafka". In daily operation, I believe many people have doubts about how to use Kafka. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubts about "how to use Kafka". Next, please follow the editor to study!
I. Kafka application
When the Kafka cluster traffic reaches trillions of records / day or trillions of records / day or more, what capabilities do we need to ensure the cluster's high availability, high reliability, high performance, high throughput and safe operation?
The summary here is mainly aimed at Kafka2.1.1 version, including cluster version upgrade, data migration, traffic restriction, monitoring alarm, load balancing, cluster expansion / reduction, resource isolation, cluster disaster recovery, cluster security, performance optimization, platform, open source version defects, community dynamics and so on. This article mainly introduces the core context and does not explain it in too much detail. Let's first take a look at some of the core application scenarios of Kafka as a data hub.
The following figure shows some of the mainstream data processing processes, with Kafka acting as a data hub.
Next, let's take a look at the overall architecture of our Kafka platform.
1.1 version upgrade 1.1.1 how to perform version rolling upgrade and fallback for open source version
Official website address: http://kafka.apache.org
1.1.1.2 how to upgrade and fallback from source code modification
Because in the upgrade process, there is bound to be the alternation of new and old code logic. Some of the nodes in the cluster are open source versions, while the other nodes are modified versions. Therefore, you need to consider how to mix the old and new code during the upgrade process, how to be compatible, and how to fall back in the event of a failure.
1.2 data migration
Due to the architectural characteristics of the Kafka cluster, this will inevitably lead to the imbalance of traffic load in the cluster, so we need to do some data migration to achieve traffic balance between different nodes in the cluster. The open source version of Kafka provides a scripting tool "bin/kafka-reassign-partitions.sh" for data migration, which you can use if you don't implement automatic load balancing.
The migration plan generated by the script provided by the open source version is completely manual intervention. When the size of the cluster is very large, the migration efficiency becomes very inefficient and is generally calculated on a daily basis. Of course, we can implement a set of automated balancing programs. When load balancing is automated, we basically use the internally provided API to help us generate migration plans and execute migration tasks. It should be noted that there are two migration plans: a specified data directory and an unspecified data directory. ACL security authentication is required for the specified data directory.
Official website address: http://kafka.apache.org
1.2.1 data migration between broker
Do not specify a data directory
/ the migration plan for which no migration directory is specified {"version": 1, "partitions": [{"topic": "yyj4", "partition": 0, "replicas": [1000003jm 1000004]}, {"topic": "yyj4", "partition": 1, "replicas": [1000003T 1000004]}, {"topic": "yyj4", "partition": 2, "replicas": [1000003M 1000004]}]}}
Specify data directory
/ / specify the migration plan of the migration directory {"version": 1, "partitions": [{"topic": "yyj1", "partition": 0, "replicas": [1000006jue 1000005], "log_dirs": ["/ data1/bigdata/mydata1", "/ data1/bigdata/mydata3"]}, {"topic": "yyj1", "partition": 1, "replicas": [1000006c001000005], "log_dirs": ["/ data1/bigdata/mydata1" "/ data1/bigdata/mydata3"]}, {"topic": "yyj1", "partition": 2, "replicas": [1000006 video 1000005], "log_dirs": ["/ data1/bigdata/mydata1", "/ data1/bigdata/mydata3"]}} 1.2.2 data migration between disks within broker
Servers in production environment generally mount multiple hard disks, such as 4 / 12, etc.; then it may appear within the Kafka cluster, and the traffic among the broker is relatively balanced, but within the broker, the traffic among the disks is uneven, resulting in some disk overload, which affects the performance and stability of the cluster, and does not make good use of hardware resources. In this case, we need to load balance the traffic of multiple disks within the broker, so that the traffic is more evenly distributed to each disk.
1.2.3 concurrent data migration
The replica migration tool "bin/kafka-reassign-partitions.sh" provided by the current open source version of Kafka (version 2.1.1) can only implement serial migration tasks in the same cluster. For the situation that multiple resource groups have been physically isolated in the cluster, because the resource groups will not influence each other, but can not submit the migration task in parallel, the migration efficiency is a little low, this deficiency can not be solved until version 2.6.0. If you need to implement concurrent data migration, you can choose to upgrade the Kafka version or modify the Kafka source code.
1.2.4 terminating data migration
The replica migration tool "bin/kafka-reassign-partitions.sh" provided by the current open source version of Kafka (version 2.1.1) cannot terminate the migration after starting the migration task. When the migration task has an impact on the stability or performance of the cluster, there is nothing you can do but to wait for the migration task to be completed (success or failure). This deficiency will not be resolved until version 2.6.0. If you need to terminate data migration, you can choose to upgrade the Kafka version or modify the Kafka source code.
1.3 flow restrictions 1.3.1 production and consumption flow restrictions
There are often some sudden and unpredictable abnormal production or consumption traffic which will exert great pressure on the IO and other resources of the cluster, and eventually affect the stability and performance of the whole cluster. Then we can limit the flow of users' production, consumption and data synchronization between replicas. The purpose of this flow-limiting mechanism is not to limit users, but to avoid sudden traffic affecting the stability and performance of the cluster, so as to provide users with better service.
As shown in the following figure, the inbound traffic of nodes increases from around 140MB/s to 250MB/s, while the outbound traffic increases from around 400MB/s to 800MB/s. If there is no current limiting mechanism, then multiple nodes of the cluster will be hung up by these abnormal traffic, and even cause cluster avalanches.
Photo production / consumption traffic restrictions official website address: click the link
For the traffic restrictions of producers and consumers, the official website provides the following combination of dimensions to limit (of course, there are some defects in the following current limit mechanism, which will be mentioned later in "functional defects in the open source version of Kafka"):
/ config/users//clients// / current restriction according to the combination of user and client ID / config/users//clients//config/users/// according to user flow restriction is our most commonly used method / config/users//clients//config/users//clients//config/users//config/clients//config/clients/
When starting the broker service of Kafka, you need to enable the configuration of JMX parameters to facilitate the collection of JMX metrics of Kafka through other applications for service monitoring. When a user needs to adjust the current limit threshold, an intelligent assessment is made based on the traffic that a single broker can withstand, and no manual intervention is required to determine whether it can be adjusted. For user traffic limit, the main indicators that need to be referred to include the following:
(1) consumption traffic index: ObjectName:kafka.server:type=Fetch,user=acl authenticated user name attribute: byte-rate (user's outbound traffic in current broker), throttle-time (user's outbound traffic in current broker is restricted time) (2) production traffic indicator: ObjectName:kafka.server:type=Produce,user=acl certified user name attribute: byte-rate (user's inbound traffic in current broker), throttle-time (user's inbound traffic in current broker is restricted)
1.3.2 follower synchronous leader/ data migration traffic limit
Replica migration / data synchronization traffic restrictions official website address: link
The parameters involved are as follows:
/ / the current limit configuration of replica synchronization involves the following four parameters leader.replication.throttled.ratefollower.replication.throttled.rateleader.replication.throttled.replicasfollower.replication.throttled.replicas
The auxiliary indicators are as follows:
(1) replica synchronous outbound traffic metrics: ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec (2) replica synchronous inbound traffic metrics: ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec
1.4 Monitoring alarm
There are some open source tools available for Kafka monitoring, such as the following:
Kafka Manager
Kafka Eagle
Kafka Monitor
KafkaOffsetMonitor
We have embedded Kafka Manager as a tool for us to view some basic metrics, but these open source tools are not well integrated into our own business systems or platforms. Therefore, we need to implement a system with finer granularity, smarter monitoring and more accurate alarm. The monitoring coverage should include basic hardware, operating system (occasional hang residence of system processes in the operating system, resulting in broker death, unable to provide services normally), broker services of Kafka, Kafka client applications, zookeeper clusters, upstream and downstream full-link monitoring.
1.4.1 hardware monitoring
Network Monitoring:
The core indicators include network inbound traffic, network outbound traffic, network packet loss, network retransmission, number of TCP connections in TIME.WAIT, switch, computer room bandwidth, DNS server monitoring (if the DNS server is abnormal, there may be a traffic black hole, causing large area business failure), and so on.
Disk Monitoring:
The core metrics include monitoring disk write, disk read (if there is no delay or only a small delay, there is generally no disk read operation), disk ioutil, disk iowait (if this indicator is too high, the disk load is heavy), disk storage space, disk bad disk, disk bad block / bad channel (bad path or bad block will cause broker to be half-dead, consumers will be stuck due to crc check, etc.).
CPU Monitoring:
Monitoring CPU idle rate / load, motherboard failure, etc., usually the low utilization rate of CPU is not the bottleneck of Kafka.
Memory / swap monitoring:
Memory utilization, memory failure. In general, except for the heap memory allocated when starting Kafka's broker, almost all the other memory on the server is used for PageCache.
Cache hit rate monitoring:
Whether to read the disk or not has a great impact on the performance of Kafka, so we need to monitor the PageCache cache hit rate of Linux. If the cache hit rate is high, it means that consumers basically hit the cache.
For more information, please read the article: "Application of Linux Page Cache tuning in Kafka".
Syslog:
We need to monitor and alarm the error log of the operating system and find some hardware faults in time.
1.4.2 broker Service Monitoring
The monitoring of broker service is mainly by specifying the JMX port when the broker service is started, and then by implementing a set of index collection procedures to collect JMX indicators. (server indicator official website address)
* * broker-level monitoring: * * broker processes, broker inbound traffic byte size / record number, broker outbound traffic byte size / record number, replica synchronous inbound traffic, replica synchronization outbound traffic, inter-broker traffic deviation, broker connections, broker request queues, broker network idle rate, broker production delay, broker consumption delay, broker production requests, broker consumption requests, leader distributed on broker, copies distributed on broker, disk traffic on broker, Broker GC et al.
* * topic-level monitoring: * * topic inbound traffic byte size / record number, topic outbound traffic byte size / record number, no traffic topic, topic traffic sudden change (sudden increase / sudden drop), topic consumption delay.
* * partition-level monitoring: * * inbound traffic byte size / record number, partition outbound traffic byte size / record number, missing copy of topic partition, delay record of partition consumption, partition leader switching, partition data skew (when producing messages, if the key of the message is specified, it is easy to cause data skew, which seriously affects the service performance of Kafka), partition storage size (can govern the topic that is too large in a single partition).
* * user-level monitoring: * * user outbound / inbound traffic byte size, user outbound / inbound traffic limit time, user traffic sudden change (sudden increase / sudden drop).
* * broker service log monitoring: * * monitor and alarm the error log printed on the server side to detect service anomalies in time.
1.4.3. Client monitoring
Client monitoring is mainly to implement a set of index reporting program, which needs to be implemented.
Org.apache.kafka.common.metrics.MetricsReporter interface. Then add the configuration item metric.reporters to the configuration of the producer or consumer, as follows:
Properties props = new Properties (); props.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ""); props.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName ()); props.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName ()); / / ClientMetricsReporter class implements org.apache.kafka.common.metrics.MetricsReporter interface props.put (ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ClientMetricsReporter.class.getName ());
The official website address of client metrics:
Http://kafka.apache.org/21/documentation.html#selector_monitoring
Http://kafka.apache.org/21/documentation.html#common_node_monitoring
Http://kafka.apache.org/21/documentation.html#producer_monitoring
Http://kafka.apache.org/21/documentation.html#producer_sender_monitoring
Http://kafka.apache.org/21/documentation.html#consumer_monitoring
Http://kafka.apache.org/21/documentation.html#consumer_fetch_monitoring
The client monitoring process architecture is shown in the following figure:
1.4.3.1 producer client monitoring
* * Dimension: * * user name, client ID, client IP, topic name, Cluster name, brokerIP
* * Metrics: * * number of connections, IO waiting time, production flow, number of production records, number of requests, request delay, delivery error / retry times, etc.
1.4.3.2 Consumer client Monitoring
* * Dimension: * * user name, client ID, client IP, topic name, cluster name, consumer group, brokerIP, topic partition
* * Metrics: * * number of connections, io waiting time, consumption traffic, consumption records, consumption delay, topic consumption delay records, etc.
1.4.4 Zookeeper Monitoring
Zookeeper process monitoring
Leader handover Monitoring of Zookeeper
Error log monitoring of Zookeeper service
1.4.5 full link monitoring
When the data link is very long (for example: business application-> buried point SDk- > data collection-> Kafka- > real-time computing-> business application), we usually need multiple teams to communicate and troubleshoot the problem repeatedly to find out which part of the problem occurs, so it is inefficient to troubleshoot the problem. In this case, we need to comb the monitoring of the whole link together with the upstream and downstream. When there is a problem, the first time to locate which link the problem occurs, shorten the problem location and fault recovery time.
1.5 Resource isolation 1.5.1 physical isolation of different business resources in the same cluster
We physically isolate the resource groups of different businesses in all clusters to avoid the interaction between businesses. Here, we assume that the cluster has four broker nodes (Broker1/Broker2/Broker3/Broker4) and two services (business A / business B), each of which has topic partition distribution as shown in the figure below. The two business topic are scattered on each broker of the cluster, and there is also overlap at the disk level.
Just imagine, if one of our business anomalies, such as a sudden increase in traffic, causes the broker node to be abnormal or hung up. At this time, another business will also be affected, which will greatly affect the availability of our services, cause failures, and expand the scope of failure.
In view of these pain points, we can isolate the physical resources of the businesses in the cluster, enjoy the exclusive resources of each business, and divide the resource groups (here, each broker is divided into two resource groups: Group1 and Group2). As shown in the figure below, the topic of different businesses is distributed in their own resource groups. When one business is abnormal, it will not affect the other business, which can effectively reduce the scope of our failures and improve service availability.
1.6 Cluster classification
According to the business characteristics, we split the cluster into log cluster, monitoring cluster, billing cluster, search cluster, offline cluster, online cluster and so on. Different scenarios of business are placed in different clusters to avoid the interaction of different businesses.
1.7 capacity expansion / reduction 1.7.1 topic expansion Partition
As the amount of topic data increases, the number of partitions specified by the topic we originally created may not be able to meet the traffic requirements, so we need to expand the partitions of topic. There are several points to consider when expanding the partition:
It is necessary to ensure that the leader and follower polling of topic partitions are distributed on all broker in the resource group, so that the traffic distribution is more balanced, and different replicas of the same partition need to be distributed across racks to improve disaster recovery.
When there is a remainder between the number of leader partitions in topic divided by the number of nodes in the resource group, the remainder partition leader needs to give priority to the broker with lower traffic.
1.7.2 broker launch
With the increase of business and data volume, our cluster also needs to expand the capacity of broker nodes. With regard to capacity expansion, we need to achieve the following points:
Intelligent evaluation of capacity expansion: according to the load of the cluster, the evaluation of whether capacity expansion is needed is programmed and intelligent.
Intelligent capacity expansion: when the evaluation needs capacity expansion, the expansion process and traffic balance platform.
1.7.3 broker offline
In some scenarios, we need to offline our broker, which mainly includes the following scenarios:
Some aging servers need to be offline to realize the offline platform of the node.
Server failure, broker failure cannot be recovered. We need to offline the failed server to realize the offline platform of the node.
The server with better configuration replaces the existing broker node and makes the offline node platform.
1.8 load balancing
Why do we need load balancing? First of all, let's take a look at the first figure. The following figure shows the traffic distribution of a resource group in our cluster after it has just been expanded. Traffic cannot be automatically allocated to our newly expanded nodes. At this time, we need to manually trigger data migration and migrate some copies to the new node in order to achieve traffic balance.
Next, let's look at the second picture. In this picture, we can see that the flow distribution is very uneven, and the deviation between the minimum and maximum flow is more than several times. This is related to the architectural characteristics of Kafka. When the size and amount of data of the cluster reach a certain amount, problems will inevitably occur. In this case, we also need to do load balancing.
Let's take a look at the third picture. Here, we can see that there is only a sudden increase in traffic in some nodes, that is, the topic partition is not scattered enough within the cluster, which is caused by the centralized distribution to a few broker. In this case, we also need to expand the capacity, partition and balance.
Our ideal traffic distribution should be shown in the following figure, and the traffic deviation between nodes is very small. In this case, it can not only enhance the ability of the cluster to withstand the abnormal surge of traffic, but also improve the overall resource utilization and service stability of the cluster. Reduce costs.
Load balancer needs to achieve the following effects:
1) generate replica migration plan and execute migration tasks platform, automated and intelligent
2) after equalization, the traffic between broker is relatively uniform, and a single topic partition is evenly distributed on all broker nodes.
3) after equalization, the traffic among multiple disks in broker is more balanced.
To achieve this effect, we need to develop a set of our own load balancing tools, such as the secondary development of the open source cruise control. The core of this tool is to generate the strategy of the migration plan, which directly affects the effect of the final cluster load balancing. Reference content:
1. LinkedIn/cruise-control
2. Introduction to Kafka Cruise Control
3. Cloudera Cruise Control REST API Reference
The cruise control architecture diagram is as follows:
When generating a migration plan, we need to consider the following:
1) Core metrics are selected as the basis for generating migration plans, such as outbound traffic, inbound traffic, rack, single topic partition dispersion, etc.
2) optimize the indicator samples used to generate the migration plan, such as filtering abnormal samples such as sudden increase / drop / zero drop in traffic.
3) the samples to be used in the migration plan of each resource group are all internal samples of the resource group, and do not involve other resource groups and have no crossover.
4) to control the excessive topic of single partition, so that the distribution of topic partition is more scattered, the traffic is not concentrated in part of the broker, and the amount of data in a single partition of topic is smaller, which can reduce the amount of data migrated and improve the speed of migration.
5) topic that has been evenly distributed in the resource group should be added to the migration blacklist without migration, which can reduce the amount of data migrated and increase the speed of migration.
6) do topic governance to eliminate the interference of long-term no-traffic topic on balance
7) when building a new topic or expanding the capacity of a topic partition, all partitions should be polled on all broker nodes. After polling, the remaining partitions should give priority to the broker with lower traffic.
8) when load balancer is enabled after expanding the capacity of broker nodes, the same broker should be allocated the same large traffic (large traffic rather than large storage space, which can be considered as throughput per second) topic multiple partitions leader, and migrate part to the new broker node.
9) when submitting the migration task, the deviation of the partition data size in the same batch migration plan should be as small as possible, so as to avoid waiting for the migration of the large partition for a long time after the completion of the migration task, causing the task to tilt.
1.9 Security Certification
Is it possible for everyone in our cluster to access at will? Of course not, for the security of the cluster, we need to carry out permission authentication to block illegal operations. It mainly includes the following aspects that need to be certified:
(1) producer authority authentication
(2) Consumer authority authentication
(3) specify data directory migration security authentication
Official website address: http://kafka.apache.org
1.10 Cluster disaster recovery
Cross-rack disaster recovery:
Official website address: http://kafka.apache.org
* * disaster recovery across clusters / data centers: * * if there are business scenarios such as working in different places, you can refer to Kafka2.7 version of MirrorMaker 2.0.
GitHub address: https://github.com
Exact KIP address: https://cwiki.apache.org
* * Kafka metadata recovery on ZooKeeper cluster: * * We will back up the permission information data on ZooKeeper on a regular basis, and use it for recovery when the cluster metadata is abnormal.
1.11 Parameter / configuration optimization
* * broker service parameter optimization: * * here I only list some of the core parameters that affect performance.
It is recommended that num.network.threads# create Processor to process network request threads. It is recommended to set it to broker when the number of CPU cores is * 2. This value is too low. Often, the network idle is too low and the copy is missing. Num.io.threads# creates KafkaRequestHandler to handle specific request threads. It is recommended to set it to the number of broker disks * 2. It is recommended to set it to the number of CPU cores / 4. Appropriate improvement can improve the utilization of CPU and the parallelism of follower synchronizing leader data. Compression.type# recommends using lz4 compression type, which can improve CPU utilization and reduce the amount of data transmitted over the network. If queued.max.requests# is a production environment, it is recommended to configure at least 500. The default is 500. Log.flush.scheduler.interval.mslog.flush.interval.mslog.flush.interval.messages# indicates the policy of refreshing log data to disk. The default configuration should be maintained, and the brushing policy should be left to the operating system, which decides when to flush the data. # if this parameter is set, it may have a great impact on throughput. Auto.leader.rebalance.enable# indicates whether to enable leader automatic load balancing. Default is true. We should set this parameter to false because automatic load balancing is uncontrollable and may affect the performance and stability of the cluster.
* * production optimization: * * here I only list some of the core parameters that affect performance.
How long does the linger.ms# client production message wait before it is sent to the server? unit: millisecond. This parameter can be used in conjunction with the batch.size parameter, but if the client is at risk of data loss on the downside, the message batch size sent by the batch.size# client to the server is used in conjunction with the linger.ms parameter. Appropriate enlargement can improve the throughput, but if the client has the risk of data loss on the downmachine, compression.type# recommends using the lz4 compression type with high compression ratio and throughput. Since Kafka does not have high requirements for CPU, you can make full use of CPU resources to improve network throughput through compression. The buffer size of buffer.memory# client can be appropriately increased if topic is large and memory is sufficient. The default is only 33554432 (32MB) retries# retries after production failure. Default is 0, which can be increased appropriately. When the retry exceeds a certain number of times, if the business requires high data accuracy, fault-tolerant processing is recommended. After the failure of retry.backoff.ms# production, the retry interval. Default 100ms. It is recommended that the setting is not too large or too small.
In addition to optimizing some core parameters, we also need to consider, for example, the number of partitions in topic and the retention time of topic. If the number of partitions is too small and the retention time is too long, but the amount of data written may be too large, the following problems may occur:
1) the topic partition is concentrated on several broker nodes, resulting in an imbalance of traffic replicas
2) some disks inside the broker node are overloaded, and the storage is burst.
1.11.1 consumption optimization
The biggest problem with consumption, and the problem that often occurs is that consumption delays and pulls historical data. When pulling a large amount of historical data, there will be a large number of disk read operations, which will pollute the pagecache, which will increase the load of the disk and affect the performance and stability of the cluster.
How can you reduce or avoid a large amount of consumption delay?
1) when the amount of topic data is very large, it is recommended that a partition start a thread to consume
2) add monitoring alarm to topic consumption delay and find and deal with it in time.
3) when topic data can be discarded, there is a large delay, such as a single partition delay record of more than 10 million or even hundreds of millions, then you can reset the consumption point of topic for emergency processing. [this solution is usually used only in extreme scenarios]
4) avoid resetting the partition offset of topic to an early location, which may result in pulling a large amount of historical data
1.11.2 Parameter optimization of Linux server
We need to optimize the file handle, pagecache and other parameters of Linux. Please refer to "Application of Linux Page Cache tuning in Kafka".
1.12. Hardware optimization
Disk optimization
If conditions permit, SSD solid state disk can be used to replace HDD mechanical hard disk to solve the problem of low IO performance of mechanical disk; if there is no SSD solid state disk, hard RAID can be done on multiple hard disks on the server (usually using RAID10), so that the IO load of broker nodes is more balanced. If it is a HDD mechanical hard disk, a broker can mount multiple hard drives, such as 12 * 4TB.
Memory
Because Kafka is a high-frequency read-write service, and the read and write requests of Linux basically go to Page Cache, a larger memory of a single node will significantly improve the performance. Generally choose 256GB or higher.
The network
Increase network bandwidth: when conditions permit, the larger the network bandwidth, the better. Because in this way, the network bandwidth will not become a performance bottleneck, and at least a 10-gigabit network (10Gb, full-duplex network card) can have relatively high throughput. If it is a single channel, the upper limit of the sum of network outbound traffic and inbound traffic is 1.25GB Universe. If it is a duplex dual channel, the theoretical value of network inbound and outbound traffic can reach 1.25GB/s.
Network isolation marking: because a data center may have both offline clusters (such as HBase, Spark, Hadoop, etc.) and real-time clusters (such as Kafka). Then the real-time cluster and the offline cluster will compete for network bandwidth when the servers are mounted under the same switch, and the offline cluster may affect the real-time cluster. So we need to isolate at the switch level so that offline machines and real-time clusters are not mounted to the same switch. Even if it is mounted under the same switch, we will mark the network traffic priority (gold, silver, copper, iron) to give priority to real-time traffic when the network bandwidth is tight.
CPU
The bottleneck of Kafka is not CPU. Generally speaking, a 32-core CPU on a single node is sufficient.
1.13. Platform
Now the problem is, we mentioned a lot of monitoring, optimization and other means; do we administrators or business users need to log in to the cluster server for all operations in the cluster? The answer is no, of course, and we need rich platform features to support it. On the one hand, it is to improve the efficiency of our operation, on the other hand, it is also to improve the stability of the cluster and reduce the possibility of errors.
Configuration management
Black screen operation. Every time you modify the server.properties configuration file of broker, there is no change record that can be traced back. Sometimes there may be some failures caused by someone modifying the cluster configuration, but the relevant records cannot be found. If we put configuration management on the platform, there will be traces of every change and reduce the risk of change errors.
Rolling restart
When we need to make online changes, sometimes we need to roll restart the cluster to multiple nodes, if we go to the command line, the efficiency will become very low, and it will require human intervention, a waste of manpower. At this time, we need to platform this repetitive work and improve our operational efficiency.
Cluster management
Cluster management mainly implements a series of operations on the command line on the platform, so that users and administrators no longer need to operate Kafka clusters on a black screen. This mainly has the following advantages:
Improve operational efficiency
The probability of operation error is smaller, and the cluster is more secure.
All operations are traceable and traceable
Cluster management mainly includes: broker management, topic management, production / consumption rights management, user management, etc.
1.13.1 mock function
Provide the function of production sample data and consumption sampling for users' topic on the platform. Users can test whether topic can be used and whether their permissions are normal without writing their own code.
Provide production / consumption permission verification function for users' topic on the platform, so that users can determine whether their account has read and write access to a certain topic.
1.13.2 Rights Management
Make user read / write rights management and other related operations on the platform.
1.13.3 capacity expansion / reduction
Put the broker node up and down to the platform, and all the online and offline nodes no longer need to operate the command line.
1.13.4 Cluster governance
1) Governance of no-flow topic to clean up no-flow topic in the cluster to reduce the pressure on the cluster caused by too much useless metadata
2) topic partition data size governance, sort out the topic with too much data in topic partition (for example, the amount of data in a single partition exceeds 100GB/ days) to see if it needs to be expanded to prevent data from being concentrated on some nodes in the cluster
3) topic partition data skew governance to prevent the client from specifying the key of the message when producing the message, but the key is too centralized and the message is only centrally distributed in some partitions, resulting in data skew
4) topic partition decentralized governance, so that topic partition is distributed on as many broker of the cluster as possible, so as to avoid the risk that traffic is only concentrated on a small number of nodes due to the sudden increase of topic traffic, and that a broker exception has a great impact on topic.
5) topic zone consumption delay governance; generally, there are two situations when there is more delayed consumption. One is that the performance of the cluster is declining, and the other is that the consumption concurrency of the business side is not enough. If the concurrency of consumers is not enough, they should increase consumption concurrency with business contacts.
1.13.5 Monitoring alarm
1) make all the index collection platform configurable, provide a unified index collection, index display and alarm platform, and realize integrated monitoring.
2) correlate upstream and downstream services to make full-link monitoring
3) users can configure topic or monitoring alarms such as regional traffic delay or sudden change
1.13.6 large business screen
Main metrics of business large screen: number of clusters, number of nodes, daily inbound traffic, daily inbound traffic record, sunrise traffic size, sunrise traffic record, inbound traffic per second, inbound traffic per second, outbound traffic per second, outbound traffic record per second, number of users, production delay, consumption delay, data reliability, service availability, data storage size, number of resource groups, number of topic, number of partitions, The number of copies, the number of consumption groups and other indicators.
1.13.7 Traffic limit
The user traffic is now achieved on the platform, where the intelligent current-limiting processing is carried out.
1.13.8 load balancing
Automatic load balancing function to achieve the platform, through the platform for scheduling and management.
1.13.9 Resource budget
When the cluster reaches a certain size and the traffic continues to grow, where does the cluster expansion machine come from? The resource budget of the business allows multiple businesses in the cluster to share the hardware cost of the entire cluster according to their own traffic in the cluster. Of course, independent clusters and isolated resource groups can be calculated separately.
1.14. Performance Evaluation 1.14.1 performance Evaluation of single broker
The purpose of our single broker performance evaluation includes the following aspects:
1) provide the basis for us to evaluate the application for resources.
2) Let us know more about the reading and writing ability of the cluster and where the bottleneck lies, and optimize for the bottleneck.
3) provide the basis for setting the current limiting threshold for us.
4) provide a basis for us to evaluate when we should expand our capacity.
1.14.2 topic partition performance evaluation
1) to provide a basis for evaluating how many partitions should be specified reasonably when we create a topic
2) to provide a basis for the evaluation of partition expansion of our topic.
1.14.3 performance evaluation of single disk
1) to understand the true read and write ability of disks, and to provide a basis for us to choose the disk type that is more suitable for Kafka.
2) provide the basis for setting the disk traffic alarm threshold.
1.14.4 find out the limitation of cluster size
1) We need to understand the upper limit of the size of a single cluster or the upper limit of metadata size, and explore the impact of relevant information on cluster performance and stability.
2) according to the situation, evaluate the reasonable range of cluster node size, predict the risk in time, split the super-large cluster and so on.
1.15 DNS+LVS network architecture
When our cluster nodes reach a certain size, such as hundreds of broker nodes in a single cluster, what if we specify the bootstrap.servers configuration for our production and consumption clients? Do you want to choose some of these broker configurations or all of them?
In fact, none of the above is appropriate. If we configure only a few IP, when we configure a few broker nodes offline, our application will not be able to connect to the Kafka cluster; if we configure all IP, it will be even more unrealistic, hundreds of IP, then what should we do?
* * solution: * * DNS+LVS network architecture is adopted. In the end, producers and consumers only need to configure domain names. It should be noted that when a new node joins the cluster, a mapping needs to be added; when a node goes offline, it needs to be kicked out of the mapping, otherwise if these machines are used elsewhere, if the port is the same as that of Kafka, some requests for the original cluster will be sent to the server that has been offline, causing major failures in the production environment.
II. Functional defects of the open source version
The main features of RTMP protocol are: multiplexing, subpacket and application layer protocol. These characteristics are described in detail below.
2.1 replica Migration
Incremental migration cannot be achieved. [we have implemented incremental migration based on the 2.1.1 source code modification]
Cannot implement concurrent migration; [the open source version did not implement concurrent migration until 2.6.0]
Unable to terminate the migration; [We have implemented the termination of replica migration based on the 2.1.1 source code] [the open source version does not suspend the migration until 2.6.0, unlike terminating the migration, metadata will not be rolled back]
When you specify a migration data directory, if you shorten the topic retention time during the migration, the topic retention time will not take effect for the migrating topic partition, and the expired data of the topic partition cannot be deleted. [open source version bug, which has not been fixed yet]
When the migration data directory is specified, the entire migration task cannot be completed and remains stuck when the migration plan is in the following scenarios. [open source version of bug, which has not been fixed yet]
During the migration process, if a broker node is restarted, all leader partitions on that broker node cannot be switched back, causing all node traffic to be transferred to other nodes, and the leader will not be switched back until all replicas have been migrated. [open source version bug, which has not been fixed yet].
In the native Kafka version, there is a situation where the following specified data directory scenarios cannot be migrated, and we do not decide to fix secondary bug: 1. For the same topic partition, if part of the target copy changes compared with the original copy that belongs to the broker, and part of the target copy changes from the original copy to the data directory that belongs to broker, then the target copy to which the broker changes can be migrated normally, and the target copy cannot be migrated normally if the data directory changes within broker. However, the old copy can still provide normal production and consumption services, and does not affect the submission of the next migration task. The next migration task only needs to change the broker list to which the replica list of this topic partition belongs, and the migration can still be completed normally, and the previously uncompleted target copy can be cleaned up. It is assumed that the initial replica distribution of topic yyj1 is as follows: {"version": 1, "partitions": [{"topic": "yyj", "partition": 0, "replicas": [1000003J 1000001], "log_dirs": ["/ kfk211data/data31", "/ kfk211data/data13"]} / / Migration scenario 1: {"version": 1, "partitions": [{"topic": "yyj", "partition": 0, "replicas": [1000003ct 1000002] "log_dirs": ["/ kfk211data/data32", "/ kfk211data/data23"]}} / / Migration scenario 2: {"version": 1, "partitions": [{"topic": "yyj", "partition": 0, "replicas": [1000002M 1000001], "log_dirs": ["/ kfk211data/data22", "/ kfk211data/data13"]}} for the distribution of topic yyj1 mentioned above At this point, if our migration plan is "Migration scenario 1" or "Migration scenario 2", then there will be a situation where the copy cannot be migrated. However, this does not affect the old copy to handle production and consumption requests, and we can normally submit other migration tasks. In order to clean up the old unmigrated copies, we only need to modify the migration plan once [the new target replica list is completely different from the current partition assigned replica list] and submit the migration again. Here, we still use the above example to modify the migration plan as follows: {"version": 1, "partitions": [{"topic": "yyj", "partition": 0, "replicas": [1000004shot 1000005], "log_dirs": ["/ kfk211data/data42", "/ kfk211data/data53"]}} so that we can complete the migration normally. 2.2 Traffic protocol
The current-limiting granularity is coarse, not flexible and accurate, not intelligent enough.
Current combination of current limiting dimensions
/ config/users//clients//config/users//clients//config/users//config/users//clients//config/users//clients//config/users//config/clients//config/clients/
There are problems
When multiple users on the same broker are producing and consuming a large number of products at the same time, in order for broker to work properly, the sum of all user traffic thresholds must not exceed the upper limit of broker throughput when making current restrictions. If the limit of broker is exceeded, broker is at risk of being hit and suspended. However, even if user traffic does not reach the upper limit of broker traffic, if all user traffic is concentrated on certain disks and exceeds the read and write load of the disk, all production and consumption requests will be blocked and broker may be suspended.
Solution
(1) modify the source code to limit the traffic limit of a single broker. As long as the traffic reaches the upper limit of broker, all users writing to this broker can be restricted, or users can be given priority treatment to let go of high-priority ones and limit low-priority ones.
(2) modify the source code to limit the traffic limit of a single disk on the broker (most of the time, the traffic is concentrated on several disks, so that the upper limit of broker traffic is not reached but the limit of read and write capacity of a single disk is exceeded). As long as the disk traffic reaches the upper limit, all users writing to this disk can be restricted. Or give priority to the users, let go of the high priority and limit the low priority.
(3) transform the source code to realize the current limit of topic dimension and the write-ban function of topic partition.
(4) transform the source code to achieve accurate current limit by the combination of user, broker, disk, topic and other dimensions
At this point, the study on "how to use Kafka" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.