Real-Time Log Processing with Flink and Drools


This article introduces how to build real-time log processing on Flink and the Drools rule engine. It walks through situations many people run into in practice; I hope you read it carefully and get something out of it!

Background

The log system ingests many kinds of logs whose formats are complex and diverse. The main sources are as follows:

Text logs collected by Filebeat, in a wide variety of formats

Operating system logs collected by Winlogbeat

Syslog logs reported to Logstash

Business logs consumed from Kafka

Logs arriving through these different channels pose two major problems:

How to parse logs of so many different and changing formats in a flexible, uniform way

How to extract the indicators that users care about from all kinds of logs and mine more business value

To solve these two problems, we built a real-time log processing service based on Flink and the Drools rule engine.

System architecture

The architecture is relatively simple; the data flow is as follows:

Logs from all sources are funneled through Kafka, which acts as the transport layer.

Flink consumes the Kafka data and pulls the Drools rules through an API call. After each log is parsed, the result is written to Elasticsearch for log search and analysis.

To monitor the parsing status in real time, Flink writes processing statistics, such as the number of logs processed per minute and the number of logs per source IP, to Redis.

Module introduction

The system project is named eagle.

eagle-api: based on Spring Boot; the read/write API service for the Drools rules.

eagle-common: shared utility classes.

eagle-log: the Flink-based log processing service.

Let's focus on eagle-log:

Connecting to Kafka, ES, and Redis

Integrating Kafka and ES is straightforward with the official connectors (flink-connector-kafka-0.10 and flink-connector-elasticsearch7); see the code for details.
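For reference, here is a minimal sketch of the ES write path using the official flink-connector-elasticsearch7 connector. The parsedStream variable, the LogEntry type, and its getIndex()/toMap() helpers are our assumptions, not code from the article:

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http")); // ES address is an assumption

ElasticsearchSink.Builder<LogEntry> esBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<LogEntry>() {
            @Override
            public void process(LogEntry log, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(Requests.indexRequest()
                        .index(log.getIndex())   // route each log type to its own index
                        .source(log.toMap()));   // hypothetical helper: log fields as a Map
            }
        });
esBuilder.setBulkFlushMaxActions(1000); // batch bulk requests to reduce ES load
parsedStream.addSink(esBuilder.build()).name("es-sink").uid("es-sink");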

For Redis, we started with the redis connector provided by org.apache.bahir, but later found it was not flexible enough, so we switched to using Jedis directly.
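The Jedis sink itself is not shown in the article. Below is a minimal sketch, assuming a LogStat record carrying the log index, the minute bucket, and per-IP counters; the LogStat type, its accessors, and the Redis address are our assumptions:

// LogStat is a hypothetical stat record: getIndex(), getMinute(), getTotal(), getCountPerIp()
public class RedisStatSink extends RichSinkFunction<LogStat> {
    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        jedis = new Jedis("127.0.0.1", 6379); // address is an assumption
    }

    @Override
    public void invoke(LogStat stat, Context context) {
        String key = "eagle:stat:" + stat.getIndex() + ":" + stat.getMinute();
        jedis.hincrBy(key, "total", stat.getTotal());                    // logs per minute
        stat.getCountPerIp().forEach(
                (ip, count) -> jedis.hincrBy(key, "ip:" + ip, count));   // logs per source IP
        jedis.expire(key, 24 * 3600); // keep one day of monitoring data
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}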

To produce the statistics written to Redis, the first version cached the grouped data after the initial keyBy and computed the statistics in the window before the sink. The reference code is as follows:

String name = "redis-agg-log";
// generics restored; LogEntry stands for the log element type used throughout
DataStream<Tuple2<String, List<LogEntry>>> keyedStream = dataSource
        .keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())
        .timeWindow(Time.seconds(windowTime))
        .trigger(new CountTriggerWithTimeout(windowCount, TimeCharacteristic.ProcessingTime))
        .process(new ProcessWindowFunction<LogEntry, Tuple2<String, List<LogEntry>>, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<LogEntry> iterable,
                                Collector<Tuple2<String, List<LogEntry>>> collector) {
                // buffer the whole group for this window, then emit it as one tuple
                ArrayList<LogEntry> logs = Lists.newArrayList(iterable);
                if (logs.size() > 0) {
                    collector.collect(new Tuple2<>(key, logs));
                }
            }
        })
        .setParallelism(redisSinkParallelism).name(name).uid(name);
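Both versions rely on CountTriggerWithTimeout, a custom trigger the article names but never shows. A plausible sketch, assuming it fires when the buffered element count reaches windowCount or when processing time reaches the window end, whichever comes first:

// Sketch only: supports processing time, as in the calls above
public class CountTriggerWithTimeout extends Trigger<Object, TimeWindow> {
    private final long maxCount;
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count",
                    (ReduceFunction<Long>) (a, b) -> a + b, LongSerializer.INSTANCE);

    public CountTriggerWithTimeout(long maxCount, TimeCharacteristic timeCharacteristic) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        ctx.registerProcessingTimeTimer(window.maxTimestamp()); // the timeout
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE; // count threshold reached
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE_AND_PURGE; // timeout reached
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}

Firing on either the count or the timeout keeps the Redis statistics fresh even for keys that receive few logs.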

Later we found that caching the entire group's raw data in the window consumed a lot of memory. In fact there is no need to keep the raw data at all; one statistics record per window is enough. After optimization:

String name = "redis-agg-log";
DataStream keyedStream = dataSource
        .keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())
        .timeWindow(Time.seconds(windowTime))
        .trigger(new CountTriggerWithTimeout(windowCount, TimeCharacteristic.ProcessingTime))
        .aggregate(new LogStatAggregateFunction(), new LogStatWindowFunction())
        .setParallelism(redisSinkParallelism).name(name).uid(name);

Here Flink's AggregateFunction and its accumulator compute the statistics incrementally inside the aggregate operation, which sharply reduces memory pressure.
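The article does not include LogStatAggregateFunction. A minimal sketch of what such an AggregateFunction could look like follows; the accumulator fields and the LogEntry.getIp() accessor are assumptions. The companion LogStatWindowFunction would typically just attach the window key and end time to the accumulator's output.

// Hypothetical accumulator: only counters, never the raw logs
class LogStatAccumulator {
    public long total;
    public Map<String, Long> countPerIp = new HashMap<>();
}

public class LogStatAggregateFunction
        implements AggregateFunction<LogEntry, LogStatAccumulator, LogStatAccumulator> {

    @Override
    public LogStatAccumulator createAccumulator() {
        return new LogStatAccumulator();
    }

    @Override
    public LogStatAccumulator add(LogEntry log, LogStatAccumulator acc) {
        acc.total++;                                       // logs per window
        acc.countPerIp.merge(log.getIp(), 1L, Long::sum);  // logs per source IP
        return acc;
    }

    @Override
    public LogStatAccumulator getResult(LogStatAccumulator acc) {
        return acc;
    }

    @Override
    public LogStatAccumulator merge(LogStatAccumulator a, LogStatAccumulator b) {
        a.total += b.total;
        b.countPerIp.forEach((ip, c) -> a.countPerIp.merge(ip, c, Long::sum));
        return a;
    }
}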

Broadcasting the Drools rules with broadcast state

1. The Drools rule stream is broadcast through broadcast map state (a sketch of the custom rule source follows this list).

2. The Kafka data stream connects to the rule stream to process the logs.
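RuleSourceFunction is not shown in the article. A hedged sketch, assuming a parallelism-1 source that polls the eagle-api rule endpoint and emits a new RuleBase when the rule version changes; RuleBase, its getVersion() method, and the polling interval are all assumptions:

public class RuleSourceFunction extends RichSourceFunction<RuleBase> {
    private final String ruleUrl;
    private volatile boolean running = true;

    public RuleSourceFunction(String ruleUrl) {
        this.ruleUrl = ruleUrl;
    }

    @Override
    public void run(SourceContext<RuleBase> ctx) throws Exception {
        String lastVersion = null;
        while (running) {
            RuleBase ruleBase = fetchRuleBase(ruleUrl);
            if (!Objects.equals(ruleBase.getVersion(), lastVersion)) {
                ctx.collect(ruleBase);           // broadcast the new rules downstream
                lastVersion = ruleBase.getVersion();
            }
            Thread.sleep(60_000L);               // polling interval is an assumption
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private RuleBase fetchRuleBase(String url) {
        // hypothetical: GET the current rule set from the eagle-api service and
        // compile it into a RuleBase; the HTTP client is left to the reader
        throw new UnsupportedOperationException("fetch rules from " + url);
    }
}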

// broadcast the rule stream (see the RuleSourceFunction sketch above)
BroadcastStream<RuleBase> ruleSource = env.addSource(new RuleSourceFunction(ruleUrl))
        .name(ruleName).uid(ruleName).setParallelism(1)
        .broadcast(ruleStateDescriptor);

// kafka data stream
FlinkKafkaConsumer010<LogEntry> source = new FlinkKafkaConsumer010<>(kafkaTopic, new LogSchema(), properties);
DataStream<LogEntry> dataSource = env.addSource(source)
        .name(kafkaTopic).uid(kafkaTopic).setParallelism(kafkaParallelism);

// connect the data stream with the rule stream to process the logs
BroadcastConnectedStream<LogEntry, RuleBase> connectedStreams = dataSource.connect(ruleSource);
connectedStreams.process(new LogProcessFunction(ruleStateDescriptor, ruleBase))
        .setParallelism(processParallelism).name(name).uid(name);
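The article does not show LogProcessFunction either. A sketch under the assumption that RuleBase exposes an apply(log) helper; in Drools terms that helper would typically create a KieSession, insert the log as a fact, call fireAllRules(), and dispose of the session:

// Reads the latest rules from broadcast state for every log; falls back to the
// RuleBase passed at construction until the first broadcast arrives.
public class LogProcessFunction extends BroadcastProcessFunction<LogEntry, RuleBase, LogEntry> {
    private static final String RULE_KEY = "rules"; // single-entry map state (our convention)

    private final MapStateDescriptor<String, RuleBase> ruleStateDescriptor;
    private final RuleBase defaultRuleBase;

    public LogProcessFunction(MapStateDescriptor<String, RuleBase> ruleStateDescriptor,
                              RuleBase defaultRuleBase) {
        this.ruleStateDescriptor = ruleStateDescriptor;
        this.defaultRuleBase = defaultRuleBase;
    }

    @Override
    public void processElement(LogEntry log, ReadOnlyContext ctx, Collector<LogEntry> out)
            throws Exception {
        RuleBase rules = ctx.getBroadcastState(ruleStateDescriptor).get(RULE_KEY);
        if (rules == null) {
            rules = defaultRuleBase;
        }
        rules.apply(log); // hypothetical: run a Drools session over the log fact
        out.collect(log); // emit the parsed log toward the ES sink
    }

    @Override
    public void processBroadcastElement(RuleBase newRules, Context ctx, Collector<LogEntry> out)
            throws Exception {
        ctx.getBroadcastState(ruleStateDescriptor).put(RULE_KEY, newRules);
    }
}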

This concludes "Real-Time Log Processing with Flink and Drools". Thank you for reading!
