Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the performance tuning of Apache Pulsar in BIGO?

2025-03-26 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

What is the performance tuning practice of Apache Pulsar in BIGO? I believe many inexperienced people don't know what to do about it. Therefore, this paper summarizes the causes and solutions of the problem. Through this article, I hope you can solve this problem.

Background

With the support of artificial intelligence technology, BIGO video-based products and services are widely popular and have users in more than 150 countries, including Bigo Live (live broadcast) and Likee (short video). Bigo Live has sprung up in more than 150 countries, Likee has more than 100 million users, and is popular in Generation Z.

With the rapid growth of business, the scale of data carried by the BIGO message queue platform has increased exponentially. Downstream online model training, online recommendation, real-time data analysis, real-time data warehouse and other services put forward higher requirements for the real-time and stability of messages.

BIGO message queuing platform uses open source Kafka. However, with the multiplication of business data and the increasing requirements of real-time message and system stability, the maintenance cost of multiple Kafka clusters is getting higher and higher, which is mainly reflected in:

Data storage is bound to message queue service, and cluster scaling / partition balancing requires a large number of copies of data, resulting in a decline in cluster performance.

When a partition copy is not in ISR state, if broker fails, it may cause loss or the partition cannot provide read and write services.

Manual intervention is required when Kafka broker disk failure / utilization is too high

Cluster uses KMM (Kafka Mirror Maker) synchronously across regions, so its performance and stability are difficult to meet expectations.

In the catch-up read scenario, PageCache pollution is easy to occur, resulting in a decline in read and write performance.

Although the topic partition of the Kafka is written sequentially, when there are hundreds of topic partition on the broker, it becomes random write from the disk point of view, and the disk read and write performance decreases with the increase of the number of topic partition, so the number of topic partition stored on the Kafka broker is limited.

With the growth of the scale of Kafka cluster, the operation and maintenance cost of Kakfa cluster increases rapidly, which requires a lot of manpower for daily operation and maintenance. In BIGO, it takes 0.5 person / day to expand a machine to Kafka cluster and to balance partitions, while it takes 1 person / day to scale down a machine.

In order to improve the real-time, stability and reliability of message queue, and reduce the cost of operation and maintenance, we reconsider the shortcomings of Kafka architecture design, and investigate whether we can solve these problems from the architecture design to meet the current business requirements.

Next Generation message flow platform: Pulsar

Apache Pulsar, a top-level project of the Apache Software Foundation, is the next generation cloud native distributed message flow platform that integrates messaging, storage, and lightweight functional computing. Pulsar was opened up by Yahoo and donated to the Apache Software Foundation for incubation in 2016 and became a top project of the Apache Software Foundation in 2018.

Pulsar adopts a hierarchical architecture of separation of computing and storage, supports multi-tenant, persistent storage, and multi-room cross-regional data replication, and has the characteristics of strong consistency, high throughput and low latency of high scalable streaming data storage.

The main features that attract us about Pulsar are as follows:

Linear expansion: seamless expansion to hundreds of nodes

High throughput: has been tested in Yahoo's production environment, supporting publish-subscribe (Pub-Sub) of millions of messages per second

Low latency: low latency (less than 5 ms) even with large volume of messages

Persistence mechanism: Plusar's persistence mechanism is built on Apache BookKeeper and provides read-write separation

Read-write separation: BookKeeper's read-write separation IO model makes great use of disk sequential write performance, is relatively friendly to mechanical hard disks, and the number of topic supported by a single bookie node is unlimited.

Apache Pulsar's architectural design solves the problems we encounter when using Kafka, and provides a lot of great features, such as multi-tenancy, message queuing and batch flow convergence consumption model, strong consistency, and so on.

In order to further deepen the understanding of Apache Pulsar and measure whether Pulsar can really meet the needs of large-scale messaging Pub-Sub in our production environment, we have carried out a series of stress tests since December 2019. Since we used a mechanical hard disk without SSD, we encountered a series of performance problems in the process of stress testing. Thank you very much for the help of StreamNative students, and thank Si Jie, Zhai Jia and Penghui for their patient guidance and discussion. After a series of performance tuning, we continue to improve the throughput and stability of Pulsar.

