What is the go-stash component?
Preface
Today let's introduce go-stash, another component in the go-zero ecosystem. It is a Go-language alternative to logstash; compared with the original logstash, go-stash saved us about two-thirds of our server resources. If you are using logstash, give it a try, and also see how easy it is to build such a tool on top of go-zero: it took the author only two days.
Overall architecture
Let's start from its configuration and take a look at the design architecture.
```yaml
Clusters:
- Input:
    Kafka:
      # kafka configuration -> interfaces with go-queue
  Filters:
    # filter actions
    - Action: drop
    - Action: remove_field
    - Action: transfer
  Output:
    ElasticSearch:
      # es configuration {host, index}
```
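For a more concrete picture, here is a sketch of what a single cluster's configuration might look like. The field names (Brokers, Topics, Group, Conditions, Fields, Hosts, Index, and so on) are assumptions based on the go-stash README at the time of writing and may differ in newer versions, so treat this as illustrative rather than authoritative:

```yaml
Clusters:
- Input:
    Kafka:
      Name: go-stash
      Brokers:
      - "127.0.0.1:9092"
      Topics:
      - nginx-log
      Group: pro
  Filters:
  - Action: drop
    Conditions:
    - Key: status
      Value: "503"
      Type: contains
  - Action: remove_field
    Fields:
    - message
    - _source
  - Action: transfer
    Field: message
    Target: data
  Output:
    ElasticSearch:
      Hosts:
      - "http://127.0.0.1:9200"
      Index: "go-stash-{{yyyy-MM-dd}}"
```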
You can tell from the configuration names: kafka is the end the data flows out of, es is the end it flows into, and filter abstracts the processing in between.
Yes, the entire go-stash is laid out in this config: what you see is what you get.
Start
The startup process in stash.go breaks down into roughly the following parts. Since multiple clusters can be configured, we analyze a single cluster:
Establish the connection to es [passing in the es configuration]
Build the filter processors [pre-processors for es that filter and transform the data; multiple can be configured]
Complete the es index configuration, create the handler, and attach the filters to the handler [the handler connects input and output]
Connect to the upstream kafka, passing in the handler created above, which completes the whole path from kafka consumption to es writes
MessageHandler
In the architecture described above, the filter in the middle shows up only in the config, but it is in fact part of MessageHandler, where the data filtering and transformation happen. Let's talk about that.
> The code below is from: https://github.com/tal-tech/go-stash/tree/master/stash/handler/handler.go
```go
type MessageHandler struct {
	writer  *es.Writer
	indexer *es.Index
	filters []filter.FilterFunc
}
```
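The filters field is a slice of filter functions. Judging from how Consume() uses them below (map in, map out, nil to drop), a filter has roughly the following shape; the RemoveFieldsFilter factory here is a hypothetical illustration, not the repo's actual implementation:

```go
// Inferred from usage: a filter takes the decoded message map and
// returns it (possibly modified), or nil to drop the message.
type FilterFunc func(map[string]interface{}) map[string]interface{}

// RemoveFieldsFilter mimics the remove_field action from the config:
// it deletes the given keys from every message. Illustrative only;
// see stash/filter in the repo for the real implementation.
func RemoveFieldsFilter(fields ...string) FilterFunc {
	return func(m map[string]interface{}) map[string]interface{} {
		for _, f := range fields {
			delete(m, f)
		}
		return m
	}
}
```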
Coming back to the struct: it corresponds to the architecture above, and filter is only one part of it. Structurally, MessageHandler is wired to the downstream es, yet there is no sign of any kafka operation here.
Don't worry about the missing kafka piece: in terms of interface design, MessageHandler implements the ConsumeHandler interface from go-queue.
Here, the upstream and downstream are connected:
MessageHandler takes over the es side, covering everything from data processing to data writing.
It implements kafka's Consume operation (see the interface sketch below), so the handler runs inside the consumption loop and its output gets written to es.
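For reference, the interface in question looks roughly like this (paraphrased from go-queue's kq package; check the go-queue repo for the authoritative definition):

```go
// From go-queue's kq package (paraphrased): anything that can
// consume a kafka key/value pair qualifies as a handler.
type ConsumeHandler interface {
	Consume(key, value string) error
}
```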
And indeed, that is exactly what Consume() does:
```go
func (mh *MessageHandler) Consume(_, val string) error {
	var m map[string]interface{}
	// deserialize the message from kafka
	if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {
		return err
	}
	// resolve the es index for this message
	index := mh.indexer.GetIndex(m)
	// chained filter processing [there are no generics, so it is `map in, map out`]
	for _, proc := range mh.filters {
		if m = proc(m); m == nil {
			return nil
		}
	}
	bs, err := jsoniter.Marshal(m)
	if err != nil {
		return err
	}
	// write to es
	return mh.writer.Write(index, string(bs))
}
```
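To see those semantics in isolation, here is a small self-contained sketch of the same decode -> filter chain -> encode pipeline, using the standard library instead of go-stash's types (everything here is hypothetical stand-in code, not the repo's):

```go
package main

import (
	"encoding/json"
	"fmt"
)

type filterFunc func(map[string]interface{}) map[string]interface{}

func main() {
	raw := `{"level":"info","message":"hello","_source":"agent"}`

	// decode, as Consume does with jsoniter
	var m map[string]interface{}
	if err := json.Unmarshal([]byte(raw), &m); err != nil {
		panic(err)
	}

	filters := []filterFunc{
		// remove_field-style filter
		func(m map[string]interface{}) map[string]interface{} {
			delete(m, "_source")
			return m
		},
		// drop-style filter: returning nil discards the message
		func(m map[string]interface{}) map[string]interface{} {
			if m["level"] == "debug" {
				return nil
			}
			return m
		},
	}

	// the same chained processing loop as in Consume()
	for _, proc := range filters {
		if m = proc(m); m == nil {
			return // message dropped, nothing to write
		}
	}

	// re-encode; this is what would be written to es
	bs, _ := json.Marshal(m)
	fmt.Println(string(bs))
}
```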
Data flow
That covers the data processing and the point where upstream and downstream meet. But the data still has to travel from kafka to es, and on the kafka side someone must actively pull it. So how is the data flow driven? Let's go back to the main program: https://github.com/tal-tech/go-stash/blob/master/stash/stash.go
In fact, the whole startup process is built on a composition pattern:
```go
func main() {
	// parse command line arguments, set up graceful shutdown
	...

	// service composition pattern
	group := service.NewServiceGroup()
	defer group.Stop()

	for _, processor := range c.Clusters {
		// connect to es
		...
		// build the filter processors
		...
		// prepare the es write operation {index indexer, writer writer}
		handle := handler.NewHandler(writer, indexer)
		handle.AddFilters(filters...)
		handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))

		// start kafka according to the configuration, pass in the
		// consume operation, and add each queue to the combiner
		for _, k := range toKqConf(processor.Input.Kafka) {
			group.Add(kq.MustNewQueue(k, handle))
		}
	}

	// start the combiner
	group.Start()
}
```
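As an aside, the ServiceGroup here comes from go-zero's core/service package. Below is a minimal sketch of the pattern with a toy service, assuming the article-era import path github.com/tal-tech/go-zero (newer versions live under github.com/zeromicro/go-zero):

```go
package main

import (
	"fmt"
	"time"

	"github.com/tal-tech/go-zero/core/service"
)

// A toy service: go-zero services implement Start() and Stop();
// the group runs each Start() in its own goroutine and stops them
// all together on shutdown.
type tickerService struct{ done chan struct{} }

func (s *tickerService) Start() {
	t := time.NewTicker(time.Second)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			fmt.Println("tick")
		case <-s.done:
			return
		}
	}
}

func (s *tickerService) Stop() { close(s.done) }

func main() {
	group := service.NewServiceGroup()
	defer group.Stop()

	group.Add(&tickerService{done: make(chan struct{})})
	group.Start() // blocks until every service's Start() returns,
	// like stash.go's group.Start(); go-zero also wires graceful
	// shutdown to process signals here
}
```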
The whole data flow revolves around this group combiner.
```
group.Start()
  |- group.doStart()
      |- [service.Start() for service in group.services]
```
So every service added to the group implements Start(). In other words, the kafka-side startup logic lives in Start():
```go
func (q *kafkaQueue) Start() {
	q.startConsumers()
	q.startProducers()

	q.producerRoutines.Wait()
	close(q.channel)
	q.consumerRoutines.Wait()
}
```
Start the kafka consumers [the goroutines that process messages from q.channel]
Start the kafka pull side [the name startProducers can be misleading: these goroutines actually pull messages from kafka into q.channel]
Wait for the pull side to finish, close the channel, then wait for the consumers to drain it; at that point the work is done (see the channel hand-off sketch after this list)
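Here is a minimal, self-contained sketch of that channel hand-off pattern, simplified and not go-queue's actual code: "producers" push pulled messages into a channel, consumers drain it, and shutdown is wait-for-producers, close the channel, wait-for-consumers.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	ch := make(chan string, 16)
	var producers, consumers sync.WaitGroup

	// producer side: in go-stash this pulls messages from kafka
	producers.Add(1)
	go func() {
		defer producers.Done()
		for i := 0; i < 3; i++ {
			ch <- fmt.Sprintf("msg-%d", i)
		}
	}()

	// consumer side: in go-stash this calls handler.Consume(...)
	consumers.Add(1)
	go func() {
		defer consumers.Done()
		for msg := range ch {
			fmt.Println("consumed:", msg)
		}
	}()

	producers.Wait() // all pullers done
	close(ch)        // no more messages will arrive
	consumers.Wait() // drain whatever is left in the channel
}
```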
The handler we passed in on the kafka side is the Consume method discussed above, and it is executed inside q.startConsumers():
```
q.startConsumers()
  |- [q.consumeOne(key, value) for msg in q.channel]
      |- q.handler.Consume(key, value)
```
In this way the whole data stream is strung together end to end: kafka -> q.channel -> Consume() -> filters -> es write.