2025-04-05 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article introduces the real-time log processing method of Flink and Drools through a practical case. Many people run into difficulties with this kind of setup, so let the editor walk you through how to handle these situations. Read it carefully and you should come away with something useful!
Background
The log system ingests many kinds of logs, with complex and varied formats. The mainstream sources are: text logs collected by filebeat, in a variety of formats; operating system logs collected by winlogbeat; syslog logs reported to logstash; and business logs delivered through kafka.
Logs arriving through these various channels pose two major problems:
1. How to parse logs whose formats are complex and diverse
2. How to extract the indicators that users care about from all kinds of logs and mine more business value
To solve these two problems, we built a real-time log processing service based on flink and the drools rule engine.
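As a toy illustration of the second problem, extracting an indicator from semi-structured log text, here is a minimal plain-Java sketch. The log format and the `cost=` field are assumptions made up for this example, not part of the actual system:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical example: pull one indicator (request latency) out of a text log line.
class LatencyExtractor {
    // Assumed log format: "... cost=123ms ..."
    private static final Pattern COST = Pattern.compile("cost=(\\d+)ms");

    /** Returns the latency in milliseconds, or -1 if the line has no such field. */
    static long extractLatencyMs(String line) {
        Matcher m = COST.matcher(line);
        return m.find() ? Long.parseLong(m.group(1)) : -1L;
    }

    public static void main(String[] args) {
        System.out.println(extractLatencyMs("GET /api/user 200 cost=42ms")); // prints 42
    }
}
```

In the real system this kind of extraction logic lives in drools rules rather than hard-coded regexes, so it can change without redeploying the job.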
System architecture
The architecture is relatively simple, and the architecture diagram is as follows:
Logs of all kinds are funneled into kafka, which serves as the log transfer layer.
Flink consumes the kafka data and calls the drools rule engine through its API to parse each log. The parsed data is then written to Elasticsearch for log search and analysis.
To monitor the real-time status of log parsing, flink writes processing statistics, such as the number of logs processed per minute and the number of logs from each source IP, to Redis.
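As a rough, self-contained illustration of the kind of statistics described above (counts per minute and per source IP), the following sketch assumes nothing about the real eagle code; the method and field names are invented for the example:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the monitoring statistics written to Redis:
// log counts per source IP, and timestamps bucketed to the minute.
class LogStats {
    /** Counts how many logs came from each source IP. */
    static Map<String, Long> countByIp(List<String> ips) {
        Map<String, Long> counts = new HashMap<>();
        for (String ip : ips) {
            counts.merge(ip, 1L, Long::sum);
        }
        return counts;
    }

    /** Buckets an epoch-millisecond timestamp to the start of its minute. */
    static long minuteBucket(long epochMillis) {
        return epochMillis - (epochMillis % 60_000L);
    }

    public static void main(String[] args) {
        System.out.println(countByIp(List.of("10.0.0.1", "10.0.0.2", "10.0.0.1")));
    }
}
```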
Module introduction
The system project is named eagle.
Eagle-api: based on springboot, serves as the read/write API service for the drools rules.
Eagle-common: shared utility classes.
Eagle-log: the flink-based log processing service.
Let's focus on eagle-log:
Integrating with kafka, ES and Redis
The integration with kafka and ES is relatively simple: we use the official connectors (flink-connector-kafka-0.10 and flink-connector-elasticsearch7); see the code for details.
For Redis, we started with the redis connector provided by org.apache.bahir, but later found it was not flexible enough, so we switched to Jedis.
To write the statistical data to redis, the first version cached the grouped data after the keyBy grouping, computed the statistics, and wrote them out in the sink. The reference code is as follows:
String name = "redis-agg-log";
DataStream keyedStream = dataSource
    .keyBy((KeySelector) log -> log.getIndex())
    .timeWindow(Time.seconds(windowTime))
    .trigger(new CountTriggerWithTimeout(windowCount, TimeCharacteristic.ProcessingTime))
    .process(new ProcessWindowFunction() {
        @Override
        public void process(String s, Context context, Iterable iterable, Collector collector) {
            ArrayList logs = Lists.newArrayList(iterable);
            if (logs.size() > 0) {
                collector.collect(new Tuple2(s, logs));
            }
        }
    })
    .setParallelism(redisSinkParallelism)
    .name(name)
    .uid(name);
We later found that this consumed a lot of memory. In fact, there is no need to cache all the raw data for each group; a single statistics object per group is enough. After optimization:
String name = "redis-agg-log";
DataStream keyedStream = dataSource
    .keyBy((KeySelector) log -> log.getIndex())
    .timeWindow(Time.seconds(windowTime))
    .trigger(new CountTriggerWithTimeout(windowCount, TimeCharacteristic.ProcessingTime))
    .aggregate(new LogStatAggregateFunction(), new LogStatWindowFunction())
    .setParallelism(redisSinkParallelism)
    .name(name)
    .uid(name);
Here flink's aggregate function and an Accumulator are used, so the statistics are computed incrementally by flink's agg operation, reducing memory pressure.
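The memory saving comes from keeping only an accumulator per group instead of the raw records. The following plain-Java sketch mirrors the add/merge/getResult contract of Flink's AggregateFunction; LogStatAccumulator here is a hypothetical stand-in, not the article's LogStatAggregateFunction:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the accumulator idea behind Flink's AggregateFunction:
// instead of buffering every log in the window, keep only running statistics.
class LogStatAccumulator {
    private long total;
    private final Map<String, Long> countPerIp = new HashMap<>();

    /** add(): called once per record; O(1) state instead of buffering all logs. */
    void add(String ip) {
        total++;
        countPerIp.merge(ip, 1L, Long::sum);
    }

    /** merge(): combines two partial accumulators, as Flink may do across panes. */
    LogStatAccumulator merge(LogStatAccumulator other) {
        total += other.total;
        other.countPerIp.forEach((ip, n) -> countPerIp.merge(ip, n, Long::sum));
        return this;
    }

    /** getResult(): only this summary ever reaches the Redis sink. */
    long getTotal() { return total; }
    Map<String, Long> getCountPerIp() { return countPerIp; }

    public static void main(String[] args) {
        LogStatAccumulator acc = new LogStatAccumulator();
        acc.add("10.0.0.1");
        acc.add("10.0.0.1");
        System.out.println(acc.getTotal() + " logs, per-IP: " + acc.getCountPerIp());
    }
}
```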
Broadcasting the drools rules with broadcast state
1. The drools rule stream is broadcast through broadcast map state.
2. The kafka data stream connects to the rule stream to process the logs.
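Setting the Flink and Drools specifics aside, the two steps above amount to: the broadcast side updates a shared rule set at runtime, and the data side evaluates every log record against the current rules. A minimal plain-Java sketch of that idea follows; the Rule type is a stand-in invented for this example, not the Drools API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Drools-free sketch of what the connected process function conceptually does.
class RuleEvaluator {
    static class Rule {
        final String name;
        final Predicate<Map<String, String>> condition;
        Rule(String name, Predicate<Map<String, String>> condition) {
            this.name = name;
            this.condition = condition;
        }
    }

    private final List<Rule> rules = new ArrayList<>();

    /** Broadcast side: extend or replace the rule set at runtime. */
    void addRule(Rule rule) { rules.add(rule); }

    /** Data side: return the names of all rules matching one log record. */
    List<String> evaluate(Map<String, String> log) {
        List<String> matched = new ArrayList<>();
        for (Rule r : rules) {
            if (r.condition.test(log)) matched.add(r.name);
        }
        return matched;
    }

    public static void main(String[] args) {
        RuleEvaluator evaluator = new RuleEvaluator();
        evaluator.addRule(new Rule("error-log", m -> "ERROR".equals(m.get("level"))));
        Map<String, String> log = new HashMap<>();
        log.put("level", "ERROR");
        System.out.println(evaluator.evaluate(log)); // prints [error-log]
    }
}
```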
// broadcast the rule stream
BroadcastStream ruleSource = env.addSource(new RuleSourceFunction(ruleUrl))
    .name(ruleName).uid(ruleName).setParallelism(1)
    .broadcast(ruleStateDescriptor);
// kafka data stream
FlinkKafkaConsumer010 source = new FlinkKafkaConsumer010(kafkaTopic, new LogSchema(), properties);
DataStream dataSource = env.addSource(source)
    .name(kafkaTopic).uid(kafkaTopic).setParallelism(kafkaParallelism);
// the data stream connects to the rule stream to process the logs
BroadcastConnectedStream connectedStreams = dataSource.connect(ruleSource);
connectedStreams.process(new LogProcessFunction(ruleStateDescriptor, ruleBase))
    .setParallelism(processParallelism).name(name).uid(name);
That is all for "the real-time log processing method of Flink and Drools". Thank you for reading!