After 3-4 months of stress testing and tuning, we officially used Pulsar cluster in the production environment in April 2020. We adopt the mixed mode of bookie and broker on the same node to gradually replace the Kafka cluster in the production environment. Up to now, there are more than ten Pulsar clusters in the production environment, and the daily message processing volume is 10 billion, and the Kafka traffic is gradually expanding and migrating to the Pulsar cluster.

Problems encountered in stress testing / using Pulsar

When using / stress testing Pulsar, you may encounter the following problems:

The load of Pulsar broker nodes is uneven.

The hit rate of Cache on Pulsar broker is low, which causes a large number of read requests to enter bookie, and the read performance is poor.

Broker memory overflow (OOM) often occurs during stress testing.

The presence of direct memory OOM in Bookie caused the process to hang.

The load of Bookie nodes is uneven and often jitters.

When the Journal disk is HDD, although fsync is turned off, the bookie add entry 99th latency is still high and the write performance is poor.

When there are a large number of read requests in bookie, the write is reversed and the add entry latency rises.

The word "Lookup Timeout Exception" often appears in Pulsar client.

The high read and write latency of ZooKeeper makes the whole Pulsar cluster unstable.

Use reader API (eg. Pulsar flink connector) when consuming Pulsar topic, the consumption speed is slower (Pulsar version before 2.5.2).

Problems 4, 5, 6, and 7 are particularly serious when the Journal/Ledger disk is a mechanical hard drive (HDD). Intuitively, these problems are caused by the fact that the disk is not fast enough. If the read and write speed of the Journal/Ledger disk is fast enough, messages will not accumulate in the direct memory and a series of OOM will not occur.

Because in our message queue production system, the amount of data that needs to be stored is relatively large (TB ~ PB level), and both Journal disk and Ledger disk need high cost for SSD, is it possible to optimize some parameters / policies on Pulsar / BookKeeper so that HDD can also play a better performance?

In the process of stress testing and using Pulsar, we have encountered a series of performance problems, which are mainly divided into Pulsar Broker level and BookKeeper level. For this reason, this series of performance tuning articles are divided into two parts, which respectively introduce the solutions for BIGO to tune the performance of Pulsar Broker and Bookkeeper in the process of using Pulsar, so that Pulsar can achieve better performance in both SSD and HDD scenarios.

Due to the space, this performance tuning series is divided into two parts. The first part mainly introduces the performance tuning of Pulsar broker, and the second part mainly introduces the performance tuning in the process of combining BookKeeper and Pulsar.

Next, it mainly introduces the performance-related parts of Pulsar / BookKeeper, and puts forward some suggestions for performance tuning (these performance tuning schemes have been running stably in the BIGO production system and have achieved good benefits).

Environment deployment and monitoring environment deployment and monitoring

Because BookKeeper and Pulsar Broker rely heavily on ZooKeeper, in order to ensure the stability of Pulsar, it is necessary to ensure low latency of ZooKeeper Read/Write. In addition, BookKeeper is an IO-intensive task, and to avoid interference between IO, Journal/Ledger is placed on a separate disk. The summary is as follows:

The Bookie Journal/Ledger directory is placed on a separate disk

When the disk of the Journal/Ledger directory is HDD, do not put ZooKeeper dataDir/dataLogDir on the same disk as the Journal/Ledger directory

Both BookKeeper and Pulsar Broker rely on direct memory, and BookKeeper also relies on PageCache for data read and write acceleration, so a reasonable memory allocation strategy is also critical. The memory allocation strategy recommended by the sijie of the Pulsar community is as follows:

OS: 1 ~ 2 GB

JVM: 1/2

Heap: 1/3

Direct memory: 2/3

PageCache: 1/2

