Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to achieve the optimal throughput of Kafka

2025-03-29 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/03 Report--

This article introduces the knowledge of "how to achieve the best throughput of Kafka". In the operation of practical cases, many people will encounter such a dilemma, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!

Start using func main () {/ / 1. Initialize pusher: = kq.NewPusher ([] string {"127.0.0.1 string 19092", "127.0.0.1 string 19092", "127.0.0.1 string 19092",}, "kq") ticker: = time.NewTicker (time.Millisecond) for round: = 0; round

< 3; round++ { select { case 当然,目前只支持单个 msg 写入。可能有人会疑惑,那就继续往下看,为什么只能一条一条写入? 初始化 一起看看 pusher 初始化哪些步骤: NewPusher(clusterAddrs, topic, opts...) |- kafka.NewWriter(kfConfig) // 与 kf 之前的连接 |- executor = executors.NewChunkExecutor() // 设置内部写入的executor为字节数定量写入 建立与 kafka cluster 的连接。此处肯定就要传入 kafka config; 设置内部暂存区的写入函数以及刷新规则。 使用 chunkExecutor 作用不言而喻:将随机写 ->

Write in batches to reduce the consumption of Ihammer O, while ensuring that a single write cannot exceed the default 1m or the maximum number of bytes set by yourself.

In fact, if you look inside the chunkExecutor, there are actually two indicators for each trigger insertion:

MaxChunkSize: maximum number of bytes written at a time

FlushInterval: refreshes the interval between staging message inserts

When a write is triggered, a write is performed as long as any metric is met. At the same time, the insertion interval is set in the executors to prevent write blocking in the scratch area and the messages in the cache area are not refreshed and emptied all the time.

For more information about executors, please see the following: https://zeromicro.github.io/go-zero/executors.html

Producer insertion

According to the introduction to executors based on the above initialization, it is necessary to cooperate with it during the insertion process:

Func (p * Pusher) Push (v string) error {/ / 1. Insert Message msg inside msg-> kafka: = kafka.Message {Key: [] byte (strconv.FormatInt (time.Now (). UnixNano (), 10)), Value: [] byte (v),} / use executor.Add () to insert internal container / / when executor initialization fails or an internal error occurs Message will also be directly inserted into kafka if p.executor! = nil {return p.executor.Add (msg, len (v))} else {return p.produer.WriteMessages (context.Background (), msg)}}

The process is actually very simple. So how does executors.Add (msg, len (msg)) insert msg into kafka?

The logic of the insertion is actually declared in initialization:

Pusher.executor = executors.NewChunkExecutor (func (tasks [] interface {}) {chunk: = make ([] kafka.Message, len (tasks)) / / 1 for I: = range tasks {chunk [I] = tasks [I]. (kafka.Message)} / / 2 if err: = pusher.produer.WriteMessages (context.Background () Chunk...) Err! = nil {logx.Error (err)}}, newOptions (opts)...)

When the insert is triggered, the [] msg stored in the staging area is taken out in turn as the final insert message collection.

Insert the message collection from the previous step into the topic of kafka as a batch

So pusher-> chunkExecutor-> kafka a link appears.

This is the end of "how to achieve the best throughput of Kafka". Thank you for reading. If you want to know more about the industry, you can follow the website, the editor will output more high-quality practical articles for you!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report