
What is the overall process of a simple Flink project?

2025-01-19 Update From: SLTechnology News&Howtos


This article walks through the overall process of a simple Flink project: generating mock log data, consuming it from Kafka, cleaning and aggregating it in Flink, writing the results to Elasticsearch, and displaying them in Kibana.

Project Overview

The project analyzes access logs from a CDN (content delivery network). A log record looks like this:

aliyun CN E [17/Jul/2018:17:07:50 +0800] 223.104.18.110 v2.go2yd.com 17168

The data being ingested are log records.
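The sample record carries its timestamp in the bracketed access-log form, while the mock data later in the article uses yyyy-MM-dd HH:mm:ss. As a minimal sketch of how the bracketed form could be turned into epoch milliseconds (the class and method names here are hypothetical and not part of the original project):

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

/** Hypothetical helper, not part of the original project. */
final class AccessLogTime {
    // Pattern for the text between the brackets, e.g. 17/Jul/2018:17:07:50 +0800
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    /** Converts "[17/Jul/2018:17:07:50 +0800]" to milliseconds since the epoch. */
    static long toEpochMillis(String bracketed) {
        String text = bracketed.trim();
        // Strip the surrounding square brackets if they are present
        if (text.startsWith("[") && text.endsWith("]")) {
            text = text.substring(1, text.length() - 1);
        }
        return OffsetDateTime.parse(text, FORMAT).toInstant().toEpochMilli();
    }
}
```

Because the offset is part of the text, the result does not depend on the time zone of the machine that parses it.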

Offline: Flume ==> HDFS

Real-time: Kafka ==> stream processing engine ==> ES ==> Kibana

Data query

API name: summary statistics query

Function description:

Peak bandwidth

Total traffic

Total number of requests

Project function

Count the traffic generated by each domain name within one minute. Flink receives the Kafka data and processes it.

Count the traffic generated by each user within one minute. Each domain name belongs to a user, so Flink receives the Kafka data and also reads the domain-to-user configuration data (stored in MySQL) to process it.
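The second requirement boils down to looking up the owner of each domain and adding the traffic to that owner's total. The following is a plain-Java sketch of that idea without Flink or MySQL; the class name, the method signature, and the domain-to-user assignments in the usage note are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of the domain-to-user roll-up, independent of Flink. */
final class UserTrafficSketch {
    /**
     * Adds each record's traffic to the total of the user that owns the domain.
     * domains[i] and traffic[i] together describe one log record.
     */
    static Map<String, Long> trafficByUser(Map<String, String> domainToUser,
                                           String[] domains, long[] traffic) {
        Map<String, Long> totals = new HashMap<>();
        for (int i = 0; i < domains.length; i++) {
            // Domains without a configured owner are grouped under the empty user id
            String user = domainToUser.getOrDefault(domains[i], "");
            totals.merge(user, traffic[i], Long::sum);
        }
        return totals;
    }
}
```

For example, if v1.go2yd.com is assumed to belong to user 80000000, two records for that domain with traffic 100 and 25 produce a total of 125 for that user.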

Project architecture

Mock data

@Component
@Slf4j
public class KafkaProducer {
    private static final String TOPIC = "pktest";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @SuppressWarnings("unchecked")
    public void produce(String message) {
        try {
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, message);
            SuccessCallback<SendResult<String, String>> successCallback =
                    new SuccessCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(@Nullable SendResult<String, String> result) {
                    log.info("message sent successfully");
                }
            };
            FailureCallback failureCallback = new FailureCallback() {
                @Override
                public void onFailure(Throwable ex) {
                    log.error("send message failed", ex);
                    // resend the message after a failure
                    produce(message);
                }
            };
            future.addCallback(successCallback, failureCallback);
        } catch (Exception e) {
            log.error("send message exception", e);
        }
    }

    // build one tab-separated log record every two seconds and send it
    @Scheduled(fixedRate = 1000 * 2)
    public void send() {
        StringBuilder builder = new StringBuilder();
        builder.append("aliyun").append("\t")
                .append("CN").append("\t")
                .append(getLevels()).append("\t")
                .append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())).append("\t")
                .append(getIps()).append("\t")
                .append(getDomains()).append("\t")
                .append(getTraffic()).append("\t");
        log.info(builder.toString());
        produce(builder.toString());
    }

    /**
     * Generate level data
     * @return
     */
    private String getLevels() {
        List<String> levels = Arrays.asList("M", "E");
        return levels.get(new Random().nextInt(levels.size()));
    }

    /**
     * Generate IP data
     * @return
     */
    private String getIps() {
        List<String> ips = Arrays.asList("222.104.18.111", "223.101.75.185", "27.17.127.133",
                "183.225.121.16", "112.1.65.32", "175.147.222.190", "183.227.43.68",
                "59.88.168.87", "117.28.44.29", "117.59.34.167");
        return ips.get(new Random().nextInt(ips.size()));
    }

    /**
     * Generate domain name data
     * @return
     */
    private String getDomains() {
        List<String> domains = Arrays.asList("v1.go2yd.com", "v2.go2vd.com", "v3.go2yd.com",
                "v4.go2yd.com", "vmi.go2yd.com");
        return domains.get(new Random().nextInt(domains.size()));
    }

    /**
     * Generate traffic data
     * @return
     */
    private int getTraffic() {
        return new Random().nextInt(10000);
    }
}

For the rest of the Spring Boot Kafka configuration, please refer to "Springboot2 Integration Kafka".

Start a console consumer on the Kafka server and you will see the messages arrive.

This indicates that the data was sent to Kafka successfully.

Flink consumer

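import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class LogAnalysis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "public network ip:9092");
        properties.setProperty("group.id", "test");
        // read the topic as a stream of plain strings
        DataStreamSource<String> data = env.addSource(
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        data.print().setParallelism(1);
        env.execute("LogAnalysis");
    }
}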

Messages received

aliyun CN M 2021-01-31 23:43:07 222.104.18.111 v1.go2yd.com 4603
aliyun CN E 2021-01-31 23:43:09 222.104.18.111 v4.go2yd.com 6313
aliyun CN E 2021-01-31 23:43:11 222.104.18.111 v2.go2vd.com 4233
aliyun CN E 2021-01-31 23:43:13 222.104.18.111 v4.go2yd.com 2691
aliyun CN E 2021-01-31 23:43:15 183.225.121.16 v1.go2yd.com 212
aliyun CN E 2021-01-31 23:43:17 183.225.121.16 v4.go2yd.com 7744
aliyun CN M 2021-01-31 23:43:19 175.147.222.190 vmi.go2yd.com 1318

Data cleaning

Data cleaning means processing the raw input data according to our business rules so that it meets our business needs.
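The rules applied in this project are: parse the time into milliseconds, drop records whose time cannot be parsed, keep only level E, and retain just the time, domain and traffic. A plain-Java sketch of those rules, independent of Flink, could look like this (the class names are hypothetical, and returning an empty Optional for rejected records is a choice made for this sketch):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Optional;

/** Hypothetical stand-alone version of the cleaning rules. */
final class LogCleaner {
    /** A cleaned record: event time in epoch millis, domain, traffic. */
    static final class Cleaned {
        final long time;
        final String domain;
        final long traffic;

        Cleaned(long time, String domain, long traffic) {
            this.time = time;
            this.domain = domain;
            this.traffic = traffic;
        }
    }

    /** Returns the cleaned record, or empty if the line is rejected. */
    static Optional<Cleaned> clean(String line) {
        String[] f = line.split("\t");
        // Reject short lines and every level other than E
        if (f.length < 7 || !"E".equals(f[2])) {
            return Optional.empty();
        }
        try {
            long time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(f[3]).getTime();
            return Optional.of(new Cleaned(time, f[5], Long.parseLong(f[6].trim())));
        } catch (ParseException | NumberFormatException e) {
            // Unparseable time or traffic: drop the record
            return Optional.empty();
        }
    }
}
```

A fresh SimpleDateFormat is created per call because the class is not thread-safe.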

@Slf4j
public class LogAnalysis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "public network ip:9092");
        properties.setProperty("group.id", "test");
        DataStreamSource<String> data = env.addSource(
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        data.map(new MapFunction<String, Tuple4<String, Long, String, String>>() {
            @Override
            public Tuple4<String, Long, String, String> map(String value) throws Exception {
                String[] splits = value.split("\t");
                String level = splits[2];
                String timeStr = splits[3];
                Long time = 0L;
                try {
                    time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime();
                } catch (ParseException e) {
                    log.error("time conversion error: " + timeStr + "," + e.getMessage());
                }
                String domain = splits[5];
                String traffic = splits[6];
                return new Tuple4<>(level, time, domain, traffic);
            }
        })
        // drop records whose time could not be parsed
        .filter(x -> x.f1 != 0)
        // here we only need data whose level is E
        .filter(x -> x.f0.equals("E"))
        // discard level
        .map(new MapFunction<Tuple4<String, Long, String, String>, Tuple3<Long, String, Long>>() {
            @Override
            public Tuple3<Long, String, Long> map(Tuple4<String, Long, String, String> value) throws Exception {
                return new Tuple3<>(value.f1, value.f2, Long.parseLong(value.f3));
            }
        })
        .print().setParallelism(1);
        env.execute("LogAnalysis");
    }
}

Running result

(1612130315000,v1.go2yd.com,533)
(1612130319000,v4.go2yd.com,8657)
(1612130327000,v1.go2yd.com,9566)
(1612130329000,v2.go2yd.com,1460)
(161213033000,v3.go2yd.com,6955)
(161213033000,v1.go2yd.com,9612)
(1612130341000,vmi.go2yd.com,694)

Scala code

import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.slf4j.LoggerFactory
import org.apache.flink.api.scala._

object LogAnalysis {
  val log = LoggerFactory.getLogger(LogAnalysis.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val topic = "pktest"
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "public network ip:9092")
    properties.setProperty("group.id", "test")
    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, properties))
    data.map(x => {
      val splits = x.split("\t")
      val level = splits(2)
      val timeStr = splits(3)
      var time: Long = 0L
      try {
        time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime
      } catch {
        case e: Exception =>
          log.error(s"time conversion error: $timeStr", e.getMessage)
      }
      val domain = splits(5)
      val traffic = splits(6)
      (level, time, domain, traffic)
    }).filter(_._2 != 0)
      .filter(_._1 == "E")
      .map(x => (x._2, x._3, x._4.toLong))
      .print().setParallelism(1)
    env.execute("LogAnalysis")
  }
}

Data analysis

Next we analyze the traffic of each domain name within each one-minute window.
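Conceptually, a one-minute tumbling window assigns every record to the minute that contains its timestamp and then sums the traffic per (window, domain) pair. The plain-Java sketch below illustrates only that bucketing; it ignores watermarks and late data, and the class name and the "windowStart|domain" key format are assumptions of this sketch.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of one-minute tumbling windows, without Flink. */
final class MinuteWindowSketch {
    static final long WINDOW_MS = 60_000L;

    /** Start (epoch millis) of the one-minute window that contains the timestamp. */
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_MS);
    }

    /** Sums traffic per window and domain; keys have the form "windowStart|domain". */
    static Map<String, Long> sumPerWindow(long[] times, String[] domains, long[] traffic) {
        Map<String, Long> sums = new TreeMap<>();
        for (int i = 0; i < times.length; i++) {
            String key = windowStart(times[i]) + "|" + domains[i];
            sums.merge(key, traffic[i], Long::sum);
        }
        return sums;
    }
}
```

Windows computed this way are aligned to the epoch, which is also how Flink aligns tumbling windows when no offset is configured.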

@Slf4j
public class LogAnalysis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "public network ip:9092");
        properties.setProperty("group.id", "test");
        DataStreamSource<String> data = env.addSource(
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        data.map(new MapFunction<String, Tuple4<String, Long, String, String>>() {
            @Override
            public Tuple4<String, Long, String, String> map(String value) throws Exception {
                String[] splits = value.split("\t");
                String level = splits[2];
                String timeStr = splits[3];
                Long time = 0L;
                try {
                    time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime();
                } catch (ParseException e) {
                    log.error("time conversion error: " + timeStr + "," + e.getMessage());
                }
                String domain = splits[5];
                String traffic = splits[6];
                return new Tuple4<>(level, time, domain, traffic);
            }
        })
        // drop records whose time could not be parsed
        .filter(x -> x.f1 != 0)
        // here we only need data whose level is E
        .filter(x -> x.f0.equals("E"))
        // discard level
        .map(new MapFunction<Tuple4<String, Long, String, String>, Tuple3<Long, String, Long>>() {
            @Override
            public Tuple3<Long, String, Long> map(Tuple4<String, Long, String, String> value) throws Exception {
                return new Tuple3<>(value.f1, value.f2, Long.parseLong(value.f3));
            }
        })
        .setParallelism(1)
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<Long, String, Long>>() {
            // tolerate events that arrive up to 10 seconds out of order
            private Long maxOutOfOrderness = 10000L;
            private Long currentMaxTimestamp = 0L;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            @Override
            public long extractTimestamp(Tuple3<Long, String, Long> element, long previousElementTimestamp) {
                Long timestamp = element.f0;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                return timestamp;
            }
        })
        .keyBy(x -> x.f1)
        .timeWindow(Time.minutes(1))
        // output format: one-minute interval, domain name, total traffic of the domain name within that minute
        .apply(new WindowFunction<Tuple3<Long, String, Long>, Tuple3<String, String, Long>, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple3<Long, String, Long>> input,
                              Collector<Tuple3<String, String, Long>> out) throws Exception {
                long sum = 0L;
                for (Tuple3<Long, String, Long> t : input) {
                    sum += t.f2;
                }
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                out.collect(new Tuple3<>(
                        format.format(window.getStart()) + "-" + format.format(window.getEnd()), s, sum));
            }
        })
        .print().setParallelism(1);
        env.execute("LogAnalysis");
    }
}

Running result

(2021-02-01 07:14:00-2021-02-01 07:15:00,vmi.go2yd.com,6307)
(2021-02-01 07:15:00-2021-02-01 07:16:00,v4.go2yd.com,15474)
(2021-02-01 07:15:00-2021-02-01 07:16:00,v2.go2vd.com,9210)
(2021-02-01 07:15:00-2021-02-01 07:16:00,v3.go2yd.com, )
(2021-02-01 07:15:00-2021-02-01 07:16:00,v1.go2yd.com,12787)
(2021-02-01 07:15:00-2021-02-01 07:16:00,vmi.go2yd.com,14250)
(2021-02-01 07:16:00-2021-02-01 07:17:00,v4.go2yd.com,33298)
(2021-02-01 07:16:00-2021-02-01 07:17:00,v1.go2yd.com,37140)

Scala code

import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.slf4j.LoggerFactory
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object LogAnalysis {
  val log = LoggerFactory.getLogger(LogAnalysis.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val topic = "pktest"
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "public network ip:9092")
    properties.setProperty("group.id", "test")
    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, properties))
    data.map(x => {
      val splits = x.split("\t")
      val level = splits(2)
      val timeStr = splits(3)
      var time: Long = 0L
      try {
        time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime
      } catch {
        case e: Exception =>
          log.error(s"time conversion error: $timeStr", e.getMessage)
      }
      val domain = splits(5)
      val traffic = splits(6)
      (level, time, domain, traffic)
    }).filter(_._2 != 0)
      .filter(_._1 == "E")
      .map(x => (x._2, x._3, x._4.toLong))
      .setParallelism(1)
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Long, String, Long)] {
        var maxOutOfOrderness: Long = 10000L
        var currentMaxTimestamp: Long = _

        override def getCurrentWatermark: Watermark = {
          new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        }

        override def extractTimestamp(element: (Long, String, Long), previousElementTimestamp: Long): Long = {
          val timestamp = element._1
          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
          timestamp
        }
      })
      .keyBy(_._2)
      .timeWindow(Time.minutes(1))
      .apply(new WindowFunction[(Long, String, Long), (String, String, Long), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[(Long, String, Long)],
                           out: Collector[(String, String, Long)]): Unit = {
          val list = input.toList
          val sum = list.map(_._3).sum
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          out.collect((format.format(window.getStart) + "-" + format.format(window.getEnd), key, sum))
        }
      })
      .print().setParallelism(1)
    env.execute("LogAnalysis")
  }
}

Sink to Elasticsearch

Install ES

The version we use here is 6.2.4.

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.2.4.tar.gz

After decompressing it, enter the config directory, edit elasticsearch.yml, and set

network.host: 0.0.0.0

Add a non-root user

useradd es

Change the owner of all files in the ES directory to es

chown -R es:es elasticsearch-6.2.4

Edit /etc/security/limits.conf and change the content at the bottom to

es soft nofile 65536
es hard nofile 65536

Edit /etc/sysctl.conf and add

vm.max_map_count=655360

Run the command

sysctl -p

Go to the bin folder of ES and switch to the es user

su es

As the es user, run

./elasticsearch -d

At this point you can see the ES information in the web interface (public network ip:9200).

To add an ES sink to Flink, first add the dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

@Slf4j
public class LogAnalysis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "public network ip:9092");
        properties.setProperty("group.id", "test");
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("public network ip", 9200, "http"));
        ElasticsearchSink.Builder<Tuple3<String, String, Long>> builder = new ElasticsearchSink.Builder<>(
                httpHosts, new ElasticsearchSinkFunction<Tuple3<String, String, Long>>() {
            @Override
            public void process(Tuple3<String, String, Long> value, RuntimeContext runtimeContext,
                                RequestIndexer indexer) {
                Map<String, Object> json = new HashMap<>();
                json.put("time", value.f0);
                json.put("domain", value.f1);
                json.put("traffic", value.f2);
                // window interval plus domain identifies one result document
                String id = value.f0 + "-" + value.f1;
                indexer.add(Requests.indexRequest()
                        .index("cdn")
                        .type("traffic")
                        .id(id)
                        .source(json));
            }
        });
        // set the buffer size of bulk write data
        builder.setBulkFlushMaxActions(1);
        DataStreamSource<String> data = env.addSource(
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        data.map(new MapFunction<String, Tuple4<String, Long, String, String>>() {
            @Override
            public Tuple4<String, Long, String, String> map(String value) throws Exception {
                String[] splits = value.split("\t");
                String level = splits[2];
                String timeStr = splits[3];
                Long time = 0L;
                try {
                    time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime();
                } catch (ParseException e) {
                    log.error("time conversion error: " + timeStr + "," + e.getMessage());
                }
                String domain = splits[5];
                String traffic = splits[6];
                return new Tuple4<>(level, time, domain, traffic);
            }
        })
        // drop records whose time could not be parsed
        .filter(x -> x.f1 != 0)
        // here we only need data whose level is E
        .filter(x -> x.f0.equals("E"))
        // discard level
        .map(new MapFunction<Tuple4<String, Long, String, String>, Tuple3<Long, String, Long>>() {
            @Override
            public Tuple3<Long, String, Long> map(Tuple4<String, Long, String, String> value) throws Exception {
                return new Tuple3<>(value.f1, value.f2, Long.parseLong(value.f3));
            }
        })
        .setParallelism(1)
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<Long, String, Long>>() {
            private Long maxOutOfOrderness = 10000L;
            private Long currentMaxTimestamp = 0L;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            @Override
            public long extractTimestamp(Tuple3<Long, String, Long> element, long previousElementTimestamp) {
                Long timestamp = element.f0;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                return timestamp;
            }
        })
        .keyBy(x -> x.f1)
        .timeWindow(Time.minutes(1))
        // output format: one-minute interval, domain name, total traffic of the domain name within that minute
        .apply(new WindowFunction<Tuple3<Long, String, Long>, Tuple3<String, String, Long>, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple3<Long, String, Long>> input,
                              Collector<Tuple3<String, String, Long>> out) throws Exception {
                long sum = 0L;
                for (Tuple3<Long, String, Long> t : input) {
                    sum += t.f2;
                }
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                out.collect(new Tuple3<>(
                        format.format(window.getStart()) + "-" + format.format(window.getEnd()), s, sum));
            }
        })
        .addSink(builder.build());
        env.execute("LogAnalysis");
    }
}

After execution, you can query the data in ES.

http://public network ip:9200/cdn/traffic/_search
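The same query can also be issued from code. The sketch below builds the _search URL for the cdn index and traffic type used above and sends a GET request with the JDK's built-in HTTP client (Java 11 or later). The class name, method names and the host passed in are hypothetical, and the server is assumed to be reachable over plain HTTP without authentication.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper for querying the index written by the sink. */
final class EsSearchSketch {
    /** Builds http://host:port/index/type/_search. */
    static URI searchUri(String host, int port, String index, String type) {
        return URI.create("http://" + host + ":" + port + "/" + index + "/" + type + "/_search");
    }

    /** Sends the search request and returns the raw JSON response body. */
    static String search(String host) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(searchUri(host, 9200, "cdn", "traffic"))
                .GET()
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

Calling EsSearchSketch.search with the server's address returns the same JSON that the browser shows for the URL above.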

Scala code

import java.text.SimpleDateFormat
import java.util
import java.util.Properties

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.slf4j.LoggerFactory
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

object LogAnalysis {
  val log = LoggerFactory.getLogger(LogAnalysis.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val topic = "pktest"
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "public network ip:9092")
    properties.setProperty("group.id", "test")
    val httpHosts = new util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("public network ip", 9200, "http"))
    val builder = new ElasticsearchSink.Builder[(String, String, Long)](httpHosts,
      new ElasticsearchSinkFunction[(String, String, Long)] {
        override def process(t: (String, String, Long), runtimeContext: RuntimeContext,
                             indexer: RequestIndexer): Unit = {
          val json = new util.HashMap[String, Any]
          json.put("time", t._1)
          json.put("domain", t._2)
          json.put("traffic", t._3)
          val id = t._1 + "-" + t._2
          indexer.add(Requests.indexRequest()
            .index("cdn")
            .`type`("traffic")
            .id(id)
            .source(json))
        }
      })
    builder.setBulkFlushMaxActions(1)
    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, properties))
    data.map(x => {
      val splits = x.split("\t")
      val level = splits(2)
      val timeStr = splits(3)
      var time: Long = 0L
      try {
        time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime
      } catch {
        case e: Exception =>
          log.error(s"time conversion error: $timeStr", e.getMessage)
      }
      val domain = splits(5)
      val traffic = splits(6)
      (level, time, domain, traffic)
    }).filter(_._2 != 0)
      .filter(_._1 == "E")
      .map(x => (x._2, x._3, x._4.toLong))
      .setParallelism(1)
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Long, String, Long)] {
        var maxOutOfOrderness: Long = 10000L
        var currentMaxTimestamp: Long = _

        override def getCurrentWatermark: Watermark = {
          new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        }

        override def extractTimestamp(element: (Long, String, Long), previousElementTimestamp: Long): Long = {
          val timestamp = element._1
          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
          timestamp
        }
      })
      .keyBy(_._2)
      .timeWindow(Time.minutes(1))
      .apply(new WindowFunction[(Long, String, Long), (String, String, Long), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[(Long, String, Long)],
                           out: Collector[(String, String, Long)]): Unit = {
          val list = input.toList
          val sum = list.map(_._3).sum
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          out.collect((format.format(window.getStart) + "-" + format.format(window.getEnd), key, sum))
        }
      })
      .addSink(builder.build)
    env.execute("LogAnalysis")
  }
}

Kibana graphic display

Install Kibana

wget https://artifacts.elastic.co/downloads/kibana/kibana-6.2.4-linux-x86_64.tar.gz

The Kibana version must match the ES version. Unzip it, enter the config directory, and edit kibana.yml.

server.host: "host2"
elasticsearch.url: "http://host2:9200"

The settings vary from version to version. After saving, enter the bin directory.

Switch to the es user and run

./kibana &

Visit the web page at public network ip:5601

Here I made a table and a bar chart.

The second requirement is to count the traffic generated by each user in one minute.

Add a table user_domain_config to the MySQL database with the following fields

The contents of the table are as follows

Data cleaning

/**
 * Custom MySQL data source
 */
public class MySQLSource extends RichParallelSourceFunction<Tuple2<String, String>> {
    private Connection connection;
    private PreparedStatement pstmt;

    private Connection getConnection() {
        Connection conn = null;
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
            String url = "jdbc:mysql://public network ip:3306/flink";
            conn = DriverManager.getConnection(url, "root", "*");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "select user_id,domain from user_domain_config";
        pstmt = connection.prepareStatement(sql);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
        ResultSet rs = pstmt.executeQuery();
        while (rs.next()) {
            // emit (domain, user_id) pairs
            Tuple2<String, String> tuple2 = new Tuple2<>(rs.getString("domain"), rs.getString("user_id"));
            ctx.collect(tuple2);
        }
        pstmt.close();
    }

    @Override
    public void cancel() {
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (pstmt != null) {
            pstmt.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

@Slf4j
public class LogAnalysisWithMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "public network ip:9092");
        properties.setProperty("group.id", "test");
        DataStreamSource<String> data = env.addSource(
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        SingleOutputStreamOperator<Tuple3<Long, String, Long>> logData = data
        .map(new MapFunction<String, Tuple4<String, Long, String, String>>() {
            @Override
            public Tuple4<String, Long, String, String> map(String value) throws Exception {
                String[] splits = value.split("\t");
                String level = splits[2];
                String timeStr = splits[3];
                Long time = 0L;
                try {
                    time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime();
                } catch (ParseException e) {
                    log.error("time conversion error: " + timeStr + "," + e.getMessage());
                }
                String domain = splits[5];
                String traffic = splits[6];
                return new Tuple4<>(level, time, domain, traffic);
            }
        })
        // drop records whose time could not be parsed
        .filter(x -> x.f1 != 0)
        // here we only need data whose level is E
        .filter(x -> x.f0.equals("E"))
        // discard level
        .map(new MapFunction<Tuple4<String, Long, String, String>, Tuple3<Long, String, Long>>() {
            @Override
            public Tuple3<Long, String, Long> map(Tuple4<String, Long, String, String> value) throws Exception {
                return new Tuple3<>(value.f1, value.f2, Long.parseLong(value.f3));
            }
        });
        DataStreamSource<Tuple2<String, String>> mysqlData = env.addSource(new MySQLSource());
        // connect the two streams
        logData.connect(mysqlData)
        .flatMap(new CoFlatMapFunction<Tuple3<Long, String, Long>, Tuple2<String, String>,
                Tuple4<Long, String, Long, String>>() {
            private Map<String, String> userDomainMap = new HashMap<>();

            @Override
            public void flatMap1(Tuple3<Long, String, Long> value,
                                 Collector<Tuple4<Long, String, Long, String>> out) throws Exception {
                String domain = value.f1;
                String userId = userDomainMap.getOrDefault(domain, "");
                out.collect(new Tuple4<>(value.f0, value.f1, value.f2, userId));
            }

            @Override
            public void flatMap2(Tuple2<String, String> value,
                                 Collector<Tuple4<Long, String, Long, String>> out) throws Exception {
                userDomainMap.put(value.f0, value.f1);
            }
        })
        .print().setParallelism(1);
        env.execute("LogAnalysisWithMySQL");
    }
}

Running result

(1612239325000,vmi.go2yd.com,7115,80000001)
(1612239633000,v4.go2yd.com,841212,80000001)
(1612239635000,v3.go2yd.com,3527,80000000)
(1612239639000,v1.go2yd.com,7385,80000000)
(16122396433000,vmi.go2yd.com,865005,80000001)
(1612239645000,vmi.go2yd.com,2642,80000001)
(1612239647000,vmi.go2yd.com,88322,80000000)

Scala code

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class MySQLSource extends RichParallelSourceFunction[(String, String)] {
  var connection: Connection = null
  var pstmt: PreparedStatement = null

  def getConnection: Connection = {
    var conn: Connection = null
    Class.forName("com.mysql.cj.jdbc.Driver")
    val url = "jdbc:mysql://public network ip:3306/flink"
    conn = DriverManager.getConnection(url, "root", "*")
    conn
  }

  override def open(parameters: Configuration): Unit = {
    connection = getConnection
    val sql = "select user_id,domain from user_domain_config"
    pstmt = connection.prepareStatement(sql)
  }

  override def cancel() = {}

  override def run(ctx: SourceFunction.SourceContext[(String, String)]) = {
    val rs = pstmt.executeQuery()
    while (rs.next) {
      val tuple2 = (rs.getString("domain"), rs.getString("user_id"))
      ctx.collect(tuple2)
    }
    pstmt.close()
  }

  override def close(): Unit = {
    if (pstmt != null) {
      pstmt.close()
    }
    if (connection != null) {
      connection.close()
    }
  }
}

import java.text.SimpleDateFormat
import java.util.Properties

import com.guanjian.flink.scala.until.MySQLSource
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory

import scala.collection.mutable

object LogAnalysisWithMySQL {
  val log = LoggerFactory.getLogger(LogAnalysisWithMySQL.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val topic = "pktest"
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "public network ip:9092")
    properties.setProperty("group.id", "test")
    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, properties))
    val logData = data.map(x => {
      val splits = x.split("\t")
      val level = splits(2)
      val timeStr = splits(3)
      var time: Long = 0L
      try {
        time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime
      } catch {
        case e: Exception =>
          log.error(s"time conversion error: $timeStr", e.getMessage)
      }
      val domain = splits(5)
      val traffic = splits(6)
      (level, time, domain, traffic)
    }).filter(_._2 != 0)
      .filter(_._1 == "E")
      .map(x => (x._2, x._3, x._4.toLong))
    val mysqlData = env.addSource(new MySQLSource)
    logData.connect(mysqlData)
      .flatMap(new CoFlatMapFunction[(Long, String, Long), (String, String), (Long, String, Long, String)] {
        var userDomainMap = mutable.HashMap[String, String]()

        override def flatMap1(value: (Long, String, Long), out: Collector[(Long, String, Long, String)]) = {
          val domain = value._2
          val userId = userDomainMap.getOrElse(domain, "")
          out.collect((value._1, value._2, value._3, userId))
        }

        override def flatMap2(value: (String, String), out: Collector[(Long, String, Long, String)]) = {
          userDomainMap += value._1 -> value._2
        }
      })
      .print().setParallelism(1)
    env.execute("LogAnalysisWithMySQL")
  }
}

Data analysis

@Slf4j
public class LogAnalysisWithMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "public network ip:9092");
        properties.setProperty("group.id", "test");
        DataStreamSource<String> data = env.addSource(
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        SingleOutputStreamOperator<Tuple3<Long, String, Long>> logData = data
        .map(new MapFunction<String, Tuple4<String, Long, String, String>>() {
            @Override
            public Tuple4<String, Long, String, String> map(String value) throws Exception {
                String[] splits = value.split("\t");
                String level = splits[2];
                String timeStr = splits[3];
                Long time = 0L;
                try {
                    time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime();
                } catch (ParseException e) {
                    log.error("time conversion error: " + timeStr + "," + e.getMessage());
                }
                String domain = splits[5];
                String traffic = splits[6];
                return new Tuple4<>(level, time, domain, traffic);
            }
        })
        // drop records whose time could not be parsed
        .filter(x -> x.f1 != 0)
        // here we only need data whose level is E
        .filter(x -> x.f0.equals("E"))
        // discard level
        .map(new MapFunction<Tuple4<String, Long, String, String>, Tuple3<Long, String, Long>>() {
            @Override
            public Tuple3<Long, String, Long> map(Tuple4<String, Long, String, String> value) throws Exception {
                return new Tuple3<>(value.f1, value.f2, Long.parseLong(value.f3));
            }
        });
        DataStreamSource<Tuple2<String, String>> mysqlData = env.addSource(new MySQLSource());
        // connect the two streams
        logData.connect(mysqlData)
        .flatMap(new CoFlatMapFunction<Tuple3<Long, String, Long>, Tuple2<String, String>,
                Tuple4<Long, String, Long, String>>() {
            private Map<String, String> userDomainMap = new HashMap<>();

            @Override
            public void flatMap1(Tuple3<Long, String, Long> value,
                                 Collector<Tuple4<Long, String, Long, String>> out) throws Exception {
                String domain = value.f1;
                String userId = userDomainMap.getOrDefault(domain, "");
                out.collect(new Tuple4<>(value.f0, value.f1, value.f2, userId));
            }

            @Override
            public void flatMap2(Tuple2<String, String> value,
                                 Collector<Tuple4<Long, String, Long, String>> out) throws Exception {
                userDomainMap.put(value.f0, value.f1);
            }
        })
        .setParallelism(1)
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<Long, String, Long, String>>() {
            private Long maxOutOfOrderness = 10000L;
            private Long currentMaxTimestamp = 0L;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            @Override
            public long extractTimestamp(Tuple4<Long, String, Long, String> element, long previousElementTimestamp) {
                Long timestamp = element.f0;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                return timestamp;
            }
        })
        .keyBy(x -> x.f3)
        .timeWindow(Time.minutes(1))
        // output format: one-minute interval, user, total traffic of the user within that minute
        .apply(new WindowFunction<Tuple4<Long, String, Long, String>, Tuple3<String, String, Long>,
                String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple4<Long, String, Long, String>> input,
                              Collector<Tuple3<String, String, Long>> out) throws Exception {
                long sum = 0L;
                for (Tuple4<Long, String, Long, String> t : input) {
                    sum += t.f2;
                }
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                out.collect(new Tuple3<>(
                        format.format(window.getStart()) + "-" + format.format(window.getEnd()), s, sum));
            }
        })
        .print().setParallelism(1);
        env.execute("LogAnalysisWithMySQL");
    }
}

Running result

(2021-02-02 13:58:00-2021-02-02 13:59:00,80000000,20933)
(2021-02-02 13:58:00-2021-02-02 13:59:00,80000001,6928)
(2021-02-02 13:59:00-2021-02-02 14:00:00,80000001,38202)
(2021-02-02 13:59:00-2021-02-02 14:00:00,80000001,39394)
(2021-02-02 14:00:00-2021-02-02 14:01:00,80000001,23070)
(2021-02-02 14:00:00-2021-02-02 14:01:00,80000000,41701)
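Each output tuple is labelled with the start and end of a one-minute tumbling window. As a rough sketch, assuming windows are aligned to the epoch with no offset, the window that a timestamp falls into can be computed as below. The class and method names are illustrative and are not part of the job.

```java
final class TumblingWindowSketch {

    // One minute, matching Time.minutes(1) in the job.
    static final long WINDOW_SIZE_MS = 60_000L;

    // Start of the window containing the timestamp (inclusive).
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    // End of the window containing the timestamp (exclusive).
    static long windowEnd(long timestampMs) {
        return windowStart(timestampMs) + WINDOW_SIZE_MS;
    }

    public static void main(String[] args) {
        long ts = 125_000L; // 2 minutes and 5 seconds after the epoch
        System.out.println(windowStart(ts) + " - " + windowEnd(ts));
    }
}
```

The window is half-open: a record stamped exactly at the end value belongs to the next window, which is why consecutive result lines share a boundary without overlapping.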

Scala code

import java.text.SimpleDateFormat
import java.util.Properties

import com.guanjian.flink.scala.until.MySQLSource
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory

import scala.collection.mutable

object LogAnalysisWithMySQL {

  val log = LoggerFactory.getLogger(LogAnalysisWithMySQL.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val topic = "pktest"
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "public network ip:9092")
    properties.setProperty("group.id", "test")
    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, properties))
    val logData = data.map(x => {
      val splits = x.split("\t")
      val level = splits(2)
      val timeStr = splits(3)
      var time: Long = 0L
      try {
        time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime
      } catch {
        case e: Exception =>
          log.error(s"time conversion error: $timeStr", e.getMessage)
      }
      val domain = splits(5)
      val traffic = splits(6)
      (level, time, domain, traffic)
    }).filter(_._2 != 0)
      // keep only records whose level is E
      .filter(_._1 == "E")
      // drop the level field
      .map(x => (x._2, x._3, x._4.toLong))
    val mysqlData = env.addSource(new MySQLSource)
    // connect the two streams
    logData.connect(mysqlData)
      .flatMap(new CoFlatMapFunction[(Long, String, Long), (String, String), (Long, String, Long, String)] {
        // domain -> userId, filled from the MySQL stream
        var userDomainMap = mutable.HashMap[String, String]()

        override def flatMap1(value: (Long, String, Long), out: Collector[(Long, String, Long, String)]): Unit = {
          val domain = value._2
          val userId = userDomainMap.getOrElse(domain, "")
          out.collect((value._1, value._2, value._3, userId))
        }

        override def flatMap2(value: (String, String), out: Collector[(Long, String, Long, String)]): Unit = {
          userDomainMap += value._1 -> value._2
        }
      }).setParallelism(1)
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Long, String, Long, String)] {
        // tolerate records arriving up to 10 seconds out of order
        var maxOutOfOrderness: Long = 10000L
        var currentMaxTimestamp: Long = _

        override def getCurrentWatermark: Watermark = {
          new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        }

        override def extractTimestamp(element: (Long, String, Long, String), previousElementTimestamp: Long): Long = {
          val timestamp = element._1
          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
          timestamp
        }
      })
      .keyBy(_._4)
      .timeWindow(Time.minutes(1))
      .apply(new WindowFunction[(Long, String, Long, String), (String, String, Long), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[(Long, String, Long, String)], out: Collector[(String, String, Long)]): Unit = {
          val list = input.toList
          val sum = list.map(_._3).sum
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          out.collect((format.format(window.getStart) + "-" + format.format(window.getEnd), key, sum))
        }
      }).print().setParallelism(1)
    env.execute("LogAnalysisWithMySQL")
  }
}
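Both the Java and Scala versions use the same periodic watermark assigner: the watermark trails the largest timestamp seen so far by 10 seconds. Here is a minimal plain-Java sketch of that bookkeeping. The class and method names are illustrative, and the firing condition is an assumption based on the default event-time trigger, which fires once the watermark reaches the last millisecond of the window.

```java
final class BoundedLatenessWatermarkSketch {

    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = 0L;

    BoundedLatenessWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Mirrors extractTimestamp: remember the largest timestamp seen so far.
    long extractTimestamp(long timestampMs) {
        currentMaxTimestamp = Math.max(timestampMs, currentMaxTimestamp);
        return timestampMs;
    }

    // Mirrors getCurrentWatermark: lag behind the largest timestamp.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    // A window [start, end) can fire once the watermark reaches end - 1.
    boolean windowCanFire(long windowEndMs) {
        return currentWatermark() >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        BoundedLatenessWatermarkSketch wm = new BoundedLatenessWatermarkSketch(10_000L);
        wm.extractTimestamp(65_000L);
        System.out.println("fire first minute? " + wm.windowCanFire(60_000L));
        wm.extractTimestamp(70_000L);
        System.out.println("fire first minute? " + wm.windowCanFire(60_000L));
    }
}
```

With a 10-second bound, the result for a given minute is emitted only after a record stamped roughly 10 seconds past the end of that minute has been seen, which explains the delay between a minute ending and its line appearing in the output.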

Sink to ES

@Slf4j
public class LogAnalysisWithMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "public network ip:9092");
        properties.setProperty("group.id", "test");
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("public network ip", 9200, "http"));
        ElasticsearchSink.Builder<Tuple3<String, String, Long>> builder = new ElasticsearchSink.Builder<>(httpHosts,
                new ElasticsearchSinkFunction<Tuple3<String, String, Long>>() {
                    @Override
                    public void process(Tuple3<String, String, Long> value, RuntimeContext runtimeContext, RequestIndexer indexer) {
                        Map<String, Object> json = new HashMap<>();
                        json.put("time", value.getField(0));
                        json.put("userId", value.getField(1));
                        json.put("traffic", value.getField(2));
                        // one document per (window, user)
                        String id = value.getField(0) + "-" + value.getField(1);
                        indexer.add(Requests.indexRequest()
                                .index("user")
                                .type("traffic")
                                .id(id)
                                .source(json));
                    }
                });
        // number of buffered actions before a bulk write; 1 flushes every record
        builder.setBulkFlushMaxActions(1);
        DataStreamSource<String> data = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        SingleOutputStreamOperator<Tuple3<Long, String, Long>> logData = data
                .map(new MapFunction<String, Tuple4<String, Long, String, String>>() {
                    @Override
                    public Tuple4<String, Long, String, String> map(String value) throws Exception {
                        String[] splits = value.split("\t");
                        String level = splits[2];
                        String timeStr = splits[3];
                        Long time = 0L;
                        try {
                            time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime();
                        } catch (ParseException e) {
                            log.error("time conversion error: " + timeStr + "," + e.getMessage());
                        }
                        String domain = splits[5];
                        String traffic = splits[6];
                        return new Tuple4<>(level, time, domain, traffic);
                    }
                })
                .filter(x -> (Long) x.getField(1) != 0)
                // keep only records whose level is E
                .filter(x -> x.getField(0).equals("E"))
                // drop the level field
                .map(new MapFunction<Tuple4<String, Long, String, String>, Tuple3<Long, String, Long>>() {
                    @Override
                    public Tuple3<Long, String, Long> map(Tuple4<String, Long, String, String> value) throws Exception {
                        return new Tuple3<>(value.getField(1), value.getField(2), Long.parseLong(value.getField(3)));
                    }
                });
        DataStreamSource<Tuple2<String, String>> mysqlData = env.addSource(new MySQLSource());
        // connect the two streams
        logData.connect(mysqlData)
                .flatMap(new CoFlatMapFunction<Tuple3<Long, String, Long>, Tuple2<String, String>, Tuple4<Long, String, Long, String>>() {
                    // domain -> userId, filled from the MySQL stream
                    private Map<String, String> userDomainMap = new HashMap<>();

                    @Override
                    public void flatMap1(Tuple3<Long, String, Long> value, Collector<Tuple4<Long, String, Long, String>> out) throws Exception {
                        String domain = value.getField(1);
                        String userId = userDomainMap.getOrDefault(domain, "");
                        out.collect(new Tuple4<>(value.getField(0), value.getField(1), value.getField(2), userId));
                    }

                    @Override
                    public void flatMap2(Tuple2<String, String> value, Collector<Tuple4<Long, String, Long, String>> out) throws Exception {
                        userDomainMap.put(value.getField(0), value.getField(1));
                    }
                }).setParallelism(1)
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<Long, String, Long, String>>() {
                    // tolerate records arriving up to 10 seconds out of order
                    private Long maxOutOfOrderness = 10000L;
                    private Long currentMaxTimestamp = 0L;

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                    }

                    @Override
                    public long extractTimestamp(Tuple4<Long, String, Long, String> element, long previousElementTimestamp) {
                        Long timestamp = element.getField(0);
                        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                        return timestamp;
                    }
                })
                .keyBy(x -> (String) x.getField(3))
                .timeWindow(Time.minutes(1))
                // output: (one-minute interval, user, total traffic of the user in that minute)
                .apply(new WindowFunction<Tuple4<Long, String, Long, String>, Tuple3<String, String, Long>, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<Tuple4<Long, String, Long, String>> input, Collector<Tuple3<String, String, Long>> out) throws Exception {
                        // iterate directly instead of casting the Iterable to a List
                        long sum = 0L;
                        for (Tuple4<Long, String, Long, String> element : input) {
                            sum += element.f2;
                        }
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        out.collect(new Tuple3<>(format.format(window.getStart()) + "-" + format.format(window.getEnd()), s, sum));
                    }
                }).addSink(builder.build());
        env.execute("LogAnalysisWithMySQL");
    }
}
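Because the document ID is built from the window label and the user ID, indexing the result for the same window and user a second time replaces the existing document instead of adding a duplicate. The sketch below mimics this with an in-memory map standing in for the user index. It does not call Elasticsearch, and the class name, method names, and sample values are hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class TrafficDocumentSketch {

    // Same ID scheme as the sink function: "<window label>-<userId>".
    static String documentId(String time, String userId) {
        return time + "-" + userId;
    }

    // Same three fields as the JSON map built in the sink function.
    static Map<String, Object> toDocument(String time, String userId, long traffic) {
        Map<String, Object> json = new LinkedHashMap<>();
        json.put("time", time);
        json.put("userId", userId);
        json.put("traffic", traffic);
        return json;
    }

    public static void main(String[] args) {
        // In-memory stand-in for the index, keyed by document ID.
        Map<String, Map<String, Object>> index = new HashMap<>();
        String window = "2021-02-02 13:58:00-2021-02-02 13:59:00";

        index.put(documentId(window, "80000000"), toDocument(window, "80000000", 20933L));
        // Writing the same window and user again overwrites the earlier entry.
        index.put(documentId(window, "80000000"), toDocument(window, "80000000", 20933L));

        System.out.println(index.size() + " document(s): " + index.keySet());
    }
}
```

This ID scheme keeps the sink's writes idempotent per window and user, which is convenient if the job is restarted and re-emits a window.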

Running result

Visit http://<public network ip>:9200/user/traffic/_search to check the documents written to the index.

Scala code

import java.text.SimpleDateFormat
import java.util
import java.util.Properties

import com.guanjian.flink.scala.until.MySQLSource
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
import org.slf4j.LoggerFactory

import scala.collection.mutable

object LogAnalysisWithMySQL {

  val log = LoggerFactory.getLogger(LogAnalysisWithMySQL.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val topic = "pktest"
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "public network ip:9092")
    properties.setProperty("group.id", "test")
    val httpHosts = new util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("public network ip", 9200, "http"))
    val builder = new ElasticsearchSink.Builder[(String, String, Long)](httpHosts,
      new ElasticsearchSinkFunction[(String, String, Long)] {
        override def process(t: (String, String, Long), runtimeContext: RuntimeContext, indexer: RequestIndexer): Unit = {
          val json = new util.HashMap[String, Any]
          json.put("time", t._1)
          json.put("userId", t._2)
          json.put("traffic", t._3)
          // one document per (window, user)
          val id = t._1 + "-" + t._2
          indexer.add(Requests.indexRequest()
            .index("user")
            .`type`("traffic")
            .id(id)
            .source(json))
        }
      })
    // number of buffered actions before a bulk write; 1 flushes every record
    builder.setBulkFlushMaxActions(1)
    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, properties))
    val logData = data.map(x => {
      val splits = x.split("\t")
      val level = splits(2)
      val timeStr = splits(3)
      var time: Long = 0L
      try {
        time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime
      } catch {
        case e: Exception =>
          log.error(s"time conversion error: $timeStr", e.getMessage)
      }
      val domain = splits(5)
      val traffic = splits(6)
      (level, time, domain, traffic)
    }).filter(_._2 != 0)
      // keep only records whose level is E
      .filter(_._1 == "E")
      // drop the level field
      .map(x => (x._2, x._3, x._4.toLong))
    val mysqlData = env.addSource(new MySQLSource)
    // connect the two streams
    logData.connect(mysqlData)
      .flatMap(new CoFlatMapFunction[(Long, String, Long), (String, String), (Long, String, Long, String)] {
        // domain -> userId, filled from the MySQL stream
        var userDomainMap = mutable.HashMap[String, String]()

        override def flatMap1(value: (Long, String, Long), out: Collector[(Long, String, Long, String)]): Unit = {
          val domain = value._2
          val userId = userDomainMap.getOrElse(domain, "")
          out.collect((value._1, value._2, value._3, userId))
        }

        override def flatMap2(value: (String, String), out: Collector[(Long, String, Long, String)]): Unit = {
          userDomainMap += value._1 -> value._2
        }
      }).setParallelism(1)
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Long, String, Long, String)] {
        // tolerate records arriving up to 10 seconds out of order
        var maxOutOfOrderness: Long = 10000L
        var currentMaxTimestamp: Long = _

        override def getCurrentWatermark: Watermark = {
          new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        }

        override def extractTimestamp(element: (Long, String, Long, String), previousElementTimestamp: Long): Long = {
          val timestamp = element._1
          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
          timestamp
        }
      })
      .keyBy(_._4)
      .timeWindow(Time.minutes(1))
      .apply(new WindowFunction[(Long, String, Long, String), (String, String, Long), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[(Long, String, Long, String)], out: Collector[(String, String, Long)]): Unit = {
          val list = input.toList
          val sum = list.map(_._3).sum
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          out.collect((format.format(window.getStart) + "-" + format.format(window.getEnd), key, sum))
        }
      }).addSink(builder.build)
    env.execute("LogAnalysisWithMySQL")
  }
}
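All of the jobs above begin the same way: each tab-separated record is split, the time string is converted to epoch milliseconds, and records with an unparseable time (left at 0) or a level other than E are dropped. The following plain-Java sketch isolates that step. The class LogLineParserSketch, its LogRecord type, and the sample lines are assumptions for illustration, and the field positions follow the mock producer's layout.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

final class LogLineParserSketch {

    // Hypothetical holder for the four fields the job keeps.
    static final class LogRecord {
        final String level;
        final long time;
        final String domain;
        final long traffic;

        LogRecord(String level, long time, String domain, long traffic) {
            this.level = level;
            this.time = time;
            this.domain = domain;
            this.traffic = traffic;
        }
    }

    // Split a record and convert the time; a bad time string leaves time at 0.
    static LogRecord parse(String line) {
        String[] splits = line.split("\t");
        String level = splits[2];
        long time = 0L;
        try {
            time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(splits[3]).getTime();
        } catch (ParseException e) {
            // keep time at 0 so that keep() discards the record
        }
        return new LogRecord(level, time, splits[5], Long.parseLong(splits[6]));
    }

    // Combines the two filters: valid time and level E.
    static boolean keep(LogRecord record) {
        return record.time != 0 && "E".equals(record.level);
    }

    public static void main(String[] args) {
        String line = "aliyun\tCN\tE\t2021-02-02 13:58:00\t223.101.75.185\tv1.go2yd.com\t1234\t";
        LogRecord record = parse(line);
        System.out.println(record.domain + " " + record.traffic + " keep=" + keep(record));
    }
}
```

Unlike the job, this sketch converts the traffic field to a number while parsing rather than after filtering; the outcome is the same for well-formed records.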

Kibana chart display

Here we draw a donut chart.

This concludes the walkthrough of the overall process of a simple Flink project. Theory is easier to absorb when paired with practice, so try running the job yourself.