Assuming that the physical memory of the machine is a mixture of 128 GB bookie and broker, the memory allocation is as follows:

OS: 2GB

Broker: 31GB

Heap: 10GB

Direct memory: 21GB

Bookie: 32GB

Heap: 10GB

Direct memory: 22GB

PageCache: 63GB

Monitor: performance tuning, monitoring first

In order to find the performance bottleneck of the system more intuitively, we need to build a complete monitoring system for Pulsar/BookKeeper to ensure that relevant indicators are reported in every link, and when there are anomalies (including but not limited to performance problems), we can quickly locate the performance bottleneck through relevant monitoring indicators and formulate corresponding solutions.

Pulsar/BookKeeper provides Prometheus interfaces, and relevant statistical metrics can be directly obtained by Http and directly interfaced with Prometheus/Grafana. Interested students can directly follow the instructions of Pulsar Manager to install: https://github.com/streamnative/pulsar-manager.

The indicators that need to be focused are as follows:

Pulsar Broker

Jvm heap/gc

Bytes in per broker

Message in per broker

Loadbalance

Broker Cache hit rate

Bookie client quarantine ratio

Bookie client request queue

BookKeeper

Bookie request queue size

Bookie request queue wait time

Add entry 99th latency

Read entry 99th latency

Journal create log latency

Ledger write cache flush latency

Entry read throttle

ZooKeeper

Local/global ZooKeeper read/write request latency

There are some metrics that are not provided with the corresponding Grafana template in the above repo. You can add PromQL to configure it.

Performance tuning on Pulsar broker side

The performance tuning of Pulsar broker is mainly divided into the following aspects:

Load balancing

Load balancing between Broker

Load balancing between Bookie nodes

Current limit

Broker needs to do flow control to receive messages to prevent sudden flood peak traffic from causing broker direct memory OOM.

Broker needs to do flow control when sending messages to consumer/reader to prevent consumer/reader from frequently GC caused by sending too many messages at one time.

Improve Cache hit rate

Ensure low read and write latency of ZooKeeper

Close auto bundle split to ensure the stability of the system

Load balancing between Broker

Load balancing among Broker can improve the utilization of broker nodes, improve the hit rate of Broker Cache, and reduce the probability of broker OOM. This part is mainly related to the knowledge of Pulsar bundle rebalance.

The structure of Namespace Bundle is as follows, each namespace (namespace) consists of a certain number of bundle, all topic under this namespace are mapped to a unique bundle by hash, and then bundle is loaded / unloaded to the broker that provides the service by load/unload.

If there is no bundle on a broker or the number of bundle is less than other broker, then the traffic of this broker will be lower than that of other broker.

The existing / default bundle rebalance policy (OverloadShedder) is to count every other minute whether the maximum occupancy rates of CPU, Memory, Direct Memory, BindWith In and BindWith Out of all broker in the cluster exceed the threshold (default is 85%). If the threshold is exceeded, a certain amount of bundle with large inbound traffic is unloaded from the broker, and then it is up to leader to reload the unloaded bundle onto the least loaded broker.

The problem with this strategy is:

The default threshold is difficult to reach, and it is easy to cause most of the traffic in the cluster to be concentrated on several broker

The threshold adjustment standard is difficult to determine and is greatly affected by other factors, especially if other services are deployed on this node.

After the broker restarts, there is no traffic equalization to the broker for a long time, because other broker nodes do not reach the bundle unload threshold.

To this end, we have developed a mean-based load balancing strategy, which supports the weight configuration of CPU, Memory, Direct Memory, BindWith In and BindWith Out. For more information, please see PR-6772.

This policy is supported in Pulsar 2.6.0. It is disabled by default and can be enabled by modifying the following parameters in broker.conf:

LoadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder

We can accurately control the weight of different collection indicators through the following parameters:

