How to use Kafka and Sarama in Go

2025-02-25 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/03 Report--

This article shows how to use Kafka from Go with the Sarama client library. It covers producing and consuming messages, with sample code for each. I hope it will be helpful to you.

01. Introduction

Apache Kafka is an open source messaging system. In a project, its main roles are peak shaving and valley filling (smoothing out bursts of traffic) and decoupling services. This article only covers Sarama, a Go client library for Apache Kafka. Sarama is MIT-licensed and supports Apache Kafka 0.8 and later.

If you are not familiar with the Apache Kafka server, it is recommended to read the getting started section of the official documentation first. The version used in this article is Apache Kafka 2.8.

02. Producer

We can use either the AsyncProducer or the SyncProducer of the Sarama library to produce messages. In most cases AsyncProducer is preferred: it accepts messages on a channel and produces them asynchronously in the background as efficiently as possible.
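The channel-based design can be pictured with a toy model. The sketch below uses only the standard library; toyAsyncProducer, message, and sendAll are hypothetical names, and nothing here is Sarama's API or implementation. It only illustrates handing messages to a background goroutine over a channel and collecting acknowledgments afterwards.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for a Kafka message.
type message struct {
	Topic string
	Value string
}

// toyAsyncProducer models the channel-fed pattern only.
// It is NOT Sarama's AsyncProducer and sends nothing over the network.
type toyAsyncProducer struct {
	input chan message // callers enqueue messages here
	acked chan message // the background goroutine reports handled messages here
	wg    sync.WaitGroup
}

func newToyAsyncProducer(buffer int) *toyAsyncProducer {
	p := &toyAsyncProducer{
		input: make(chan message, buffer),
		acked: make(chan message, buffer),
	}
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer close(p.acked)
		for m := range p.input {
			// A real client would batch messages and send them to a broker here.
			p.acked <- m
		}
	}()
	return p
}

// Close stops accepting messages and waits for the background goroutine to finish.
func (p *toyAsyncProducer) Close() {
	close(p.input)
	p.wg.Wait()
}

// sendAll enqueues every value without waiting for delivery,
// then closes the producer and collects the acknowledgments.
func sendAll(topic string, values []string) []message {
	p := newToyAsyncProducer(len(values))
	for _, v := range values {
		p.input <- message{Topic: topic, Value: v}
	}
	p.Close()
	var out []message
	for m := range p.acked {
		out = append(out, m)
	}
	return out
}

func main() {
	for _, m := range sendAll("demo", []string{"a", "b", "c"}) {
		fmt.Printf("acked topic:%s value:%s\n", m.Topic, m.Value)
	}
}
```

In Sarama itself, the corresponding roles are played by the AsyncProducer's Input(), Successes() and Errors() channels; see the library documentation for their exact behavior and the configuration they depend on.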

SyncProducer sends a Kafka message and blocks until it has been acknowledged. SyncProducer has two caveats: it is generally less efficient than AsyncProducer, and the actual durability guarantee of an acknowledged message depends on the configured value of Producer.RequiredAcks. In some configurations, a message acknowledged by SyncProducer can still occasionally be lost. On the other hand, SyncProducer is relatively simple to use.

To keep things easy to follow, this article only shows how to use SyncProducer as the producer. If you want to know how to use AsyncProducer, please refer to the official documentation.

Sample code that uses SyncProducer as the producer:

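import (
	"fmt"

	// Older releases were published as github.com/Shopify/sarama.
	"github.com/IBM/sarama"
)

// sendMessage creates a SyncProducer, sends one message to topic, and
// prints the partition and offset assigned to it.
func sendMessage(brokerAddr []string, config *sarama.Config, topic string, value sarama.Encoder) {
	producer, err := sarama.NewSyncProducer(brokerAddr, config)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Close the producer when done; it is not cleaned up automatically.
	defer func() {
		if err = producer.Close(); err != nil {
			fmt.Println(err)
			return
		}
	}()

	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: value,
	}
	// SendMessage blocks until the message is acknowledged or fails.
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("partition:%d offset:%d\n", partition, offset)
}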

In the code above, we call NewSyncProducer() with the broker addresses and the configuration to create a new SyncProducer. Note that a configuration passed to NewSyncProducer() must have Producer.Return.Successes set to true, otherwise a configuration error is returned. We then call SendMessage() to produce the given message; it returns only once the message has either been produced successfully or failed. On success it returns the partition and the offset of the produced message; on failure it returns an error.
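The value parameter of sendMessage() has the type sarama.Encoder, an interface with two methods, Encode() ([]byte, error) and Length() int. Sarama ships ready-made implementations such as StringEncoder and ByteEncoder. For structured payloads, a custom type can provide the same two methods. The following is a minimal sketch using only the standard library; order and jsonValue are hypothetical names chosen for illustration, and the JSON layout is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// order is a hypothetical payload type used only for illustration.
type order struct {
	ID     int    `json:"id"`
	Status string `json:"status"`
}

// jsonValue marshals v to JSON once and caches the result.
// Its Encode and Length methods match the method set of sarama.Encoder,
// so a *jsonValue could be used as a message value.
type jsonValue struct {
	v       interface{}
	encoded []byte
	err     error
	done    bool
}

// ensure performs the marshaling on first use.
func (j *jsonValue) ensure() {
	if !j.done {
		j.encoded, j.err = json.Marshal(j.v)
		j.done = true
	}
}

// Encode returns the JSON bytes, or the marshaling error.
func (j *jsonValue) Encode() ([]byte, error) {
	j.ensure()
	return j.encoded, j.err
}

// Length returns the size of the encoded payload in bytes.
func (j *jsonValue) Length() int {
	j.ensure()
	return len(j.encoded)
}

func main() {
	val := &jsonValue{v: order{ID: 7, Status: "paid"}}
	b, err := val.Encode()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("%s (%d bytes)\n", b, val.Length())
}
```

Caching the encoded bytes keeps Encode() and Length() consistent with each other even if both are called several times.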

Note that Close() must be called on the producer to avoid leaks, because it may not be garbage collected automatically when it goes out of scope.

03. Consumers

We can use either the Consumer or the ConsumerGroup API of the Sarama library to consume messages. To keep things easy to follow, this article only shows how to use Consumer.

Consumer manages PartitionConsumers, which process Kafka messages from brokers.

Sample code that uses Consumer to consume messages:

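import (
	"fmt"

	// Older releases were published as github.com/Shopify/sarama.
	"github.com/IBM/sarama"
)

// consumer reads messages from one partition of topic, starting at offset,
// and prints each one until the message channel is closed.
func consumer(brokerAddr []string, topic string, partition int32, offset int64) {
	// A nil config makes Sarama use its default configuration.
	consumer, err := sarama.NewConsumer(brokerAddr, nil)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() {
		if err = consumer.Close(); err != nil {
			fmt.Println(err)
			return
		}
	}()

	partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() {
		if err = partitionConsumer.Close(); err != nil {
			fmt.Println(err)
			return
		}
	}()

	// Messages() returns a channel; the loop blocks waiting for new messages.
	for msg := range partitionConsumer.Messages() {
		fmt.Printf("partition:%d offset:%d key:%s val:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
	}
}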

In the code above, we call NewConsumer() with the broker addresses and the configuration to create a new consumer; passing nil as the configuration makes Sarama use its defaults. We then call ConsumePartition() with the topic, partition, and offset to create a PartitionConsumer, which processes Kafka messages from the given topic and partition.
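The offset argument can be a concrete offset or one of two special values, which Sarama exposes as the constants sarama.OffsetNewest (-1) and sarama.OffsetOldest (-2). The sketch below shows one way to turn a command-line style string into such an offset. parseOffset and the "newest"/"oldest" keywords are hypothetical; the local constants only mirror the library's values so that the example runs without the dependency.

```go
package main

import (
	"fmt"
	"strconv"
)

// These mirror the values of sarama.OffsetNewest and sarama.OffsetOldest.
// With the library imported, use its constants directly instead.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// parseOffset is a hypothetical helper that maps "newest", "oldest",
// or a non-negative decimal number to an offset value.
func parseOffset(s string) (int64, error) {
	switch s {
	case "newest":
		return offsetNewest, nil
	case "oldest":
		return offsetOldest, nil
	}
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid offset %q: %w", s, err)
	}
	if n < 0 {
		return 0, fmt.Errorf("invalid offset %d: must not be negative", n)
	}
	return n, nil
}

func main() {
	for _, s := range []string{"oldest", "newest", "42", "-5"} {
		off, err := parseOffset(s)
		fmt.Println(s, off, err)
	}
}
```

The result could then be passed as the offset argument of the consumer() function shown earlier, for example to replay a partition from its oldest available message.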

Note that Close() must be called on both consumer and partitionConsumer to avoid leaks, because they may not be garbage collected automatically when they go out of scope.
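The order of the two Close() calls follows from how defer works: deferred calls run in last-in, first-out order, so the partitionConsumer, which is deferred last, is closed before the consumer that created it. This matches the expectation, stated in Sarama's Consumer documentation, that child PartitionConsumers are closed before the Consumer itself. The sketch below demonstrates only the defer ordering with a hypothetical closer type; it does not use Sarama.

```go
package main

import "fmt"

// closer is a hypothetical stand-in for a resource with a Close method.
// It records its name in a shared log when closed.
type closer struct {
	name string
	log  *[]string
}

func (c closer) Close() error {
	*c.log = append(*c.log, c.name)
	return nil
}

// closeOrder opens a parent and then a child resource, defers Close on
// each in that order, and returns the order in which they were closed.
func closeOrder() []string {
	var log []string
	func() {
		parent := closer{name: "consumer", log: &log}
		defer parent.Close()

		child := closer{name: "partitionConsumer", log: &log}
		defer child.Close()
	}()
	return log
}

func main() {
	fmt.Println(closeOrder())
}
```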

That concludes this introduction to using Kafka and Sarama in Go. I hope it has been of some help. If you found the article useful, feel free to share it so that more people can see it.
