Technical Practice of Real-Time Operation and Maintenance for Public Security Big Data Based on Spark

Updated 2025-01-15. From: SLTechnology News&Howtos (shulou), Internet Technology

Shulou (Shulou.com), 06/01

This article explains in detail a technical practice for real-time operation and maintenance of public security big data based on Spark, from log collection through to storage and display.

The public security industry operates tens of thousands of front-end and back-end devices. Front-end equipment includes cameras, detectors and sensors; back-end equipment includes servers, application servers, network equipment and power systems in central computer rooms at all levels. The sheer number and variety of devices poses a major challenge for internal operation and maintenance management. Traditional diagnosis and analysis through ICMP/SNMP, Trap/Syslog and similar tools can no longer meet actual requirements, and because of the particular nature of internal public security operations, architectures such as ELK do not meet the need either. To find a workable solution, we turned to open source components and built a real-time operation and maintenance management platform suited to the public security industry.

Overall architecture of real-time operation and maintenance platform

Data acquisition layer: Logstash and Flume collect and filter the SNMP Trap and Syslog information sent by the various front-end and back-end hardware devices, as well as the system and business logs generated by the application servers themselves, depending on the scenario.

Data transport layer: a high-throughput distributed message queue, a Kafka cluster, ensures reliable transmission of the aggregated logs and messages.

Data processing layer: Spark pulls data from Kafka in real time, and Spark Streaming and RDD operations carry out the stream processing and logical analysis.

Data storage layer: real-time data is stored in MySQL for real-time business applications and display; the full data set is stored in ES and HBase for subsequent retrieval and analysis.

Business service layer: built on the storage layer, the business applications include APM, network monitoring, topology, alarms, work orders, CMDB and so on.

The main open source frameworks involved in the overall system are the ones named in the layers above: Logstash, Flume, Kafka, Spark (with Spark Streaming), MySQL, ES and HBase, plus MyBatis for persistence.

In addition, the overall environment is based on JDK 8 and Scala 2.10.4. The public security system contains many kinds of equipment; the rest of this article takes the Syslog log of a switch as an example and walks through the whole process of log processing and analysis.

Fig. 1 overall structure of public security real-time operation and maintenance platform

Flume+Logstash log collection

Flume is a distributed, reliable and highly available system for collecting massive volumes of logs, contributed by Cloudera. It supports custom Sources (data sources) for data collection, provides simple data processing, and writes to a Sink (data receiver) through a buffering Channel.

In Flume, the configurations of Source, Channel and Sink are as follows:

    # Name the source, channel and sink of the Flume agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Configure the Syslog source
    a1.sources.r1.type = syslogtcp
    a1.sources.r1.port = 5140
    a1.sources.r1.host = localhost

    # KafkaSink configuration
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = syslog-kafka
    a1.sinks.k1.brokerList = gtcluster-slave01:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20

    # The channel buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and the sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

This configuration uses a syslogtcp source listening on TCP port 5140 of localhost to receive the Syslog information sent by network devices. Events are buffered in memory, and KafkaSink sends the logs to the topic named "syslog-kafka" in the Kafka cluster.

Logstash, from Elastic, is designed to collect, analyze and transmit all kinds of logs, events, and unstructured data. It has three main functions: event input (Input), event filter (Filter), and event output (Output), which are set in a configuration file with the suffix .conf. In this case, the Syslog configuration is as follows:

    # syslog.conf
    input {
      syslog {
        port => 514
      }
    }
    filter {
    }
    output {
      kafka {
        # The broker host is garbled in the source (it begins 192.168.65); only port 9092 is legible
        bootstrap_servers => "<kafka-broker-host>:9092"
        topic_id => "syslog-kafka"
        compression_type => "snappy"
        codec => plain {
          format => "%{host} %{@timestamp} %{message}"
        }
      }
    }

The Input plug-in specifies the data sources. In this example, Logstash receives Syslog information on UDP port 514.

The Filter plug-in does not need to be configured in this example, but it is very powerful: it can perform complex logical processing, including regular-expression processing, encoding and decoding, key-value (KV) splitting, and numeric, time and other data processing, configured according to the actual scenario.

The Output plug-in sends the processed event data to a specified destination; here it specifies the Kafka location, the topic and the compression type. The Codec plug-in at the end prepends the IP address of the source host (host) and the timestamp at which Logstash processed the event (@timestamp) to the original event message (message), which makes it easy to identify the source of each Syslog message in transit. An example of a single original Syslog message is as follows:

12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

The flow of information processed by the Logstash Output plug-in becomes:

19.1.1.12 2016-10-13T10:04:54.520Z 12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

The two leading fields (highlighted in red in the original article) are the host and timestamp information added by the codec plug-in. The processed Syslog information is then sent to the Kafka cluster for message buffering.
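The prefixing step can be illustrated with a minimal Java sketch. It is not Logstash code; it only mimics the "%{host} %{@timestamp} %{message}" format under the assumption that the three parts are joined by single spaces. The class and method names (SyslogPrefixSketch, addPrefix, splitPrefix) are hypothetical.

```java
import java.time.Instant;

/** Sketch of the "%{host} %{@timestamp} %{message}" prefixing (not Logstash itself). */
final class SyslogPrefixSketch {

    /** Prepends the source host and the processing timestamp to the raw message. */
    static String addPrefix(String host, Instant timestamp, String message) {
        return host + " " + timestamp + " " + message;
    }

    /** Splits a prefixed line back into {host, timestamp, message}. */
    static String[] splitPrefix(String line) {
        // Limit 3: the message itself contains spaces and must stay in one piece.
        return line.split(" ", 3);
    }

    public static void main(String[] args) {
        String raw = "12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: "
                + "Interface GigabitEthernet0/16, changed state to down";
        String line = addPrefix("19.1.1.12", Instant.parse("2016-10-13T10:04:54.520Z"), raw);
        System.out.println(line);
        System.out.println("host = " + splitPrefix(line)[0]);
    }
}
```

Because the host and timestamp never contain spaces, a downstream consumer can recover them with a bounded split and leave the original message untouched.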

Kafka log buffering

Kafka is a high-throughput distributed message queue and publish/subscribe system. Each node in a Kafka cluster runs an instance called a broker, which is responsible for caching data. Kafka has two types of clients: the Producer (message producer) and the Consumer (message consumer). Messages from different business systems are distinguished by topic. Each topic is split into partitions to share the read and write load, and each partition can have multiple replicas to prevent data loss. When a consumer consumes messages from a topic, it specifies the starting offset. Kafka relies on techniques such as Zero-Copy and on delivery semantics such as Exactly Once to keep message transmission real-time, efficient, reliable and fault-tolerant.
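To make the idea of partitions sharing the load concrete, here is a deliberately simplified Java sketch. It is not Kafka's real partitioner: it only shows a hash-modulo mapping from a message key to a partition index. Using the device IP as the message key is an assumption made for illustration (the article does not say that messages are keyed), and the names PartitionSketch and partitionFor are hypothetical.

```java
/** Simplified illustration of spreading keyed messages over topic partitions. */
final class PartitionSketch {

    /**
     * Maps a message key to a partition index in [0, numPartitions).
     * This is NOT Kafka's actual partitioner, only a hash-modulo illustration.
     */
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so a negative hashCode still yields a valid index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3; // assumed partition count for the illustration
        // Assumption: the device IP is used as the message key.
        for (String deviceIp : new String[] {"10.1.1.10", "19.1.1.12", "19.1.1.113"}) {
            System.out.println(deviceIp + " -> partition " + partitionFor(deviceIp, partitions));
        }
    }
}
```

The same key always lands on the same partition, so messages from one device keep their relative order while different devices are spread across brokers.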

The configuration file server.properties of a broker in the Kafka cluster is partially configured as follows:

    ############ Server Basics ############
    # Set a separate number for each broker as its id
    broker.id=1

    ############ Socket Server Settings ############
    # Socket listening port
    port=9092

    ############ Zookeeper ############
    # Zookeeper connection configuration
    zookeeper.connect=gtcluster-slave02:2181,gtcluster-slave03:2181,gtcluster-slave04:2181
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=3000

Each broker in the cluster needs its own id. The id of this broker is 1 and it listens on the default port 9092. After the Zookeeper (zk) cluster is configured, the broker can be started.

Create the topic named syslog-kafka in the Kafka cluster:

    bin/kafka-topics.sh --create --zookeeper gtcluster-slave02:2181,gtcluster-slave03:2181,gtcluster-slave04:2181 --replication-factor 3 --partitions 3 --topic syslog-kafka

Information such as topic and partition of the Kafka cluster can also be observed by logging in to zk. Then view all the switch log information received by Kafka with the following command:

    bin/kafka-console-consumer.sh --zookeeper gtcluster-slave02:2181 --from-beginning --topic syslog-kafka

Some examples of logs are as follows:

10.1.1.10 2016-10-18T05:23:04.015Z 5585: Oct 18 13:22:45: %LINK-3-UPDOWN: Interface FastEthernet0/9, changed state to down
19.1.1.113 2016-10-18T05:24:04.425Z 10857: Oct 18 13:25:23.019 cmt: %LINEPROTO-5-UPDOWN: Line protocol on Interface GigabitEthernet1/0/3, changed state to down
19.1.1.113 2016-10-18T05:24:08.312Z 10860: Oct 18 13:25:27.935 cmt: %LINEPROTO-5-UPDOWN: Line protocol on Interface GigabitEthernet1/0/3, changed state to up

Spark log processing logic

Spark is a fast, general-purpose engine for large-scale data processing, which performs extremely well in terms of speed, efficiency and versatility.

In the Spark main program, all the Syslog messages in the Kafka topic named "syslog-kafka" are parsed with Scala regular expressions. The valid fields are encapsulated into a result object, which is written to MySQL in near real time through MyBatis so that front-end applications can display it. In addition, the full data set is stored in HBase and ES to support later retrieval and analysis of massive logs and other more advanced applications. The sample code of the main program is as follows:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object SwSyslogProcessor {
      def main(args: Array[String]): Unit = {
        // Initialize the SparkContext; the batch interval is 5 seconds
        val sparkConf: SparkConf = new SparkConf()
          .setAppName("SwSyslogProcessorApp")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        // Define the topic
        val topic = Set("syslog-kafka")
        // Define the broker list. The three host addresses are garbled in the source
        // (they begin 192.168.65, 192.168.66 and 192.168.67); only port 9092 is legible.
        val brokers = "<broker1>:9092,<broker2>:9092,<broker3>:9092"
        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> brokers,
          "serializer.class" -> "kafka.serializer.StringDecoder")
        // Create the data stream from Kafka using the topic and brokers
        val swSyslogDstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topic)
        val totalcounts = ssc.sparkContext.accumulator(0L, "Total count")
        val lines = swSyslogDstream.map(x => x._2)
        // Map each line of data to a SwSyslog object
        lines.filter(x => !x.isEmpty && x.contains("%LIN") && x.contains("Ethernet"))
          .map(x => {
            SwSyslogService.encapsulateSwSyslog(x) // encapsulate and return a SwSyslog
          }).foreachRDD((s: RDD[SwSyslog], time: Time) => {
            // Traverse the RDD
            if (!s.isEmpty()) {
              // Traverse the records of each partition in the RDD
              s.foreachPartition { records =>
                if (!records.isEmpty) records.toSet.foreach { r: SwSyslog =>
                  // Count the total number of records processed so far
                  totalcounts.add(1L)
                  // Save the SwSyslog information to MySQL
                  SwSyslogService.saveSwSyslog(r)
                }
              }
            }
          })
        // Start the program
        ssc.start()
        // Block and wait for termination
        ssc.awaitTermination()
      }
    }

The overall processing analysis is mainly divided into four steps:

Initialize SparkContext and specify parameters for Application

Create a DirectStream based on Kafka topic "syslog-kafka"

Map each row of data obtained to a Syslog object, call Service to encapsulate the object and return

Traverse RDD, save or update Syslog information to MySQL when the record is not empty.
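The filtering condition applied before the mapping step can be tried outside Spark. The following Java sketch applies the same three checks (non-empty, contains "%LIN", contains "Ethernet") to an in-memory list instead of a DStream. The class and method names are hypothetical, and the second sample line in main is a made-up non-interface message used only to show a rejected record.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Sketch of the line filter used before mapping lines to SwSyslog objects. */
final class SyslogFilterSketch {

    /** True for non-empty lines that look like LINK/LINEPROTO interface events. */
    static boolean isInterfaceEvent(String line) {
        return line != null && !line.isEmpty()
                && line.contains("%LIN") && line.contains("Ethernet");
    }

    /** Keeps only the interface events, preserving their order. */
    static List<String> keepInterfaceEvents(List<String> lines) {
        return lines.stream()
                .filter(SyslogFilterSketch::isInterfaceEvent)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList(
                "10.1.1.10 2016-10-18T05:23:04.015Z 5585: Oct 18 13:22:45: "
                        + "%LINK-3-UPDOWN: Interface FastEthernet0/9, changed state to down",
                // Hypothetical line that is not an interface state change.
                "10.1.1.10 2016-10-18T05:23:05.000Z 5586: Oct 18 13:22:46: "
                        + "%SYS-5-CONFIG_I: Configured from console",
                "");
        keepInterfaceEvents(batch).forEach(System.out::println);
    }
}
```

Matching on the prefix "%LIN" covers both LINK and LINEPROTO messages with one check, which is presumably why the main program uses it.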

Some basic properties of Syslog POJO are as follows:

    @Table(name = "sw_syslog")
    public class SwSyslog {
        /** Log ID */
        @Id
        @GeneratedValue(strategy = GenerationType.IDENTITY)
        private Long id;

        /** Device IP */
        @Column(name = "dev_ip")
        private String devIp;

        /** Server time */
        @Column(name = "server_time")
        private String serverTime;

        /** Information serial number */
        @Column(name = "syslog_num")
        private Long syslogNum;

        // ...
    }

The basic attributes of the SwSyslog entity correspond to the interface information in the Syslog message. The names in the annotations correspond to the MySQL table sw_syslog and its columns, and MyBatis performs the ORM (object-relational mapping) between the member attributes and the database structure.

SwSyslogService in the program has two main functions:

    public static SwSyslog encapsulateSwSyslog(String syslogInfo) {
        SwSyslog swsyslog = new SwSyslog();
        swsyslog.setDevIp(SwSyslogExtractorUtil.extractDevIp(syslogInfo));
        swsyslog.setServerTime(SwSyslogExtractorUtil.extractServerTime(syslogInfo));
        swsyslog.setSyslogNum(SwSyslogExtractorUtil.extractSyslogNum(syslogInfo));
        swsyslog.setDevTime(SwSyslogExtractorUtil.extractDevTime(syslogInfo));
        swsyslog.setSyslogType(SwSyslogExtractorUtil.extractSyslogType(syslogInfo));
        swsyslog.setInfoType(SwSyslogExtractorUtil.extractInfoType(syslogInfo));
        swsyslog.setDevInterface(SwSyslogExtractorUtil.extractDevInterface(syslogInfo));
        swsyslog.setInterfaceState(SwSyslogExtractorUtil.extractInterfaceState(syslogInfo));
        return swsyslog;
    }

    public static void saveSwSyslog(SwSyslog swSyslog) {
        LOGGER.debug("start saving or updating SwSyslog {}", swSyslog);
        // Query all Syslog records for this IP
        List<SwSyslog> list = swSyslogMapper.queryAllByIp(swSyslog.getDevIp());
        // Set to true once a record with the same IP and the same interface is found
        boolean updated = false;
        if (list != null && !list.isEmpty()) {
            for (SwSyslog sys : list) {
                // If the interface of the IP is the same, update the existing record
                if (sys.getDevInterface().equals(swSyslog.getDevInterface())) {
                    LOGGER.debug("same IP and same interface, start updating SwSyslog");
                    sys.setServerTime(swSyslog.getServerTime());
                    sys.setSyslogNum(swSyslog.getSyslogNum());
                    sys.setDevTime(swSyslog.getDevTime());
                    sys.setSyslogType(swSyslog.getSyslogType());
                    sys.setInfoType(swSyslog.getInfoType());
                    sys.setInterfaceState(swSyslog.getInterfaceState());
                    sys.setUpdated(new Date());
                    swSyslogMapper.update(sys);
                    updated = true;
                }
            }
        }
        // No record for this IP, or none with the same interface: insert once.
        // (The flattened original inserted inside the loop for every non-matching
        // record, which would create duplicates; the insert is moved here.)
        if (!updated) {
            LOGGER.debug("no record with the same IP and interface, save SwSyslog directly");
            swSyslog.setCreated(new Date());
            swSyslog.setUpdated(swSyslog.getCreated());
            swSyslogMapper.insert(swSyslog);
        }
    }

encapsulateSwSyslog() parses each line of Syslog processed by Spark into separate fields using Scala regular expressions, then encapsulates them into a Syslog object and returns it. Every Syslog object produced while traversing the RDD partitions contains IP and interface information, and saveSwSyslog() uses these two values to decide whether to insert the Syslog information into the database or update an existing record. The encapsulated Syslog object is exchanged with MySQL through the ORM tool MyBatis.
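The insert-or-update decision can be sketched without a database. In the Java example below an in-memory map stands in for the MySQL table, keyed by device IP plus interface; this is an illustration of the decision rule only, not the article's MyBatis mapper, and the names UpsertSketch, save and stateOf are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory sketch of the "same IP and same interface: update, otherwise insert" rule. */
final class UpsertSketch {

    /** Latest interface state per "deviceIp|interface" key; stands in for the MySQL table. */
    private final Map<String, String> stateByKey = new HashMap<>();

    /** Stores the state and returns "insert" for a new key or "update" for an existing one. */
    String save(String devIp, String devInterface, String interfaceState) {
        String key = devIp + "|" + devInterface;
        // put() returns the previous value, or null when the key was absent.
        String previous = stateByKey.put(key, interfaceState);
        return previous == null ? "insert" : "update";
    }

    /** Returns the latest known state for the interface, or null if it was never seen. */
    String stateOf(String devIp, String devInterface) {
        return stateByKey.get(devIp + "|" + devInterface);
    }

    public static void main(String[] args) {
        UpsertSketch table = new UpsertSketch();
        System.out.println(table.save("19.1.1.113", "GigabitEthernet1/0/3", "down"));
        System.out.println(table.save("19.1.1.113", "GigabitEthernet1/0/3", "up"));
        System.out.println(table.stateOf("19.1.1.113", "GigabitEthernet1/0/3"));
    }
}
```

Keeping one row per (IP, interface) pair means the table always reflects the latest state of each port, which is what a real-time display needs.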

Each line of Syslog information obtained is as described earlier:

19.1.1.12 2016-10-13T10:04:54.520Z 12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

This information needs to be parsed into fields such as device ip, server time, information sequence number, device time, Syslog type, attribute, device interface, interface status, etc. The Scala regular parsing logic is as follows:

    /**
     * Extract the server time
     * Example: 2016-10-09T10:04:54.517Z
     * @param line
     * @return
     */
    def extractServerTime(line: String): String = {
      val regex1 = "20\\d{2}-\\d{2}-\\d{2}".r
      val regex2 = "\\d{2}:\\d{2}:\\d{2}.?(\\d{3})?".r
      val matchedDate = regex1.findFirstIn(line)
      val matchedTime = regex2.findFirstIn(line)
      val result1 = matchedDate match {
        case Some(date) => date
        case None => ""
      }
      val result2 = matchedTime match {
        case Some(time) => time
        case None => ""
      }
      result1 + " " + result2
    }

    /**
     * Extract the device time
     * Examples: Sep 29 09:33:06
     *           Oct 9 18:04:09.733
     * @param line
     * @return
     */
    def extractDevTime(line: String): String = {
      // The pattern is partly garbled in the source; it is reconstructed here from
      // the surviving fragments and the two examples above.
      val regex = "[a-zA-Z]{3}\\s+\\d+\\s\\d{2}:\\d{2}:\\d{2}((.\\d{3})|())".r
      val matchedDevTime = regex.findFirstIn(line)
      val result = matchedDevTime match {
        case Some(devTime) => devTime
        case None => ""
      }
      result
    }
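As a complement to the per-field Scala extractors, the whole line can also be parsed in one pass. The Java sketch below uses a single regular expression with named groups; it assumes the de-garbled line layout "ip serverTime seq: devTime[ zone]: %TYPE: ... Interface name, changed state to state". The pattern, the class name and the group names (chosen to mirror the SwSyslog property names) are illustrative assumptions, not the article's SwSyslogExtractorUtil.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** One-pass parser sketch for prefixed switch Syslog lines. */
final class SwSyslogLineParserSketch {

    private static final String[] FIELD_NAMES = {
        "devIp", "serverTime", "syslogNum", "devTime",
        "syslogType", "devInterface", "interfaceState"
    };

    private static final Pattern LINE = Pattern.compile(
            "^(?<devIp>\\S+) (?<serverTime>\\S+) (?<syslogNum>\\d+): "
            // Device time, with optional milliseconds and an optional zone token such as "cmt".
            + "(?<devTime>[A-Za-z]{3}\\s+\\d+ \\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3})?)(?: \\w+)?: "
            + "%(?<syslogType>[A-Z]+-\\d-[A-Z]+): "
            + "(?:Line protocol on )?Interface (?<devInterface>[^,\\s]+), "
            + "changed state to (?<interfaceState>\\w+)$");

    /** Returns the parsed fields in declaration order, or an empty map if the line does not match. */
    static Map<String, String> parse(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            return fields;
        }
        for (String name : FIELD_NAMES) {
            fields.put(name, m.group(name));
        }
        return fields;
    }

    public static void main(String[] args) {
        String line = "19.1.1.12 2016-10-13T10:04:54.520Z 12164: Oct 9 18:04:10.735: "
                + "%LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down";
        parse(line).forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

A single anchored pattern rejects malformed lines outright, whereas independent findFirstIn calls can each match a different, unrelated part of a damaged line; the per-field approach is more tolerant of format variations between devices.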

Through regular-expression filtering, Syslog encapsulation and MyBatis persistence-layer mapping, each Syslog message is finally resolved into a structured interface status record in MySQL.

Finally, business applications such as APM, network monitoring or alarm can be visually displayed based on MySQL.