# The broker resource usage threshold.# When the broker resource usage is greater than the pulsar cluster average resource usage,# the threshold shredder will be triggered to offload bundles from the broker.# It only takes effect in ThresholdSheddler strategy.loadBalancerBrokerThresholdShedderPercentage=10# When calculating new resource usage The history usage accounts for.# It only takes effect in ThresholdSheddler strategy.loadBalancerHistoryResourcePercentage=0.9# The BandWithIn usage weight when calculating new resource usage.# It only takes effect in ThresholdShedder strategy.loadBalancerBandwithInResourceWeight=1.0# The BandWithOut usage weight when calculating new resource usage.# It only takes effect in ThresholdShedder strategy.loadBalancerBandwithOutResourceWeight=1.0# The CPU usage weight when calculating new resource usage.# It only takes effect in ThresholdShedder strategy.loadBalancerCPUResourceWeight=1.0# The heap memory usage weight when calculating new resource usage.# It only takes effect in ThresholdShedder strategy.loadBalancerMemoryResourceWeight=1.0# The direct memory usage weight when calculating new Resource usage.# It only takes effect in ThresholdShedder strategy.loadBalancerDirectMemoryResourceWeight=1.0# Bundle unload minimum throughput threshold (MB) Avoiding bundle unload frequently.# It only takes effect in ThresholdShedder strategy.loadBalancerBundleUnloadMinThroughputThreshold=10 balancing the load between bookie nodes

Bookie node load monitoring is shown in the following figure, and we will find:

The load between Bookie nodes is not uniform, and the difference between the highest flow node and the lowest flow node may be hundreds of MB/s

In the case of high load, the load of some nodes may rise and fall periodically, with a period of 30 minutes.

The impact of these problems is: bookie load imbalance, resulting in a decline in BookKeeper cluster utilization, and prone to jitter.

The reason for this problem is that the granularity of bookie client's circuit breaker policy for bookie write requests is too large.

Let's first review Pulsar broker's strategy for writing to bookie:

When broker receives the message sent by producer, it first stores the message in the direct memory of broker, and then invokes bookie client to send the message to bookies in pipeline mode according to the configured (EnsembleSize,WriteQuorum,AckQuorum) policy.

Bookie client counts the write failure rate of each bookie every minute (including various exceptions such as write timeout). By default, when the failure rate exceeds 5 times per minute, the bookie will be locked in a dark room for 30 minutes to avoid continuously writing data to the abnormal bookie, thus ensuring the success rate of message writes.

The problem with this circuit breaker strategy is that when the load (traffic) of a bookie is very high, all messages written to the bookie may slow down at the same time, and all bookie client may receive write exceptions, such as write timeout, etc., then all bookie client will lock the bookie in the dark room for 30 minutes at the same time, and then add it to the writable list at the same time. This causes the load of this bookie to rise and fall periodically.

In order to solve this problem, we introduce a probability-based quarantine mechanism. When an exception occurs in the bookie client write message, the bookie is not directly locked in the dark room, but whether the quarantine is decided based on the probability.

This quarantine strategy can prevent all bookie client from locking the same bookie into a small dark room at the same time, and avoid bookie inbound traffic jitter. For PR, please see: BookKeeper PR-2327, because the code is not merged and released to the main version of bookie, if you want to use this feature, you need to compile your own code: https://github.com/apache/bookkeeper/pull/2327.

According to the BIGO practice test, this feature reduces the standard deviation of inbound traffic between bookie nodes from 75 MB/s to 40 MB/s.

Current limit > > Broker direct memory OOM (memory overflow)

In a production environment, in high throughput scenarios, we often encounter broker direct memory OOM, which causes the broker process to crash. The reason for this may be that the underlying bookie writes are slower, resulting in a large backlog of data in the broker direct memory. The processing of messages sent by Producer in broker is shown in the following figure:

In a production environment, we cannot guarantee that the underlying bookie always maintains a very low write latency, so we need to limit the current at the broker layer. Penghui of the Pulsar community has developed a current-limiting feature. The current-limiting logic is shown in the following figure:

Released in Pulsar version 2.5.1, see PR-6178.

Consumer consumes a lot of memory

