This article mainly introduces the overall process of a simple Flink project. In daily work, many people have doubts about how such a project fits together, so the editor has consulted various materials and sorted out a simple, practical walkthrough, hoping it helps answer the question "what is the overall process of a simple Flink project?" Next, please follow the editor and study it!
Project Overview
The project analyzes access-log data from a CDN (content distribution network). A log record contains the following content:
aliyun CN E [17/Jul/2018:17:07:50 +0800] 223.104.18.110 v2.go2yd.com 17168
The data to be processed is this access log.
Offline: Flume ==> HDFS
Real-time: Kafka ==> streaming engine ==> ES ==> Kibana
Data query
API name: summary statistics query
Function description: summary statistics over the access data, including
Peak bandwidth
Total traffic
Total number of requests
Project function
Count the traffic generated by accesses to each domain name within one minute. Flink consumes the Kafka data and processes it.
Count the traffic generated by each user within one minute. There is a mapping between domain names and users, so Flink consumes the Kafka data and joins it with the domain-user configuration data stored in MySQL.
Project architecture
Mock data
@Component
@Slf4j
public class KafkaProducer {

    private static final String TOPIC = "pktest";

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @SuppressWarnings("unchecked")
    public void produce(String message) {
        try {
            ListenableFuture future = kafkaTemplate.send(TOPIC, message);
            SuccessCallback successCallback = new SuccessCallback() {
                @Override
                public void onSuccess(@Nullable SendResult result) {
                    log.info("message sent successfully");
                }
            };
            FailureCallback failureCallback = new FailureCallback() {
                @Override
                public void onFailure(Throwable ex) {
                    log.error("send message failed", ex);
                    produce(message);
                }
            };
            future.addCallback(successCallback, failureCallback);
        } catch (Exception e) {
            log.error("send message exception", e);
        }
    }

    @Scheduled(fixedRate = 1000 * 2)
    public void send() {
        StringBuilder builder = new StringBuilder();
        builder.append("aliyun").append("\t")
                .append("CN").append("\t")
                .append(getLevels()).append("\t")
                .append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())).append("\t")
                .append(getIps()).append("\t")
                .append(getDomains()).append("\t")
                .append(getTraffic()).append("\t");
        log.info(builder.toString());
        produce(builder.toString());
    }

    /**
     * produce Level data
     */
    private String getLevels() {
        List<String> levels = Arrays.asList("M", "E");
        return levels.get(new Random().nextInt(levels.size()));
    }

    /**
     * produce IP data
     */
    private String getIps() {
        List<String> ips = Arrays.asList("222.104.18.111", "223.101.75.185", "27.17.127.133",
                "183.225.121.16", "112.1.65.32", "175.147.222.190", "183.227.43.68",
                "59.88.168.87", "117.28.44.29", "117.59.34.167");
        return ips.get(new Random().nextInt(ips.size()));
    }

    /**
     * produce domain name data
     */
    private String getDomains() {
        List<String> domains = Arrays.asList("v1.go2yd.com", "v2.go2vd.com", "v3.go2yd.com",
                "v4.go2yd.com", "vmi.go2yd.com");
        return domains.get(new Random().nextInt(domains.size()));
    }

    /**
     * produce traffic data
     */
    private int getTraffic() {
        return new Random().nextInt(10000);
    }
}
For the rest of the Spring Boot Kafka configuration, please refer to "Spring Boot 2 Integration with Kafka".
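The details are in that article; as a minimal sketch (the property names follow the standard spring-kafka starter, and the broker address is a placeholder), application.properties would contain something like:

spring.kafka.bootstrap-servers=extranet ip:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer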
Start a console consumer on the Kafka server and you can see the generated records arriving.
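For example, the console consumer shipped with Kafka can be used (the script path and broker address depend on your installation; this is only a sketch):

bin/kafka-console-consumer.sh --bootstrap-server extranet ip:9092 --topic pktest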
It indicates that the Kafka data was sent successfully.
Flink consumers
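The examples below assume the Flink Kafka connector is already on the classpath; a typical Maven coordinate (the artifact name varies with the Flink and Scala versions, here matching the Scala 2.11 builds used elsewhere in this project) looks like:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>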
public class LogAnalysis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "extranet ip:9092");
        properties.setProperty("group.id", "test");
        DataStreamSource<String> data = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        data.print().setParallelism(1);
        env.execute("LogAnalysis");
    }
}
Messages received
aliyun CN M 2021-01-31 23:43:07 222.104.18.111 v1.go2yd.com 4603
aliyun CN E 2021-01-31 23:43:09 222.104.18.111 v4.go2yd.com 6313
aliyun CN E 2021-01-31 23:43:11 222.104.18.111 v2.go2vd.com 4233
aliyun CN E 2021-01-31 23:43:13 222.104.18.111 v4.go2yd.com 2691
aliyun CN E 2021-01-31 23:43:15 183.225.121.16 v1.go2yd.com 212
aliyun CN E 2021-01-31 23:43:17 183.225.121.16 v4.go2yd.com 7744
aliyun CN M 2021-01-31 23:43:19 175.147.222.190 vmi.go2yd.com 1318
Data cleaning
Data cleaning means processing the raw input data according to our business rules so that it meets the needs of our business.
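As an illustration (the epoch value assumes the timestamp is parsed in the JVM default time zone, here taken to be UTC+8), one mock record is turned into a (time, domain, traffic) tuple like this:

Input (tab-separated): aliyun CN E 2021-01-31 23:43:09 222.104.18.111 v4.go2yd.com 6313
Output: (1612107789000,v4.go2yd.com,6313)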
@Slf4j
public class LogAnalysis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "extranet ip:9092");
        properties.setProperty("group.id", "test");
        DataStreamSource<String> data = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        data.map(new MapFunction<String, Tuple4<String, Long, String, String>>() {
                @Override
                public Tuple4<String, Long, String, String> map(String value) throws Exception {
                    String[] splits = value.split("\t");
                    String level = splits[2];
                    String timeStr = splits[3];
                    Long time = 0L;
                    try {
                        time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime();
                    } catch (ParseException e) {
                        log.error("time conversion error: " + timeStr + "," + e.getMessage());
                    }
                    String domain = splits[5];
                    String traffic = splits[6];
                    return new Tuple4<>(level, time, domain, traffic);
                }
            })
            .filter(x -> (Long) x.getField(1) != 0)
            // here we only need data whose level is E
            .filter(x -> x.getField(0).equals("E"))
            // discard level
            .map(new MapFunction<Tuple4<String, Long, String, String>, Tuple3<Long, String, Long>>() {
                @Override
                public Tuple3<Long, String, Long> map(Tuple4<String, Long, String, String> value) throws Exception {
                    return new Tuple3<>(value.getField(1), value.getField(2), Long.parseLong(value.getField(3)));
                }
            })
            .print().setParallelism(1);
        env.execute("LogAnalysis");
    }
}
Running result
(1612130315000,v1.go2yd.com,533)
(1612130319000,v4.go2yd.com,8657)
(1612130327000,v1.go2yd.com,9566)
(1612130329000,v2.go2yd.com,1460)
(161213033000,v3.go2yd.com,6955)
(161213033000,v1.go2yd.com,9612)
(1612130341000,vmi.go2yd.com,612)
(1612130341000,vmi.go2yd.com,694)
Scala code
import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.slf4j.LoggerFactory
import org.apache.flink.api.scala._

object LogAnalysis {
  val log = LoggerFactory.getLogger(LogAnalysis.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val topic = "pktest"
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "public network ip:9092")
    properties.setProperty("group.id", "test")
    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, properties))
    data.map(x => {
        val splits = x.split("\t")
        val level = splits(2)
        val timeStr = splits(3)
        var time: Long = 0L
        try {
          time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime
        } catch {
          case e: Exception => log.error(s"time conversion error: $timeStr", e.getMessage)
        }
        val domain = splits(5)
        val traffic = splits(6)
        (level, time, domain, traffic)
      })
      .filter(_._2 != 0)
      .filter(_._1 == "E")
      .map(x => (x._2, x._3, x._4.toLong))
      .print().setParallelism(1)
    env.execute("LogAnalysis")
  }
}
Data analysis
What we are going to analyze now is the traffic of each domain name within one-minute windows. The job runs on event time: each record's timestamp is taken from the log itself, and a periodic watermark that lags 10 seconds behind the largest timestamp seen so far decides when a one-minute window is complete and can be evaluated.
@Slf4j
public class LogAnalysis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "extranet ip:9092");
        properties.setProperty("group.id", "test");
        DataStreamSource<String> data = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        data.map(new MapFunction<String, Tuple4<String, Long, String, String>>() {
                @Override
                public Tuple4<String, Long, String, String> map(String value) throws Exception {
                    String[] splits = value.split("\t");
                    String level = splits[2];
                    String timeStr = splits[3];
                    Long time = 0L;
                    try {
                        time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime();
                    } catch (ParseException e) {
                        log.error("time conversion error: " + timeStr + "," + e.getMessage());
                    }
                    String domain = splits[5];
                    String traffic = splits[6];
                    return new Tuple4<>(level, time, domain, traffic);
                }
            })
            .filter(x -> (Long) x.getField(1) != 0)
            // here we only need data whose level is E
            .filter(x -> x.getField(0).equals("E"))
            // discard level
            .map(new MapFunction<Tuple4<String, Long, String, String>, Tuple3<Long, String, Long>>() {
                @Override
                public Tuple3<Long, String, Long> map(Tuple4<String, Long, String, String> value) throws Exception {
                    return new Tuple3<>(value.getField(1), value.getField(2), Long.parseLong(value.getField(3)));
                }
            })
            .setParallelism(1)
            .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<Long, String, Long>>() {
                private Long maxOutOfOrderness = 10000L;
                private Long currentMaxTimestamp = 0L;

                @Nullable
                @Override
                public Watermark getCurrentWatermark() {
                    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                }

                @Override
                public long extractTimestamp(Tuple3<Long, String, Long> element, long previousElementTimestamp) {
                    Long timestamp = element.getField(0);
                    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                    return timestamp;
                }
            })
            .keyBy(x -> (String) x.getField(1))
            .timeWindow(Time.minutes(1))
            // output format: one-minute window, domain name, total traffic of that domain within the minute
            .apply(new WindowFunction<Tuple3<Long, String, Long>, Tuple3<String, String, Long>, String, TimeWindow>() {
                @Override
                public void apply(String s, TimeWindow window, Iterable<Tuple3<Long, String, Long>> input,
                                  Collector<Tuple3<String, String, Long>> out) throws Exception {
                    List<Tuple3<Long, String, Long>> list = (List<Tuple3<Long, String, Long>>) input;
                    Long sum = list.stream().map(x -> (Long) x.getField(2)).reduce((x, y) -> x + y).get();
                    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    out.collect(new Tuple3<>(format.format(window.getStart()) + "-" + format.format(window.getEnd()), s, sum));
                }
            })
            .print().setParallelism(1);
        env.execute("LogAnalysis");
    }
}
Running result
(2021-02-01 07:14:00-2021-02-01 07:15:00,vmi.go2yd.com,6307)
(2021-02-01 07:15:00-2021-02-01 07:16:00,v4.go2yd.com,15474)
(2021-02-01 07:15:00-2021-02-01 07:16:00,v2.go2vd.com,9210)
(2021-02-01 07:15:00-2021-02-01 07:16:00,v3.go2yd.com, )
(2021-02-01 07:15:00-2021-02-01 07:16:00,v1.go2yd.com,12787)
(2021-02-01 07:15:00-2021-02-01 07:16:00,vmi.go2yd.com,14250)
(2021-02-01 07:16:00-2021-02-01 07:17:00,v4.go2yd.com,33298)
(2021-02-01 07:16:00-2021-02-01 07:17:00,v1.go2yd.com,37140)
Scala code
import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.slf4j.LoggerFactory
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object LogAnalysis {
  val log = LoggerFactory.getLogger(LogAnalysis.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val topic = "pktest"
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "public network ip:9092")
    properties.setProperty("group.id", "test")
    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, properties))
    data.map(x => {
        val splits = x.split("\t")
        val level = splits(2)
        val timeStr = splits(3)
        var time: Long = 0L
        try {
          time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime
        } catch {
          case e: Exception => log.error(s"time conversion error: $timeStr", e.getMessage)
        }
        val domain = splits(5)
        val traffic = splits(6)
        (level, time, domain, traffic)
      })
      .filter(_._2 != 0)
      .filter(_._1 == "E")
      .map(x => (x._2, x._3, x._4.toLong))
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Long, String, Long)] {
        var maxOutOfOrderness: Long = 10000L
        var currentMaxTimestamp: Long = _

        override def getCurrentWatermark: Watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)

        override def extractTimestamp(element: (Long, String, Long), previousElementTimestamp: Long): Long = {
          val timestamp = element._1
          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
          timestamp
        }
      })
      .keyBy(_._2)
      .timeWindow(Time.minutes(1))
      .apply(new WindowFunction[(Long, String, Long), (String, String, Long), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[(Long, String, Long)],
                           out: Collector[(String, String, Long)]): Unit = {
          val list = input.toList
          val sum = list.map(_._3).sum
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          out.collect((format.format(window.getStart) + "-" + format.format(window.getEnd), key, sum))
        }
      })
      .print().setParallelism(1)
    env.execute("LogAnalysis")
  }
}
Sink to Elasticsearch
Install ES
The version we use here is 6.2.4.
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.2.4.tar.gz
After decompression, enter the config directory, edit elasticsearch.yml, and modify
network.host: 0.0.0.0
Add a non-root user
useradd es
Change the owner of all files in the ES directory to the es user
chown -R es:es elasticsearch-6.2.4
Modify /etc/security/limits.conf and change the entries at the bottom to
es soft nofile 65536
es hard nofile 65536
Modify /etc/sysctl.conf and add
vm.max_map_count=655360
Then execute the command
sysctl -p
Go to the bin directory of ES and switch to the es user
su es
As the es user, start Elasticsearch in the background
./elasticsearch -d
At this point, you can see the ES information in the Web interface (public network ip:9200)
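You can also check it from the command line, for example with curl (the address is a placeholder for your host):

curl http://public network ip:9200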
To add an ES sink to Flink, first add the dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

Then extend the LogAnalysis job with an Elasticsearch sink:

@Slf4j
public class LogAnalysis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "extranet ip:9092");
        properties.setProperty("group.id", "test");

        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("public network ip", 9200, "http"));
        ElasticsearchSink.Builder<Tuple3<String, String, Long>> builder = new ElasticsearchSink.Builder<>(httpHosts,
            new ElasticsearchSinkFunction<Tuple3<String, String, Long>>() {
                @Override
                public void process(Tuple3<String, String, Long> value, RuntimeContext runtimeContext, RequestIndexer indexer) {
                    Map<String, Object> json = new HashMap<>();
                    json.put("time", value.getField(0));
                    json.put("domain", value.getField(1));
                    json.put("traffic", value.getField(2));
                    String id = value.getField(0) + "-" + value.getField(1);
                    indexer.add(Requests.indexRequest()
                            .index("cdn")
                            .type("traffic")
                            .id(id)
                            .source(json));
                }
            });
        // set the buffer size of bulk write data
        builder.setBulkFlushMaxActions(1);

        DataStreamSource<String> data = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        data.map(new MapFunction<String, Tuple4<String, Long, String, String>>() {
                @Override
                public Tuple4<String, Long, String, String> map(String value) throws Exception {
                    String[] splits = value.split("\t");
                    String level = splits[2];
                    String timeStr = splits[3];
                    Long time = 0L;
                    try {
                        time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime();
                    } catch (ParseException e) {
                        log.error("time conversion error: " + timeStr + "," + e.getMessage());
                    }
                    String domain = splits[5];
                    String traffic = splits[6];
                    return new Tuple4<>(level, time, domain, traffic);
                }
            })
            .filter(x -> (Long) x.getField(1) != 0)
            // here we only need data whose level is E
            .filter(x -> x.getField(0).equals("E"))
            // discard level
            .map(new MapFunction<Tuple4<String, Long, String, String>, Tuple3<Long, String, Long>>() {
                @Override
                public Tuple3<Long, String, Long> map(Tuple4<String, Long, String, String> value) throws Exception {
                    return new Tuple3<>(value.getField(1), value.getField(2), Long.parseLong(value.getField(3)));
                }
            })
            .setParallelism(1)
            .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<Long, String, Long>>() {
                private Long maxOutOfOrderness = 10000L;
                private Long currentMaxTimestamp = 0L;

                @Nullable
                @Override
                public Watermark getCurrentWatermark() {
                    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                }

                @Override
                public long extractTimestamp(Tuple3<Long, String, Long> element, long previousElementTimestamp) {
                    Long timestamp = element.getField(0);
                    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                    return timestamp;
                }
            })
            .keyBy(x -> (String) x.getField(1))
            .timeWindow(Time.minutes(1))
            // output format: one-minute window, domain name, total traffic of that domain within the minute
            .apply(new WindowFunction<Tuple3<Long, String, Long>, Tuple3<String, String, Long>, String, TimeWindow>() {
                @Override
                public void apply(String s, TimeWindow window, Iterable<Tuple3<Long, String, Long>> input,
                                  Collector<Tuple3<String, String, Long>> out) throws Exception {
                    List<Tuple3<Long, String, Long>> list = (List<Tuple3<Long, String, Long>>) input;
                    Long sum = list.stream().map(x -> (Long) x.getField(2)).reduce((x, y) -> x + y).get();
                    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    out.collect(new Tuple3<>(format.format(window.getStart()) + "-" + format.format(window.getEnd()), s, sum));
                }
            })
            .addSink(builder.build());
        env.execute("LogAnalysis");
    }
}
After execution, you can query the data in ES.
http://public network ip:9200/cdn/traffic/_search
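Each hit returned by that query carries the three fields set in the ElasticsearchSinkFunction; an individual document looks roughly like this (the values are illustrative):

{
  "_index": "cdn",
  "_type": "traffic",
  "_id": "2021-02-01 07:15:00-2021-02-01 07:16:00-v4.go2yd.com",
  "_source": {
    "time": "2021-02-01 07:15:00-2021-02-01 07:16:00",
    "domain": "v4.go2yd.com",
    "traffic": 15474
  }
}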
Scala code
import java.text.SimpleDateFormat
import java.util
import java.util.Properties

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.slf4j.LoggerFactory
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

object LogAnalysis {
  val log = LoggerFactory.getLogger(LogAnalysis.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val topic = "pktest"
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "public network ip:9092")
    properties.setProperty("group.id", "test")

    val httpHosts = new util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("public network ip", 9200, "http"))
    val builder = new ElasticsearchSink.Builder[(String, String, Long)](httpHosts,
      new ElasticsearchSinkFunction[(String, String, Long)] {
        override def process(t: (String, String, Long), runtimeContext: RuntimeContext, indexer: RequestIndexer): Unit = {
          val json = new util.HashMap[String, Any]
          json.put("time", t._1)
          json.put("domain", t._2)
          json.put("traffic", t._3)
          val id = t._1 + "-" + t._2
          indexer.add(Requests.indexRequest()
            .index("cdn")
            .`type`("traffic")
            .id(id)
            .source(json))
        }
      })
    builder.setBulkFlushMaxActions(1)

    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, properties))
    data.map(x => {
        val splits = x.split("\t")
        val level = splits(2)
        val timeStr = splits(3)
        var time: Long = 0L
        try {
          time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime
        } catch {
          case e: Exception => log.error(s"time conversion error: $timeStr", e.getMessage)
        }
        val domain = splits(5)
        val traffic = splits(6)
        (level, time, domain, traffic)
      })
      .filter(_._2 != 0)
      .filter(_._1 == "E")
      .map(x => (x._2, x._3, x._4.toLong))
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Long, String, Long)] {
        var maxOutOfOrderness: Long = 10000L
        var currentMaxTimestamp: Long = _

        override def getCurrentWatermark: Watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)

        override def extractTimestamp(element: (Long, String, Long), previousElementTimestamp: Long): Long = {
          val timestamp = element._1
          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
          timestamp
        }
      })
      .keyBy(_._2)
      .timeWindow(Time.minutes(1))
      .apply(new WindowFunction[(Long, String, Long), (String, String, Long), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[(Long, String, Long)],
                           out: Collector[(String, String, Long)]): Unit = {
          val list = input.toList
          val sum = list.map(_._3).sum
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          out.collect((format.format(window.getStart) + "-" + format.format(window.getEnd), key, sum))
        }
      })
      .addSink(builder.build)
    env.execute("LogAnalysis")
  }
}
Kibana graphic display
Install kibana
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.2.4-linux-x86_64.tar.gz
To keep the version of kibana the same as ES, unzip it and enter the config directory to edit kibana.yml.
server.host: "host2"
elasticsearch.url: "http://host2:9200"
The contents will vary from version to version. After saving, enter the bin directory.
Switch to the es user and execute
./kibana &
Visit the Web page, public network ip:5601
Here I made a data table and a bar chart.
The second requirement is to count the traffic generated by each user in one minute.
Add a table user_domain_config to the MySQL database with the following fields
The contents of the table are as follows
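The table only needs the two columns read by the custom source below (select user_id,domain from user_domain_config). A minimal sketch of the DDL and some sample rows follows; the column types and the domain-to-user mapping are assumptions, chosen to match the running results shown later:

CREATE TABLE user_domain_config (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_id VARCHAR(40) NOT NULL,
    domain VARCHAR(128) NOT NULL
);

INSERT INTO user_domain_config (user_id, domain) VALUES
    ('80000000', 'v1.go2yd.com'),
    ('80000000', 'v2.go2vd.com'),
    ('80000000', 'v3.go2yd.com'),
    ('80000001', 'v4.go2yd.com'),
    ('80000001', 'vmi.go2yd.com');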
Data cleaning
/**
 * Custom MySQL data source
 */
public class MySQLSource extends RichParallelSourceFunction<Tuple2<String, String>> {
    private Connection connection;
    private PreparedStatement pstmt;

    private Connection getConnection() {
        Connection conn = null;
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
            String url = "jdbc:mysql://public network ip:3306/flink";
            conn = DriverManager.getConnection(url, "root", "*");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "select user_id,domain from user_domain_config";
        pstmt = connection.prepareStatement(sql);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
        ResultSet rs = pstmt.executeQuery();
        while (rs.next()) {
            Tuple2<String, String> tuple2 = new Tuple2<>(rs.getString("domain"), rs.getString("user_id"));
            ctx.collect(tuple2);
        }
        pstmt.close();
    }

    @Override
    public void cancel() {
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (pstmt != null) {
            pstmt.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

The job then connects the Kafka log stream with this MySQL configuration stream:

@Slf4j
public class LogAnalysisWithMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "extranet ip:9092");
        properties.setProperty("group.id", "test");
        DataStreamSource<String> data = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        SingleOutputStreamOperator<Tuple3<Long, String, Long>> logData = data.map(new MapFunction<String, Tuple4<String, Long, String, String>>() {
                @Override
                public Tuple4<String, Long, String, String> map(String value) throws Exception {
                    String[] splits = value.split("\t");
                    String level = splits[2];
                    String timeStr = splits[3];
                    Long time = 0L;
                    try {
                        time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime();
                    } catch (ParseException e) {
                        log.error("time conversion error: " + timeStr + "," + e.getMessage());
                    }
                    String domain = splits[5];
                    String traffic = splits[6];
                    return new Tuple4<>(level, time, domain, traffic);
                }
            })
            .filter(x -> (Long) x.getField(1) != 0)
            // here we only need data whose level is E
            .filter(x -> x.getField(0).equals("E"))
            // discard level
            .map(new MapFunction<Tuple4<String, Long, String, String>, Tuple3<Long, String, Long>>() {
                @Override
                public Tuple3<Long, String, Long> map(Tuple4<String, Long, String, String> value) throws Exception {
                    return new Tuple3<>(value.getField(1), value.getField(2), Long.parseLong(value.getField(3)));
                }
            });

        DataStreamSource<Tuple2<String, String>> mysqlData = env.addSource(new MySQLSource());
        // dual-stream join: connect the log stream with the MySQL configuration stream
        logData.connect(mysqlData)
            .flatMap(new CoFlatMapFunction<Tuple3<Long, String, Long>, Tuple2<String, String>, Tuple4<Long, String, Long, String>>() {
                private Map<String, String> userDomainMap = new HashMap<>();

                @Override
                public void flatMap1(Tuple3<Long, String, Long> value, Collector<Tuple4<Long, String, Long, String>> out) throws Exception {
                    String domain = value.getField(1);
                    String userId = userDomainMap.getOrDefault(domain, "");
                    out.collect(new Tuple4<>(value.getField(0), value.getField(1), value.getField(2), userId));
                }

                @Override
                public void flatMap2(Tuple2<String, String> value, Collector<Tuple4<Long, String, Long, String>> out) throws Exception {
                    userDomainMap.put(value.getField(0), value.getField(1));
                }
            })
            .print().setParallelism(1);
        env.execute("LogAnalysisWithMySQL");
    }
}
Running result
(1612239325000,vmi.go2yd.com,7115,80000001)
(1612239633000,v4.go2yd.com,8412,80000001)
(1612239635000,v3.go2yd.com,3527,80000000)
(1612239639000,v1.go2yd.com,7385,80000000)
(1612239643000,vmi.go2yd.com,8650,80000001)
(1612239645000,vmi.go2yd.com,2642,80000001)
Scala code
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class MySQLSource extends RichParallelSourceFunction[(String, String)] {
  var connection: Connection = null
  var pstmt: PreparedStatement = null

  def getConnection: Connection = {
    var conn: Connection = null
    Class.forName("com.mysql.cj.jdbc.Driver")
    val url = "jdbc:mysql://public network ip:3306/flink"
    conn = DriverManager.getConnection(url, "root", "*")
    conn
  }

  override def open(parameters: Configuration): Unit = {
    connection = getConnection
    val sql = "select user_id,domain from user_domain_config"
    pstmt = connection.prepareStatement(sql)
  }

  override def cancel() = {}

  override def run(ctx: SourceFunction.SourceContext[(String, String)]) = {
    val rs = pstmt.executeQuery()
    while (rs.next) {
      val tuple2 = (rs.getString("domain"), rs.getString("user_id"))
      ctx.collect(tuple2)
    }
    pstmt.close()
  }

  override def close(): Unit = {
    if (pstmt != null) {
      pstmt.close()
    }
    if (connection != null) {
      connection.close()
    }
  }
}

The Scala job that connects the two streams:

import java.text.SimpleDateFormat
import java.util.Properties

import com.guanjian.flink.scala.until.MySQLSource
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory

import scala.collection.mutable

object LogAnalysisWithMySQL {
  val log = LoggerFactory.getLogger(LogAnalysisWithMySQL.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val topic = "pktest"
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "public network ip:9092")
    properties.setProperty("group.id", "test")
    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, properties))
    val logData = data.map(x => {
        val splits = x.split("\t")
        val level = splits(2)
        val timeStr = splits(3)
        var time: Long = 0L
        try {
          time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime
        } catch {
          case e: Exception => log.error(s"time conversion error: $timeStr", e.getMessage)
        }
        val domain = splits(5)
        val traffic = splits(6)
        (level, time, domain, traffic)
      })
      .filter(_._2 != 0)
      .filter(_._1 == "E")
      .map(x => (x._2, x._3, x._4.toLong))

    val mysqlData = env.addSource(new MySQLSource)
    logData.connect(mysqlData)
      .flatMap(new CoFlatMapFunction[(Long, String, Long), (String, String), (Long, String, Long, String)] {
        var userDomainMap = mutable.HashMap[String, String]()

        override def flatMap1(value: (Long, String, Long), out: Collector[(Long, String, Long, String)]) = {
          val domain = value._2
          val userId = userDomainMap.getOrElse(domain, "")
          out.collect((value._1, value._2, value._3, userId))
        }

        override def flatMap2(value: (String, String), out: Collector[(Long, String, Long, String)]) = {
          userDomainMap += value._1 -> value._2
        }
      })
      .print().setParallelism(1)
    env.execute("LogAnalysisWithMySQL")
  }
}
Data analysis
@Slf4j
public class LogAnalysisWithMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "extranet ip:9092");
        properties.setProperty("group.id", "test");
        DataStreamSource<String> data = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        SingleOutputStreamOperator<Tuple3<Long, String, Long>> logData = data.map(new MapFunction<String, Tuple4<String, Long, String, String>>() {
                @Override
                public Tuple4<String, Long, String, String> map(String value) throws Exception {
                    String[] splits = value.split("\t");
                    String level = splits[2];
                    String timeStr = splits[3];
                    Long time = 0L;
                    try {
                        time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime();
                    } catch (ParseException e) {
                        log.error("time conversion error: " + timeStr + "," + e.getMessage());
                    }
                    String domain = splits[5];
                    String traffic = splits[6];
                    return new Tuple4<>(level, time, domain, traffic);
                }
            })
            .filter(x -> (Long) x.getField(1) != 0)
            // here we only need data whose level is E
            .filter(x -> x.getField(0).equals("E"))
            // discard level
            .map(new MapFunction<Tuple4<String, Long, String, String>, Tuple3<Long, String, Long>>() {
                @Override
                public Tuple3<Long, String, Long> map(Tuple4<String, Long, String, String> value) throws Exception {
                    return new Tuple3<>(value.getField(1), value.getField(2), Long.parseLong(value.getField(3)));
                }
            });

        DataStreamSource<Tuple2<String, String>> mysqlData = env.addSource(new MySQLSource());
        // dual-stream join: connect the log stream with the MySQL configuration stream
        logData.connect(mysqlData)
            .flatMap(new CoFlatMapFunction<Tuple3<Long, String, Long>, Tuple2<String, String>, Tuple4<Long, String, Long, String>>() {
                private Map<String, String> userDomainMap = new HashMap<>();

                @Override
                public void flatMap1(Tuple3<Long, String, Long> value, Collector<Tuple4<Long, String, Long, String>> out) throws Exception {
                    String domain = value.getField(1);
                    String userId = userDomainMap.getOrDefault(domain, "");
                    out.collect(new Tuple4<>(value.getField(0), value.getField(1), value.getField(2), userId));
                }

                @Override
                public void flatMap2(Tuple2<String, String> value, Collector<Tuple4<Long, String, Long, String>> out) throws Exception {
                    userDomainMap.put(value.getField(0), value.getField(1));
                }
            })
            .setParallelism(1)
            .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<Long, String, Long, String>>() {
                private Long maxOutOfOrderness = 10000L;
                private Long currentMaxTimestamp = 0L;

                @Nullable
                @Override
                public Watermark getCurrentWatermark() {
                    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                }

                @Override
                public long extractTimestamp(Tuple4<Long, String, Long, String> element, long previousElementTimestamp) {
                    Long timestamp = element.getField(0);
                    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                    return timestamp;
                }
            })
            .keyBy(x -> (String) x.getField(3))
            .timeWindow(Time.minutes(1))
            // output format: one-minute window, user, total traffic of that user within the minute
            .apply(new WindowFunction<Tuple4<Long, String, Long, String>, Tuple3<String, String, Long>, String, TimeWindow>() {
                @Override
                public void apply(String s, TimeWindow window, Iterable<Tuple4<Long, String, Long, String>> input,
                                  Collector<Tuple3<String, String, Long>> out) throws Exception {
                    List<Tuple4<Long, String, Long, String>> list = (List<Tuple4<Long, String, Long, String>>) input;
                    Long sum = list.stream().map(x -> (Long) x.getField(2)).reduce((x, y) -> x + y).get();
                    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    out.collect(new Tuple3<>(format.format(window.getStart()) + "-" + format.format(window.getEnd()), s, sum));
                }
            })
            .print().setParallelism(1);
        env.execute("LogAnalysisWithMySQL");
    }
}
Running result
(2021-02-02 13:58:00-2021-02-02 13:59:00,80000000,20933)
(2021-02-02 13:58:00-2021-02-02 13:59:00,80000001,6928)
(2021-02-02 13:59:00-2021-02-02 14:00:00,80000001,38202)
(2021-02-02 13:59:00-2021-02-02 14:00:00,80000001,39394)
(2021-02-02 14:00:00-2021-02-02 14:01:00,80000001,23070)
(2021-02-02 14:00:00-2021-02-02 14:01:00,80000000,41701)
Scala code
import java.text.SimpleDateFormat
import java.util.Properties

import com.guanjian.flink.scala.until.MySQLSource
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory

import scala.collection.mutable

object LogAnalysisWithMySQL {
  val log = LoggerFactory.getLogger(LogAnalysisWithMySQL.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val topic = "pktest"
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "public network ip:9092")
    properties.setProperty("group.id", "test")
    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, properties))
    val logData = data.map(x => {
        val splits = x.split("\t")
        val level = splits(2)
        val timeStr = splits(3)
        var time: Long = 0L
        try {
          time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime
        } catch {
          case e: Exception => log.error(s"time conversion error: $timeStr", e.getMessage)
        }
        val domain = splits(5)
        val traffic = splits(6)
        (level, time, domain, traffic)
      })
      .filter(_._2 != 0)
      .filter(_._1 == "E")
      .map(x => (x._2, x._3, x._4.toLong))

    val mysqlData = env.addSource(new MySQLSource)
    logData.connect(mysqlData)
      .flatMap(new CoFlatMapFunction[(Long, String, Long), (String, String), (Long, String, Long, String)] {
        var userDomainMap = mutable.HashMap[String, String]()

        override def flatMap1(value: (Long, String, Long), out: Collector[(Long, String, Long, String)]) = {
          val domain = value._2
          val userId = userDomainMap.getOrElse(domain, "")
          out.collect((value._1, value._2, value._3, userId))
        }

        override def flatMap2(value: (String, String), out: Collector[(Long, String, Long, String)]) = {
          userDomainMap += value._1 -> value._2
        }
      })
      .setParallelism(1)
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Long, String, Long, String)] {
        var maxOutOfOrderness: Long = 10000L
        var currentMaxTimestamp: Long = _

        override def getCurrentWatermark: Watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)

        override def extractTimestamp(element: (Long, String, Long, String), previousElementTimestamp: Long): Long = {
          val timestamp = element._1
          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
          timestamp
        }
      })
      .keyBy(_._4)
      .timeWindow(Time.minutes(1))
      .apply(new WindowFunction[(Long, String, Long, String), (String, String, Long), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[(Long, String, Long, String)],
                           out: Collector[(String, String, Long)]): Unit = {
          val list = input.toList
          val sum = list.map(_._3).sum
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          out.collect((format.format(window.getStart) + "-" + format.format(window.getEnd), key, sum))
        }
      })
      .print().setParallelism(1)
    env.execute("LogAnalysisWithMySQL")
  }
}
Sink to ES
@Slf4j
public class LogAnalysisWithMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        String topic = "pktest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "extranet ip:9092");
        properties.setProperty("group.id", "test");

        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("public network ip", 9200, "http"));
        ElasticsearchSink.Builder<Tuple3<String, String, Long>> builder = new ElasticsearchSink.Builder<>(httpHosts,
            new ElasticsearchSinkFunction<Tuple3<String, String, Long>>() {
                @Override
                public void process(Tuple3<String, String, Long> value, RuntimeContext runtimeContext, RequestIndexer indexer) {
                    Map<String, Object> json = new HashMap<>();
                    json.put("time", value.getField(0));
                    json.put("userId", value.getField(1));
                    json.put("traffic", value.getField(2));
                    String id = value.getField(0) + "-" + value.getField(1);
                    indexer.add(Requests.indexRequest()
                            .index("user")
                            .type("traffic")
                            .id(id)
                            .source(json));
                }
            });
        // set the buffer size of bulk write data
        builder.setBulkFlushMaxActions(1);

        DataStreamSource<String> data = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        SingleOutputStreamOperator<Tuple3<Long, String, Long>> logData = data.map(new MapFunction<String, Tuple4<String, Long, String, String>>() {
                @Override
                public Tuple4<String, Long, String, String> map(String value) throws Exception {
                    String[] splits = value.split("\t");
                    String level = splits[2];
                    String timeStr = splits[3];
                    Long time = 0L;
                    try {
                        time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime();
                    } catch (ParseException e) {
                        log.error("time conversion error: " + timeStr + "," + e.getMessage());
                    }
                    String domain = splits[5];
                    String traffic = splits[6];
                    return new Tuple4<>(level, time, domain, traffic);
                }
            })
            .filter(x -> (Long) x.getField(1) != 0)
            // here we only need data whose level is E
            .filter(x -> x.getField(0).equals("E"))
            // discard level
            .map(new MapFunction<Tuple4<String, Long, String, String>, Tuple3<Long, String, Long>>() {
                @Override
                public Tuple3<Long, String, Long> map(Tuple4<String, Long, String, String> value) throws Exception {
                    return new Tuple3<>(value.getField(1), value.getField(2), Long.parseLong(value.getField(3)));
                }
            });

        DataStreamSource<Tuple2<String, String>> mysqlData = env.addSource(new MySQLSource());
        // dual-stream join: connect the log stream with the MySQL configuration stream
        logData.connect(mysqlData)
            .flatMap(new CoFlatMapFunction<Tuple3<Long, String, Long>, Tuple2<String, String>, Tuple4<Long, String, Long, String>>() {
                private Map<String, String> userDomainMap = new HashMap<>();

                @Override
                public void flatMap1(Tuple3<Long, String, Long> value, Collector<Tuple4<Long, String, Long, String>> out) throws Exception {
                    String domain = value.getField(1);
                    String userId = userDomainMap.getOrDefault(domain, "");
                    out.collect(new Tuple4<>(value.getField(0), value.getField(1), value.getField(2), userId));
                }

                @Override
                public void flatMap2(Tuple2<String, String> value, Collector<Tuple4<Long, String, Long, String>> out) throws Exception {
                    userDomainMap.put(value.getField(0), value.getField(1));
                }
            })
            .setParallelism(1)
            .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<Long, String, Long, String>>() {
                private Long maxOutOfOrderness = 10000L;
                private Long currentMaxTimestamp = 0L;

                @Nullable
                @Override
                public Watermark getCurrentWatermark() {
                    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                }

                @Override
                public long extractTimestamp(Tuple4<Long, String, Long, String> element, long previousElementTimestamp) {
                    Long timestamp = element.getField(0);
                    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                    return timestamp;
                }
            })
            .keyBy(x -> (String) x.getField(3))
            .timeWindow(Time.minutes(1))
            // output format: one-minute window, user, total traffic of that user within the minute
            .apply(new WindowFunction<Tuple4<Long, String, Long, String>, Tuple3<String, String, Long>, String, TimeWindow>() {
                @Override
                public void apply(String s, TimeWindow window, Iterable<Tuple4<Long, String, Long, String>> input,
                                  Collector<Tuple3<String, String, Long>> out) throws Exception {
                    List<Tuple4<Long, String, Long, String>> list = (List<Tuple4<Long, String, Long, String>>) input;
                    Long sum = list.stream().map(x -> (Long) x.getField(2)).reduce((x, y) -> x + y).get();
                    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    out.collect(new Tuple3<>(format.format(window.getStart()) + "-" + format.format(window.getEnd()), s, sum));
                }
            })
            .addSink(builder.build());
        env.execute("LogAnalysisWithMySQL");
    }
}
Running result
Access http://public network ip:9200/user/traffic/_search
Scala code
import java.text.SimpleDateFormat
import java.util
import java.util.Properties

import com.guanjian.flink.scala.until.MySQLSource
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
import org.slf4j.LoggerFactory

import scala.collection.mutable

object LogAnalysisWithMySQL {
  val log = LoggerFactory.getLogger(LogAnalysisWithMySQL.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val topic = "pktest"
    val properties = new Properties
    properties.setProperty("bootstrap.servers", "public network ip:9092")
    properties.setProperty("group.id", "test")

    val httpHosts = new util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("public network ip", 9200, "http"))
    val builder = new ElasticsearchSink.Builder[(String, String, Long)](httpHosts,
      new ElasticsearchSinkFunction[(String, String, Long)] {
        override def process(t: (String, String, Long), runtimeContext: RuntimeContext, indexer: RequestIndexer): Unit = {
          val json = new util.HashMap[String, Any]
          json.put("time", t._1)
          json.put("userId", t._2)
          json.put("traffic", t._3)
          val id = t._1 + "-" + t._2
          indexer.add(Requests.indexRequest()
            .index("user")
            .`type`("traffic")
            .id(id)
            .source(json))
        }
      })
    builder.setBulkFlushMaxActions(1)

    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, properties))
    val logData = data.map(x => {
        val splits = x.split("\t")
        val level = splits(2)
        val timeStr = splits(3)
        var time: Long = 0L
        try {
          time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(timeStr).getTime
        } catch {
          case e: Exception => log.error(s"time conversion error: $timeStr", e.getMessage)
        }
        val domain = splits(5)
        val traffic = splits(6)
        (level, time, domain, traffic)
      })
      .filter(_._2 != 0)
      .filter(_._1 == "E")
      .map(x => (x._2, x._3, x._4.toLong))

    val mysqlData = env.addSource(new MySQLSource)
    logData.connect(mysqlData)
      .flatMap(new CoFlatMapFunction[(Long, String, Long), (String, String), (Long, String, Long, String)] {
        var userDomainMap = mutable.HashMap[String, String]()

        override def flatMap1(value: (Long, String, Long), out: Collector[(Long, String, Long, String)]) = {
          val domain = value._2
          val userId = userDomainMap.getOrElse(domain, "")
          out.collect((value._1, value._2, value._3, userId))
        }

        override def flatMap2(value: (String, String), out: Collector[(Long, String, Long, String)]) = {
          userDomainMap += value._1 -> value._2
        }
      })
      .setParallelism(1)
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Long, String, Long, String)] {
        var maxOutOfOrderness: Long = 10000L
        var currentMaxTimestamp: Long = _

        override def getCurrentWatermark: Watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)

        override def extractTimestamp(element: (Long, String, Long, String), previousElementTimestamp: Long): Long = {
          val timestamp = element._1
          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
          timestamp
        }
      })
      .keyBy(_._4)
      .timeWindow(Time.minutes(1))
      .apply(new WindowFunction[(Long, String, Long, String), (String, String, Long), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[(Long, String, Long, String)],
                           out: Collector[(String, String, Long)]): Unit = {
          val list = input.toList
          val sum = list.map(_._3).sum
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          out.collect((format.format(window.getStart) + "-" + format.format(window.getEnd), key, sum))
        }
      })
      .addSink(builder.build)
    env.execute("LogAnalysisWithMySQL")
  }
}
Kibana chart display
Here, let's draw a ring diagram.
At this point, the study of "what is the overall process of a simple Flink project" is over. I hope it has resolved your doubts. Combining theory with practice is the best way to learn, so go and try it out! If you want to keep learning more related knowledge, please continue to follow the website; the editor will keep working hard to bring you more practical articles!