Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How Golang listens to log files and sends them to kafka

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly explains "how Golang monitors log files and sends them to kafka". The explanation in this article is simple and clear, easy to learn and understand, and now please follow the editor's ideas slowly and deeply, together to study and learn "how Golang monitors log files and sends them to kafka"!

The golang libraries and visualization tools involved:

Go-ini,sarama,tail where:

Go-ini: used to read configuration files and manage configuration items uniformly, which is beneficial to its maintenance

Sarama: is a client that go operates on kafka. Currently I use it to send messages to kefka

Tail: similar to linux's tail command, read the last few lines of the file. If the file has additional data, it will be detected. It is used to monitor log files.

Visualization tools:

Offsetexplorer: it is a visualization tool for kafka. It is used to check whether the message has been delivered successfully.

The flow of work

Load the configuration and initialize sarama and kafka.

Start a cooperative process and use tail to constantly listen for changes in log files.

The main cooperation program has been blocking waiting for the tail to send messages, and the two communicate through a pipeline. Once the master program receives the new log, assemble the format and send it to the kafka

Environmental preparation

In the environment, make sure that zookeeper and kafka are running properly. Because you haven't used sarama to read the data, use offsetexplorer to see if the task was actually delivered successfully.

Code layering

Serve to store tail service classes and sarama service classes, and conf to store ini configuration files

The main function is the entry of the program

The key code main.go

The main function does the following: build the configuration structure and map the configuration file. Invoke and initialize the tail,srama service.

Package mainimport ("fmt"sarama/serve"github.com/go-ini/ini") type KafkaConfig struct {Address string `ini: "address" `ChannelSize int `ini: "chan_size" `} type TailConfig struct {Path string `ini: "path" `Filename string `ini: "fileName"` / / if it is a structure Specify the Children `ini: "tailfile.children" `} type Config struct {KafkaConfig `ini: "kafka" `TailConfig `ini: "tailfile"`} type Children struct {Name string `ini: "name" `} func main () {/ / load configuration var cfg = new (Config) err: = ini.MapTo (cfg) ". / conf/go-conf.ini") if err! = nil {fmt.Print (err)} / / initialize kafka ks: = & serve.KafukaServe {} / / start kafka message listening. When asynchronous ks.InitKafka ([] string {cfg.KafkaConfig.Address}, int64 (cfg.KafkaConfig.ChannelSize)) / / closes channel defer ks.Destruct () / / initializes tail ts: = & serve.TailServe {} ts.TailInit (cfg.TailConfig.Path + "/" + cfg.TailConfig.Filename) / / blocking ts.Listener (ks.MsgChan)} kafka.go

There are three ways:

InitKafka, assembling configuration items and initializing the pipeline to receive messages

Listener, listen for pipeline messages. After receiving the messages, assemble the messages and send them to kafka.

Destruct, close the pipe

Package serveimport ("fmt"github.com/Shopify/sarama") type KafukaServe struct {MsgChan chan string / / err error} func (ks * KafukaServe) InitKafka (addr [] string, chanSize int64) {/ / read configuration config: = sarama.NewConfig () / / 1. Initialize producer configuration config.Producer.RequiredAcks = sarama.WaitForAll / / Select partition config.Producer.Partitioner = sarama.NewRandomPartitioner / / Information for successful delivery config.Producer.Return.Successes = true ks.MsgChan = make (chan string, chanSize) go ks.Listener (addr, chanSize, config)} func (ks * KafukaServe) Listener (addr [] string, chanSize int64 Config * sarama.Config) {/ / Connect kafka var kafkaClient, _ = sarama.NewSyncProducer (addr, config) defer kafkaClient.Close () for {select {case content: =

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report