When the producer side sends messages in batch mode, the consumer side often takes up too much memory, resulting in frequent GC. The monitoring performance is that the load of this topic soars when the consumer starts, and then gradually returns to the normal level.

The cause of this problem needs to be seen in combination with the consumption pattern on the consumer side.

When consumer calls the receive interface to consume a message, it requests a message directly from the local receiverQueue. If there is a message in the receiverQueue that can be obtained, it directly returns the message to the consumer side and updates the availablePermit. When availablePermit < receiverQueueSize/2, Pulsar client will send availablePermit to broker, telling broker how many messages push needs to come over. If there is no message in the receiverQueue to get, the wait / return fails, and the consumer is not woken up until the receiverQueue receives the message pushed by broker.

After Broker receives the availablePermit, it reads the max (availablePermit, batchSize) entry from the broker Cache/bookie and sends it to the consumer. The processing logic is shown in the following figure:

The problem here is: when producer enables batch mode to send, an entry contains multiple messages, but broker still processes a message as an entry when processing availablePermit requests, resulting in broker sending a large amount of information to consumer at one time, which far exceeds the acceptable capacity of availiablePermit (availiablePermit vs. AvailiablePermit * batchSize), causing consumer to take up memory skyrocketing, causing frequent GC, and reducing consumption performance.

In order to solve the problem of skyrocketing memory on the consumer side, we count the average number of messages (avgMessageSizePerEntry) contained in each topic entry on the broker side. When we receive the availablePermit of the consumer request, convert it to the entry size that needs to be sent, and then pull the corresponding number of entry from the broker Cache/bookie and send it to the consumer. The processing logic is shown in the following figure:

This feature has been released in Pulsar 2.6.0 and is off by default. You can enable this feature by using the following switch:

# Precise dispatcher flow control according to history message number of each entrypreciseDispatcherFlowControl=true increases Cache hit rate

There are several layers of Cache in Pulsar to improve the read performance of message, including:

Broker Cache

Bookie write Cache (Memtable)

Bookie read Cache

OS PageCache

This chapter mainly introduces the operation mechanism and tuning scheme of broker Cache, and the Cache tuning on the bookie side is introduced in the second part.

When broker receives a message sent by producer to a topic, it first determines whether the topic has an Active Cursor, and if so, writes the received message to the corresponding Cache of the topic; otherwise, no Cache is written. The processing flow is shown in the following figure:

Determine if there is an Active Cursor that needs to meet both of the following conditions:

There is durable cursor.

The lag of Cursor is within the range of managedLedgerCursorBackloggedThreshold

Because reader uses non-durable cursor for consumption, messages written by producer do not enter broker Cache, resulting in a large number of requests falling on bookie and performance loss.

Streamnative/pulsar-flink-connector uses reader API for consumption, so it also has the problem of low consumption performance.

Zhao Rongsheng of our BIGO message queuing team fixed this problem by removing durable cursor from the Active Cursor judgment condition. For more information, please see PR-6769. This feature is released in Pulsar 2.5.2. If you encounter related performance problems, please upgrade the Pulsar version to 2.5.2 or above.

In addition, we have added Cache hit rate monitoring for each subscription of topic to facilitate the identification of consumer performance issues, which will be contributed to the community later.

Tailing Read

For the data already in broker Cache, in the tailing read scenario, how can we improve the Cache hit rate and reduce the probability of reading data from bookie? Our idea is to make the data read from the broker Cache as much as possible, and to ensure this, we start to optimize in two places:

The maximum lag range determined to be Active Cursor is 1000 entry by default, which is controlled by the following parameters:

# Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged'# and thus should be set as inactive.managedLedgerCursorBackloggedThreshold=1000

The decision of Active Cursor is shown in the following figure.

Control the eviction policy of broker Cache. Currently, only the default eviction policy is supported in Pulsar. Students with needs can expand it on their own. The default eviction policy is controlled by the following parameters:

