2025-01-18 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article shows how to implement real-time data processing with Flink, using real-time network traffic analysis as the worked example.
The example is a real-time network traffic analyzer built on Flink 1.11. It parses raw network traffic in Pcap format, decodes the TCP, UDP and ICMP protocol data in real time, and wraps the parsed data in a Frame object for real-time traffic analysis.
To build it, you need to understand Pcap data parsing, TCP/UDP protocol parsing, serialization and deserialization in Flink, Flink custom functions, and real-time analysis with Flink stream SQL.
1. Pcap data parsing
To parse network traffic in Pcap format, you first need to know how a Pcap file is laid out.
Standard Pcap data consists of one Pcap file header followed by a sequence of records, each made up of a frame header and the frame data.
In the Pcap file header:
Magic: 0xA1B2C3D4, marks the beginning of Pcap data (the order in which its bytes appear also reveals the byte order of the file);
Major: the major version number of the Pcap format;
Minor: the minor version number of the Pcap format;
ThisZone: the time zone correction relative to GMT;
SigFigs: the timestamp precision;
SnapLen: the maximum number of bytes stored per frame;
LinkType: the link type.
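The file header fields can be read with nothing more than java.nio. The following is a minimal sketch, assuming the classic 24-byte Pcap file header (magic 0xA1B2C3D4, microsecond timestamps); the class PcapFileHeaderSketch and its field names are hypothetical and are not part of the project described here.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical helper: reads the 24-byte classic Pcap file header. */
final class PcapFileHeaderSketch {
    static final int MAGIC = 0xA1B2C3D4;  // classic Pcap magic number
    static final int HEADER_LENGTH = 24;  // size of the file header in bytes

    final ByteOrder order;
    final int major;
    final int minor;
    final long snapLen;
    final long linkType;

    private PcapFileHeaderSketch(ByteOrder order, int major, int minor, long snapLen, long linkType) {
        this.order = order;
        this.major = major;
        this.minor = minor;
        this.snapLen = snapLen;
        this.linkType = linkType;
    }

    static PcapFileHeaderSketch parse(byte[] bytes) {
        if (bytes.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("a Pcap file header needs 24 bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes); // big-endian by default
        int magic = buf.getInt(0);
        ByteOrder order;
        if (magic == MAGIC) {
            order = ByteOrder.BIG_ENDIAN;
        } else if (Integer.reverseBytes(magic) == MAGIC) {
            order = ByteOrder.LITTLE_ENDIAN;
        } else {
            throw new IllegalArgumentException("not a classic Pcap file header");
        }
        buf.order(order);
        int major = buf.getShort(4) & 0xFFFF;
        int minor = buf.getShort(6) & 0xFFFF;
        // ThisZone (offset 8) and SigFigs (offset 12) are skipped in this sketch.
        long snapLen = buf.getInt(16) & 0xFFFFFFFFL;
        long linkType = buf.getInt(20) & 0xFFFFFFFFL;
        return new PcapFileHeaderSketch(order, major, minor, snapLen, linkType);
    }

    public static void main(String[] args) {
        // Build a little-endian sample header: version 2.4, SnapLen 65535, LinkType 1 (Ethernet).
        byte[] header = ByteBuffer.allocate(HEADER_LENGTH).order(ByteOrder.LITTLE_ENDIAN)
                .putInt(MAGIC).putShort((short) 2).putShort((short) 4)
                .putInt(0).putInt(0).putInt(65535).putInt(1)
                .array();
        PcapFileHeaderSketch h = parse(header);
        System.out.println(h.order + " v" + h.major + "." + h.minor
                + " snapLen=" + h.snapLen + " linkType=" + h.linkType);
    }
}
```

Detecting the byte order from the magic number first matters because every later field, including the record headers, has to be read in that same order.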
In the data frame header:
Timestamp1: the high part of the timestamp, in seconds;
Timestamp2: the low part of the timestamp, in microseconds;
CapLen: the length of the frame data stored in the file;
Len: the length of the actual data frame on the network.
Note: the LinkType values currently supported are EN10MB, RAW, LOOP and LINUX_SLL. From the structure above, the most useful piece of information in the Pcap file header is LinkType, because the data frame has to be parsed differently for each link type.
In addition, the capture time of each data frame can be read from its frame header.
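Reading the capture time from a frame header can be sketched as follows. This is an illustration only, assuming the classic 16-byte record header with a seconds field and a microseconds field, and a ByteBuffer that is already positioned at the record and set to the file's byte order; PcapRecordHeaderSketch is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;

/** Hypothetical helper: reads one 16-byte Pcap record (frame) header. */
final class PcapRecordHeaderSketch {
    static final int RECORD_HEADER_LENGTH = 16;

    final Instant timestamp; // capture time
    final long capLen;       // bytes of this frame stored in the file
    final long len;          // bytes of this frame on the network

    private PcapRecordHeaderSketch(Instant timestamp, long capLen, long len) {
        this.timestamp = timestamp;
        this.capLen = capLen;
        this.len = len;
    }

    /** Reads the header at the buffer's current position and advances it by 16 bytes. */
    static PcapRecordHeaderSketch read(ByteBuffer buf) {
        long seconds = buf.getInt() & 0xFFFFFFFFL;
        long micros = buf.getInt() & 0xFFFFFFFFL;
        long capLen = buf.getInt() & 0xFFFFFFFFL;
        long len = buf.getInt() & 0xFFFFFFFFL;
        // Instant takes a nanosecond adjustment, so microseconds are scaled by 1000.
        Instant timestamp = Instant.ofEpochSecond(seconds, micros * 1_000L);
        return new PcapRecordHeaderSketch(timestamp, capLen, len);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(RECORD_HEADER_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        buf.putInt(1_600_000_000).putInt(250_000).putInt(60).putInt(1514);
        buf.flip();
        PcapRecordHeaderSketch r = read(buf);
        System.out.println(r.timestamp + " capLen=" + r.capLen + " len=" + r.len);
    }
}
```

After the header has been read, the next CapLen bytes are the frame data, and the following record header starts immediately after them.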
Take the Ethernet frame type as an example, that is, IPv4, IPv6 and ARP data frames. For this frame type the data part starts at offset 14, after the destination MAC address, the source MAC address and the protocol type field, so the MAC addresses can be parsed from the first 14 bytes. If the protocol type is IPv4 or IPv6, the next step is to parse the protocols of the TCP/IP layer.
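The 14-byte Ethernet header can be decoded as in the sketch below. It assumes an untagged Ethernet II frame (no VLAN tag, which would shift the offsets); EthernetHeaderSketch and its method names are hypothetical.

```java
/** Hypothetical helper: decodes the 14-byte header of an untagged Ethernet II frame. */
final class EthernetHeaderSketch {
    static final int ETHERNET_HEADER_LENGTH = 14; // offset of the data part
    static final int ETHERTYPE_IPV4 = 0x0800;
    static final int ETHERTYPE_ARP = 0x0806;
    static final int ETHERTYPE_IPV6 = 0x86DD;

    /** Formats the 6 bytes starting at offset as a lower-case, colon-separated MAC address. */
    static String mac(byte[] frame, int offset) {
        StringBuilder sb = new StringBuilder(17);
        for (int i = 0; i < 6; i++) {
            if (i > 0) {
                sb.append(':');
            }
            sb.append(String.format("%02x", frame[offset + i] & 0xFF));
        }
        return sb.toString();
    }

    static String dstMac(byte[] frame) {
        return mac(frame, 0);
    }

    static String srcMac(byte[] frame) {
        return mac(frame, 6);
    }

    /** The protocol type field is stored big-endian in bytes 12 and 13. */
    static int etherType(byte[] frame) {
        return ((frame[12] & 0xFF) << 8) | (frame[13] & 0xFF);
    }

    public static void main(String[] args) {
        byte[] frame = new byte[ETHERNET_HEADER_LENGTH];
        for (int i = 0; i < 6; i++) {
            frame[i] = (byte) 0xFF;            // broadcast destination
            frame[6 + i] = (byte) (0x11 * i);  // 00:11:22:33:44:55
        }
        frame[12] = 0x08; // protocol type 0x0800 = IPv4
        System.out.println(dstMac(frame) + " <- " + srcMac(frame)
                + " ipv4=" + (etherType(frame) == ETHERTYPE_IPV4));
    }
}
```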
2. TCP/UDP protocol parsing
(1) TCP protocol
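For orientation: in a TCP header the sequence number sits at byte offset 4, the acknowledgement number at offset 8, and bytes 12 and 13 hold the 4-bit data offset, 3 reserved bits and 9 flag bits. The project code that follows reads these through library helpers and constants; the sketch below does the same with plain byte arithmetic. TcpHeaderSketch and its names are hypothetical, and tcpStart is assumed to be the index of the first TCP header byte.

```java
/** Hypothetical helper: reads a few fields of a TCP header from a raw byte array. */
final class TcpHeaderSketch {
    static final int FIN = 0x001;
    static final int SYN = 0x002;
    static final int RST = 0x004;
    static final int PSH = 0x008;
    static final int ACK = 0x010;
    static final int URG = 0x020;
    static final int ECE = 0x040;
    static final int CWR = 0x080;
    static final int NS = 0x100;

    /** Reads an unsigned 32-bit value in network (big-endian) byte order. */
    static long u32(byte[] p, int off) {
        return ((p[off] & 0xFFL) << 24) | ((p[off + 1] & 0xFFL) << 16)
                | ((p[off + 2] & 0xFFL) << 8) | (p[off + 3] & 0xFFL);
    }

    static long sequenceNumber(byte[] p, int tcpStart) {
        return u32(p, tcpStart + 4);
    }

    static long acknowledgementNumber(byte[] p, int tcpStart) {
        return u32(p, tcpStart + 8);
    }

    /** The data offset counts 32-bit words, so it is multiplied by 4 to get bytes. */
    static int headerLength(byte[] p, int tcpStart) {
        return ((p[tcpStart + 12] >> 4) & 0x0F) * 4;
    }

    /** Keeps the low 9 bits of bytes 12-13, dropping the data offset and reserved bits. */
    static int flags(byte[] p, int tcpStart) {
        int twoBytes = ((p[tcpStart + 12] & 0xFF) << 8) | (p[tcpStart + 13] & 0xFF);
        return twoBytes & 0x1FF;
    }

    static boolean isSet(int flags, int mask) {
        return (flags & mask) != 0;
    }

    public static void main(String[] args) {
        byte[] tcp = new byte[20];
        tcp[12] = 0x50; // data offset 5 words = 20 bytes
        tcp[13] = 0x12; // SYN and ACK set
        int flags = flags(tcp, 0);
        System.out.println("headerLength=" + headerLength(tcp, 0)
                + " syn=" + isSet(flags, SYN) + " ack=" + isSet(flags, ACK));
    }
}
```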
    // Get the TCP header size
    tcpOrUdpHeaderSize = getTcpHeaderLength(packetData, ipStart + ipHeaderLen);
    packet.put(Packet.TCP_HEADER_LENGTH, tcpOrUdpHeaderSize);

    // Store the sequence and acknowledgement numbers
    // Get the TCP sequence number
    packet.put(Packet.TCP_SEQ, PcapReaderUtil.convertUnsignedInt(packetData, ipStart + ipHeaderLen + PROTOCOL_HEADER_TCP_SEQ_OFFSET));
    // Get the TCP acknowledgement number
    packet.put(Packet.TCP_ACK, PcapReaderUtil.convertUnsignedInt(packetData, ipStart + ipHeaderLen + PROTOCOL_HEADER_TCP_ACK_OFFSET));

    // Flags stretch two bytes starting at the TCP header offset
    int flags = PcapReaderUtil.convertShort(new byte[] {
            packetData[ipStart + ipHeaderLen + TCP_HEADER_DATA_OFFSET],
            packetData[ipStart + ipHeaderLen + TCP_HEADER_DATA_OFFSET + 1]
    }) & 0x1FF; // Filter first 7 bits. First 4 are the data offset and the other 3 reserved for future use.

    packet.put(Packet.TCP_FLAG_NS, (flags & 0x100) != 0);
    packet.put(Packet.TCP_FLAG_CWR, (flags & 0x80) != 0);
    packet.put(Packet.TCP_FLAG_ECE, (flags & 0x40) != 0);
    packet.put(Packet.TCP_FLAG_URG, (flags & 0x20) != 0);
    packet.put(Packet.TCP_FLAG_ACK, (flags & 0x10) != 0);
    packet.put(Packet.TCP_FLAG_PSH, (flags & 0x8) != 0);
    packet.put(Packet.TCP_FLAG_RST, (flags & 0x4) != 0);
    packet.put(Packet.TCP_FLAG_SYN, (flags & 0x2) != 0);
    packet.put(Packet.TCP_FLAG_FIN, (flags & 0x1) != 0);

(2) UDP protocol
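A UDP header is a fixed 8 bytes: source port, destination port, length and checksum, each a 16-bit value in network byte order. The sketch below reads them with plain byte arithmetic, as a stand-in for library helpers such as getUdpLength and getUdpChecksum; UdpHeaderSketch and its method names are hypothetical, and udpStart is assumed to be the index of the first UDP header byte.

```java
/** Hypothetical helper: reads the four 16-bit fields of a UDP header. */
final class UdpHeaderSketch {
    static final int UDP_HEADER_SIZE = 8;

    /** Reads an unsigned 16-bit value in network (big-endian) byte order. */
    static int u16(byte[] p, int off) {
        return ((p[off] & 0xFF) << 8) | (p[off + 1] & 0xFF);
    }

    static int sourcePort(byte[] p, int udpStart) {
        return u16(p, udpStart);
    }

    static int destinationPort(byte[] p, int udpStart) {
        return u16(p, udpStart + 2);
    }

    /** Length of the UDP header plus payload, in bytes. */
    static int length(byte[] p, int udpStart) {
        return u16(p, udpStart + 4);
    }

    static int checksum(byte[] p, int udpStart) {
        return u16(p, udpStart + 6);
    }

    static int payloadLength(byte[] p, int udpStart) {
        return length(p, udpStart) - UDP_HEADER_SIZE;
    }

    public static void main(String[] args) {
        // Source port 53, destination port 49152, length 28, checksum 0xABCD.
        byte[] udp = {0x00, 0x35, (byte) 0xC0, 0x00, 0x00, 0x1C, (byte) 0xAB, (byte) 0xCD};
        System.out.println(sourcePort(udp, 0) + " -> " + destinationPort(udp, 0)
                + " payload=" + payloadLength(udp, 0));
    }
}
```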
    tcpOrUdpHeaderSize = UDP_HEADER_SIZE;
    if (ipProtocolHeaderVersion == 4) {
        int cksum = getUdpChecksum(packetData, ipStart, ipHeaderLen);
        if (cksum >= 0) {
            packet.put(Packet.UDP_SUM, cksum);
        }
    }
    int udpLen = getUdpLength(packetData, ipStart, ipHeaderLen);
    packet.put(Packet.UDP_LENGTH, udpLen);

3. Serialization and deserialization of Kafka
Kafka, a distributed message queue, serves as the intermediate temporary buffer for the network traffic data, and the traffic is read and parsed through FlinkKafkaConsumer. For this we define the parser PcapResover and use a custom deserialization class, PcapDataDeSerializer.
The Kafka producer is responsible for forwarding the collected network traffic. It uses the serialization classes built into Kafka.
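    // Producer side: serialization classes built into Kafka
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    // Consumer side: FlinkKafkaConsumer with the custom deserializer.
    // Type arguments are assumed to be Object, matching the Object.class argument.
    this.consumer = new FlinkKafkaConsumer<Object>(this.topic, (KafkaDeserializationSchema<Object>) new PcapDataDeSerializer(Object.class), props);

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    // PcapReader, Packet and JSON come from the project's Pcap parsing and JSON libraries.

    public class PcapDataDeSerializer implements KafkaDeserializationSchema<Object> {
        private static final Logger log = LoggerFactory.getLogger(PcapDataDeSerializer.class);
        private static final long serialVersionUID = 1L;

        private final Class<Object> clazz;

        public PcapDataDeSerializer(Class<Object> clazz) {
            this.clazz = clazz;
        }

        @Override
        public boolean isEndOfStream(Object nextElement) {
            return false;
        }

        @Override
        public Object deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
            // A fresh list per record, so packets from earlier records are not returned again
            List<Packet> packetList = new ArrayList<>();
            DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(record.value()));
            PcapReader reader = new PcapReader(dataInputStream);
            for (Packet packet : reader) {
                packetList.add(packet);
            }
            // Keys are written with StringSerializer, so decode them before logging
            String key = record.key() == null ? null : new String(record.key(), StandardCharsets.UTF_8);
            log.info("finish deserialize pcap data, " + key + ", topic is " + record.topic() + ", "
                    + "partition is " + record.partition() + ", " + "offset is " + record.offset());
            return JSON.toJSON(packetList);
        }

        @Override
        public TypeInformation<Object> getProducedType() {
            return TypeExtractor.getForClass(this.clazz);
        }
    }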
PcapDataDeSerializer mainly implements the deserialize method of KafkaDeserializationSchema. This method parses the network traffic and returns the parsed packets wrapped in a Packet list.
The FlinkKafkaConsumer is created with this custom deserialization class so that the network traffic is parsed and wrapped into data frames according to the Pcap format and protocol analysis in parts 1 and 2.
4. Flink custom function
With the FlinkKafkaConsumer created above, you can build the Flink stream DAG: DataStreamSource -> flatMap -> map -> stream.
    DataStreamSource stream = executionEnvironment.addSource(this.consumer);
    log.info("start to build pcap dataStream DAG graph, transform packet into frame stream, "
            + "and default parallelism is 4!");
    return stream.flatMap(new FrameFlatMap())
            .map(new FrameMapFunction())
            .setParallelism(4);
What is returned here is a DataStream: the raw network traffic has been parsed and is emitted as a stream of data frames for analysis. To analyze it with stream SQL, register the DataStream as a temporary view and then query it in real time with SQL-like syntax.
5. Flink real-time analysis example
Count the destination MAC addresses in 10-second windows. SQL is expressive enough to cover many other analyses, so different queries can be written for different business needs.
    String aggregationSql = "select dstMac, count(1) as c from " + KafkaProperties.FRAME_VIEW_NAME
            + " group by tumble(PROCTIME(), interval '10' SECOND), dstMac";
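To make the semantics of the query concrete, the sketch below computes the same kind of result in plain Java, without Flink: each event is assigned to a 10-second tumbling window and counted per destination MAC address. It is an illustration only. The query uses processing time (PROCTIME), whereas this sketch takes an explicit timestamp per event so that the result is deterministic; TumblingCountSketch and Event are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of "count per dstMac per 10-second tumbling window". */
final class TumblingCountSketch {
    static final long WINDOW_MILLIS = 10_000L;

    /** One parsed frame, reduced to the two fields this aggregation needs. */
    static final class Event {
        final long timestampMillis;
        final String dstMac;

        Event(long timestampMillis, String dstMac) {
            this.timestampMillis = timestampMillis;
            this.dstMac = dstMac;
        }
    }

    /** Returns window start (ms) -> dstMac -> count. */
    static Map<Long, Map<String, Long>> count(List<Event> events) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (Event e : events) {
            // A tumbling window starts at the largest multiple of the window size
            // that is not greater than the timestamp.
            long windowStart = e.timestampMillis - Math.floorMod(e.timestampMillis, WINDOW_MILLIS);
            result.computeIfAbsent(windowStart, k -> new TreeMap<>())
                    .merge(e.dstMac, 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(1_000L, "aa:aa:aa:aa:aa:aa"));
        events.add(new Event(9_999L, "aa:aa:aa:aa:aa:aa"));
        events.add(new Event(10_000L, "aa:aa:aa:aa:aa:aa"));
        events.add(new Event(12_000L, "bb:bb:bb:bb:bb:bb"));
        System.out.println(count(events));
    }
}
```

Because tumbling windows do not overlap, every event lands in exactly one window, which is why a single map lookup per event is enough here.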
Then the sink is added, the DAG build is complete, and calling execute submits the job to the cluster.
    Table result = streamTableEnvironment.sqlQuery(aggregationSql);
    DataStream<Row> resultData = streamTableEnvironment.toAppendStream(result, Row.class);
    resultData.print();
To sum up, the basic process is as follows.
FlinkKafkaConsumer is configured with PcapDataDeSerializer, which deserializes the Pcap data and parses out the frames it contains. The resulting stream of Frame data is then processed by the custom FlatMapFunction and MapFunction and wrapped into the derived DataStream that the analysis runs on.