2025-03-26 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article explains in detail how to use a Log Service Python consumer group to distribute data in real time.
Scenario and goal
Logs collected through Web Tracking, Logtail, syslog, and other methods in Log Service (SLS) often come in a variety of formats. We need to distribute specific logs (for example, by topic) to specific logstores for processing and indexing. This document describes how to use consumer groups to distribute logs to different target logstores in real time, and how to use the features of consumer groups to achieve automatic balancing, load balancing, and high availability.
Basic concepts
The collaborative consumption library (Consumer Library) is an advanced mode for consuming logs in Log Service. It provides the concept of a consumer group (ConsumerGroup) to abstract and manage consumers. Unlike reading data directly with the SDK, users do not need to care about the implementation details of Log Service and only need to focus on business logic. Users also do not need to handle load balancing or failover among consumers.
Consumer group (Consumer Group) - A consumer group consists of multiple consumers. Consumers in the same consumer group jointly consume the data in the same logstore, and they do not consume the same data twice.
Consumer (Consumer) - The unit of a consumer group that actually performs the consumption task. Consumer names within the same consumer group must be different.
In Log Service, a logstore has multiple shards. The collaborative consumption library assigns shards to the consumers in a consumer group according to the following principles:
Each shard is assigned to only one consumer.
A consumer can hold multiple shards at the same time.
When a new consumer joins a consumer group, the shard assignments in that group are adjusted to balance the consumption load. The principles above still hold, and the assignment process is transparent to users.
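The allocation principles above can be illustrated with a small simulation. This is only a sketch: the round-robin strategy and the names assign_shards, consumer-a, and so on are assumptions made for illustration, and the actual assignment is performed by the service and may use a different strategy.

```python
def assign_shards(shard_ids, consumers):
    """Give every shard to exactly one consumer, round-robin (illustrative only)."""
    assignment = {name: [] for name in consumers}
    for index, shard_id in enumerate(sorted(shard_ids)):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(shard_id)
    return assignment


shards = list(range(4))

# Two consumers share four shards.
before = assign_shards(shards, ["consumer-a", "consumer-b"])
# A third consumer joins and the shards are redistributed.
after = assign_shards(shards, ["consumer-a", "consumer-b", "consumer-c"])

print(before)  # {'consumer-a': [0, 2], 'consumer-b': [1, 3]}
print(after)   # {'consumer-a': [0, 3], 'consumer-b': [1], 'consumer-c': [2]}
```

In both cases every shard has exactly one owner, while a consumer may own several shards.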
Another function of the collaborative consumption library is to save checkpoints, so that a program that has recovered from a failure can continue consuming from the checkpoint, which ensures that data is not consumed repeatedly.
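The idea of resuming from a checkpoint can be sketched as follows. The CheckpointStore class and the consume function are hypothetical, in-memory stand-ins used only to show the concept; in the real service the checkpoint is stored on the server side and managed by the library.

```python
class CheckpointStore:
    """In-memory stand-in for server-side checkpoint storage (hypothetical)."""

    def __init__(self):
        self._positions = {}

    def save(self, shard_id, position):
        self._positions[shard_id] = position

    def load(self, shard_id):
        # A shard that was never consumed starts at position 0.
        return self._positions.get(shard_id, 0)


def consume(shard_id, records, store, max_records):
    """Process up to max_records records, starting at the saved checkpoint."""
    start = store.load(shard_id)
    batch = records[start:start + max_records]
    store.save(shard_id, start + len(batch))
    return batch


records = ["log-%d" % i for i in range(5)]
store = CheckpointStore()

first_run = consume(0, records, store, max_records=3)   # program stops here
second_run = consume(0, records, store, max_records=3)  # restart resumes

print(first_run)   # ['log-0', 'log-1', 'log-2']
print(second_run)  # ['log-3', 'log-4']
```

The second run picks up at the saved position, so no record is processed twice.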
Real-time distribution using consumer groups
This section describes how to write a Python program that uses a consumer group to distribute data in real time according to the topic of the data.
Note: the code for this article may be updated, and the latest version can be found here: GitHub sample.
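Before looking at the SDK-based program, the routing idea itself can be shown in isolation. This is a minimal sketch: the dispatch function, the topic names "access" and "audit", and the SimpleNamespace objects standing in for log groups are all assumptions for illustration and are not part of the SDK.

```python
from types import SimpleNamespace


def dispatch(log_groups, handlers):
    """Send each log group to the handler registered for its topic."""
    skipped = 0
    for log_group in log_groups:
        handler = handlers.get(log_group.Topic)
        if handler is None:
            # No target is configured for this topic, so it is not forwarded.
            skipped += 1
            continue
        handler(log_group)
    return skipped


# Lists stand in for target logstores; list.append stands in for a write call.
received = {"access": [], "audit": []}
handlers = {topic: bucket.append for topic, bucket in received.items()}

log_groups = [
    SimpleNamespace(Topic="access", Logs=["GET /index.html"]),
    SimpleNamespace(Topic="audit", Logs=["SELECT 1"]),
    SimpleNamespace(Topic="unknown", Logs=["ignored"]),
]

skipped = dispatch(log_groups, handlers)
print(len(received["access"]), len(received["audit"]), skipped)  # 1 1 1
```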
Installation
Environment
It is recommended that the program run on an ECS instance in the same region as the source logstore and use the intranet endpoint. This gives the fastest network path, and reading data incurs no public network traffic charges.
Running this program with PyPy3 is strongly recommended instead of the standard CPython interpreter.
The Python SDK of Log Service can be installed as follows:
pypy3 -m pip install aliyun-log-python-sdk -U
For more information, refer to the SLS Python SDK user manual.
Program configuration
The program is configured in the following parts:
The program log file, for subsequent testing or diagnosis of possible problems (omitted here; refer to the sample).
The basic Log Service connection and the consumer group options.
The connection information of the target logstores.
Read the comments in the code carefully and adjust the options as needed:
# encoding: utf8
import os
from functools import partial
from multiprocessing import current_process

from aliyun.log import LogClient
from aliyun.log.consumer import CursorPosition, LogHubConfig


def get_option():
    # Basic options
    # Load SLS parameters and options from environment variables.
    # You can configure multiple targets as needed.
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    to_endpoint = os.environ.get('SLS_ENDPOINT_TO', endpoint)
    to_project = os.environ.get('SLS_PROJECT_TO', project)
    to_logstore1 = os.environ.get('SLS_LOGSTORE_TO1', '')
    to_logstore2 = os.environ.get('SLS_LOGSTORE_TO2', '')
    to_logstore3 = os.environ.get('SLS_LOGSTORE_TO3', '')
    consumer_group = os.environ.get('SLS_CG', '')

    # The starting point of consumption. This parameter takes effect only the
    # first time the program runs; later runs continue from the checkpoint of
    # the last consumption.
    # It can be "begin", "end", or a specific time in ISO format.
    cursor_start_time = "2018-12-26 0:0:0"

    # Generally do not change the consumer name, especially when you need to
    # run several consumers concurrently.
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # Build the consumer group and consumer options.
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore,
                          consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time)

    # Bind put_log_raw, which is faster.
    to_client = LogClient(to_endpoint, accessKeyId, accessKey)
    put_method1 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore1)
    put_method2 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore2)
    put_method3 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore3)

    # Map each topic to the method that writes to its target logstore.
    return option, {u'ngnix': put_method1, u'sql_audit': put_method2, u'click': put_method3}
Note that functools.partial is used to bind the project and logstore arguments of put_log_raw in advance, which simplifies the subsequent calls.
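For readers unfamiliar with functools.partial, the following minimal sketch shows the binding technique on its own. The put_log_raw function here is a hypothetical stand-in that only formats a string; it is not the SDK method, and the project and logstore names are made up.

```python
from functools import partial


def put_log_raw(project, logstore, log_group):
    """Hypothetical stand-in for a client method that writes one log group."""
    return "{0}/{1} <- {2}".format(project, logstore, log_group)


# Fix project and logstore once; only log_group remains to be supplied.
put_to_store1 = partial(put_log_raw, project="demo-project", logstore="store1")

print(put_to_store1(log_group="group-a"))  # demo-project/store1 <- group-a
```

With the target bound ahead of time, the dispatching code only needs to pass the log group, which keeps the per-topic lookup table simple.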
Data consumption and distribution
The following code shows how to get the data from SLS and forward it according to topic.
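import logging

from aliyun.log.consumer import ConsumerProcessorAdaptor, ConsumerWorker

# Log handlers are configured as in the sample (omitted here).
logger = logging.getLogger(__name__)

if __name__ == '__main__':
    option, put_methods = get_option()

    def copy_data(shard_id, log_groups):
        for log_group in log_groups.LogGroups:
            # Forward the log group only if its topic has a configured target.
            if log_group.Topic in put_methods:
                put_methods[log_group.Topic](log_group=log_group)

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(ConsumerProcessorAdaptor, option, args=(copy_data,))
    worker.start(join=True)
Start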
Assuming that the program is named "dispatch_data.py", you can start it as follows, filling in each value for your environment:
export SLS_ENDPOINT=
export SLS_AK_ID=
export SLS_AK_KEY=
export SLS_PROJECT=
export SLS_LOGSTORE=
export SLS_LOGSTORE_TO1=
export SLS_LOGSTORE_TO2=
export SLS_LOGSTORE_TO3=
export SLS_CG=
pypy3 dispatch_data.py
Performance considerations
Launching multiple consumers
Programs based on consumer groups can be started several times directly to achieve concurrency:
nohup pypy3 dispatch_data.py &
...
Note:
All consumers use the same consumer group name and different consumer names (because the consumer name is suffixed with the process ID).
Because a shard can be consumed by only one consumer, a logstore with 10 shards can have at most 10 consumers consuming at the same time.
Performance throughput
Based on testing, with no bandwidth limit and no rate limit on the receiver (such as Splunk), and with the sample above running under pypy3 on the recommended hardware, a single consumer can consume raw logs at about 5 MB/s while using about 10% of a single CPU core. In theory, therefore, each CPU core can reach about 50 MB/s of raw logs, that is, about 4 TB of raw logs per day.
Note: this data depends on bandwidth, hardware parameters, and whether the target Logstore can receive data quickly.
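The arithmetic behind these figures can be checked with a short calculation. This is only a sketch: it assumes throughput scales linearly with CPU usage and uses decimal units (1 TB = 1,000,000 MB); the function names are made up for illustration.

```python
def per_core_rate(rate_mb_s, cpu_percent):
    """Scale a measured rate to one fully used core, assuming linear scaling."""
    return rate_mb_s * 100.0 / cpu_percent


def daily_volume_tb(rate_mb_s):
    """Convert MB/s into TB per day (decimal units, 1 TB = 1,000,000 MB)."""
    return rate_mb_s * 86400 / 1000000


core_rate = per_core_rate(5, 10)              # 5 MB/s at 10% of one core
print(core_rate)                              # 50.0
print(round(daily_volume_tb(core_rate), 2))   # 4.32
```

The result of about 4.32 TB per day matches the rounded figure of 4 TB quoted above.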
High availability
The consumer group saves the checkpoint on the server side. When one consumer stops, another consumer automatically takes over and continues consuming from the checkpoint.
Consumers can be started on different machines, so that when one machine stops or breaks down, consumers on the other machines automatically take over and continue consuming from the checkpoint.
In theory, more consumers than the number of shards can also be started, with the extra consumers serving as backups.
Other restrictions and constraints
A maximum of 10 consumer groups can be configured for each logstore. If you encounter the error ConsumerGroupQuotaExceed, it is recommended to delete some unused consumer groups in the console.
Monitoring
View the status of consumer groups in the console
Check the delay of consumer groups through cloud monitoring and configure alarms
HTTPS
If the service endpoint is configured with an https:// prefix, such as https://cn-beijing.log.aliyuncs.com, the program's connection to SLS is automatically encrypted using HTTPS.
The server certificate *.aliyuncs.com is signed by GlobalSign, which most Linux and Windows machines trust by default. If, in a special case, a machine does not trust this certificate, you can refer to here to download and install it.
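Since the https:// prefix on the endpoint is what selects an encrypted connection, a program may want to normalize the configured endpoint before building its options. The helper below is a hypothetical sketch, not part of the SDK; it only rewrites the string and performs no network access.

```python
def ensure_https(endpoint):
    """Return the endpoint with an https:// prefix (hypothetical helper)."""
    endpoint = endpoint.strip()
    if endpoint.startswith("https://"):
        return endpoint
    if endpoint.startswith("http://"):
        # Replace an explicit plain-HTTP scheme with HTTPS.
        return "https://" + endpoint[len("http://"):]
    # No scheme given: add the HTTPS prefix.
    return "https://" + endpoint


print(ensure_https("cn-beijing.log.aliyuncs.com"))
# https://cn-beijing.log.aliyuncs.com
```

It could be applied, for example, to the value read from SLS_ENDPOINT before that value is passed on.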