# Amount of memory to use for caching data payload in managed ledger. This memory# is allocated from JVM direct memory and it's shared across all the topics# running in the same broker. By default, uses 1/5th of available direct memorymanagedLedgerCacheSizeMB=# Whether we should make a copy of the entry payloads when inserting in cachemanagedLedgerCacheCopyEntries=false# Threshold to which bring down the cache level when eviction is triggeredmanagedLedgerCacheEvictionWatermark=0.9# Configure the cache eviction frequency for the managed ledger cache (evictions/sec) managedLedgerCacheEvictionFrequency=100.0# All entries that have stayed in cache for more than the configured time, will be evictedmanagedLedgerCacheEvictionTimeThresholdMillis=1000Catchup Read

For Catchup Read scenarios, broker Cache is likely to be lost and all read requests will fall on bookie, so is there any way to improve the performance of reading bookie?

Broker sends read requests to bookie in batches, and the maximum batch is controlled by dispatcherMaxReadBatchSize. The default is 100 entry.

# Max number of entries to read from bookkeeper. By default it is 100 entries.dispatcherMaxReadBatchSize=100

The larger the batchSize read at a time, the more efficient the underlying bookie reads from disk, and the lower the read latency allocated to a single entry. However, too large can also cause batch read latency to increase, because the underlying bookie read operation reads one entry at a time and is read synchronously.

The read tuning in this section is described in "Apache Pulsar's performance tuning in BIGO (part two)".

Ensure low read and write latency of ZooKeeper

Because both Pulsar and BookKeeper are heavily dependent on ZooKeeper, if the read and write latency of ZooKeeper increases, it will lead to Pulsar service instability. Therefore, it is necessary to give priority to ensuring low read and write latency of ZooKeeper. The recommendations are as follows:

When the disk is HDD, ZooKeeper dataDir/dataLogDir should not be placed on the same disk as other services that consume IO (such as bookie Journal/Ledger directory) (except SSD)

ZooKeeper dataDir and dataLogDir are best placed on two separate disks (except SSD)

Monitor the utilization of the broker/bookie network card to avoid losing contact with ZooKeeper due to the network card being full.

Close auto bundle split to ensure the stability of the system

Pulsar bundle split is a resource-intensive operation that causes all producer/consumer/reader connections connected to this bundle to be disconnected and reconnected. In general, the reason for triggering auto bundle split is that the pressure on this bundle is relatively high. It is necessary to split the bundle into two bundle and allocate the traffic to other broker to reduce the pressure on this bundle. The parameters for controlling auto bundle split are as follows:

# enable/disable namespace bundle auto splitloadBalancerAutoBundleSplitEnabled=true# enable/disable automatic unloading of split bundlesloadBalancerAutoUnloadSplitBundlesEnabled=true# maximum topics in a bundle, otherwise bundle split will be triggeredloadBalancerNamespaceBundleMaxTopics=1000# maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggeredloadBalancerNamespaceBundleMaxSessions=1000# maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggeredloadBalancerNamespaceBundleMaxMsgRate=30000# maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggeredloadBalancerNamespaceBundleMaxBandwidthMbytes=100

When the auto bundle split is triggered, the broker load is relatively high. If you close the producer/consumer/reader on the bundle, the connection will become slower, and the bundle split will take longer, which can easily cause the client (producer/consumer/reader) connection to time out and fail, trigger the client end to reconnect automatically, and cause Pulsar/Pulsar client instability.

For production environments, our recommendation is to allocate the number of bundle for each namespace in advance and turn off the auto bundle split feature. If a bundle pressure is found to be too high during operation, manual bundle split can be carried out during the flow trough to reduce the impact on the client side.

The number of pre-allocated bundle should not be too large, too much bundle will put more pressure on the ZooKeeper, because each bundle has to report its own statistics to the ZooKeeper on a regular basis.

After reading the above, have you mastered the practical method of Apache Pulsar's performance tuning in BIGO? If you want to learn more skills or want to know more about it, you are welcome to follow the industry information channel, thank you for reading!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report