How to Analyze HTTP Logs with Spark RDD in Big Data Development

2025-01-19 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

This article explains in detail how to analyze HTTP logs with Spark RDDs in big data development, working through several examples. I hope you come away with a working understanding of the techniques involved.

1. Handling configuration files and configuration tables in a production environment

Configuration files, or configuration tables, usually live in an online database such as MySQL, or a backend developer simply hands you a file. Either way, the data volume is tiny compared with the large tables in an offline data warehouse, so the usual practice is to broadcast the small table or small file. The following example uses a broadcast table to solve an IP address mapping problem.

2. Log analysis case 1

2.1. Data description

http.log:

The log generated when users visit the site. Each line contains: timestamp, IP address, accessed URL, access data, browser information, and so on. Examples are as follows:

ip.dat:

IP range data that records the location corresponding to each IP range. The file holds about 110,000 records, which is very small. Examples are as follows:

File location: data/http.log, data/ip.dat

Link: https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ extraction code: hell

Requirement: convert each IP in http.log to an address (for example, 122.228.96.111 maps to Wenzhou) and count the total number of visits for each city.

2.2. Implementation idea and code

There are three key points. First, the only field of http.log that matters here is the IP address, so, following the principle of reading as little data as possible, only the IP is extracted. Second, the IP mapping file is sorted by range, so each IP address is converted to a Long to make the ranges directly comparable. Third, with sorted numeric ranges, binary search finds the matching range efficiently, and the IP is then mapped to that range's address.
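The conversion and lookup described above can be sketched in plain Scala as follows. This is a minimal sketch rather than the article's own implementation: the object name `IpLookup`, the sample ranges, and the city labels are made up for illustration, and it assumes the ranges are sorted by start address and do not overlap.

```scala
object IpLookup {
  // Convert a dotted IPv4 string to a Long: each octet shifts the
  // accumulated value left by 8 bits and is OR-ed into the low byte.
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, part) => (acc << 8) | part.toLong)

  // Binary search over (start, end, address) ranges sorted by start.
  // Returns the index of the range containing ip, or -1 if none does.
  def binarySearch(ranges: Array[(Long, Long, String)], ip: Long): Int = {
    @annotation.tailrec
    def loop(low: Int, high: Int): Int =
      if (low > high) -1
      else {
        val mid = low + (high - low) / 2
        val (start, end, _) = ranges(mid)
        if (ip < start) loop(low, mid - 1)
        else if (ip > end) loop(mid + 1, high)
        else mid
      }
    loop(0, ranges.length - 1)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical ranges, for illustration only.
    val table = Array(
      (ip2Long("10.0.0.0"), ip2Long("10.0.0.255"), "CityA"),
      (ip2Long("10.0.1.0"), ip2Long("10.0.1.255"), "CityB"),
      (ip2Long("192.168.0.0"), ip2Long("192.168.255.255"), "CityC")
    )
    val idx = binarySearch(table, ip2Long("10.0.1.42"))
    println(if (idx != -1) table(idx)._3 else "NULL")
  }
}
```

Each lookup costs O(log n) comparisons, so with about 110,000 ranges a single IP needs roughly 17 steps, which is why sorting plus binary search beats scanning the table for every log line.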

package com.hoult.work

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * Data sources: 1. the access log containing IP addresses  2. the IP address mapping table
 * The mapping table is broadcast; addresses are converted to Long for comparison.
 */
object FindIp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._

    // Keep only the IP field (index 1) of each log line.
    // The delimiter is assumed to be '|', as in ip.dat; the argument is missing in the source.
    val ipLogsRDD = sc.textFile("data/http.log")
      .map(_.split("\\|")(1))

    val ipInfoRDD = sc.textFile("data/ip.dat").map {
      case line: String => {
        val strSplit: Array[String] = line.split("\\|")
        Ip(strSplit(0), strSplit(1), strSplit(7))
      }
    }

    // Broadcast the small mapping table as (startIp, endIp, address) triples
    val brIPInfo = sc.broadcast(
      ipInfoRDD.map(x => (ip2Long(x.startIp), ip2Long(x.endIp), x.address)).collect())

    // Map every log IP to its address, then count per address
    ipLogsRDD
      .map(x => {
        val index = binarySearch(brIPInfo.value, ip2Long(x))
        if (index != -1) brIPInfo.value(index)._3
        else "NULL"
      })
      .map(x => (x, 1))
      .reduceByKey(_ + _)
      .map(x => s"City: ${x._1}, Number of visits: ${x._2}")
      .saveAsTextFile("data/work/output_ips")
  }

  // Convert an IP string to a Long
  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i ...

...

    def isMatch(pattern: Regex, str: String) = {
      str match {
        case pattern(_*) => true
        case _ => false
      }
    }

    def getTimeAndSize(line: String) = {
      var res = ("", 0L)
      try {
        val httpSizePattern(code, size) = line
        val timePattern(year, hour) = line
        res = (hour, size.toLong)
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
      res
    }

    val IPPattern = "((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))".r
    val videoPattern = "([0-9]+).mp4".r

    // For each video, collect the distinct IPs that requested it, most-visited first
    val res = cdnRDD
      .filter(x => x.matches(".*([0-9]+)\\.mp4.*"))
      .map(x => (videoPattern.findFirstIn(x).toString, IPPattern.findFirstIn(x).toString))
      .aggregateByKey(List[String]())(
        (lst, str) => lst :+ str,
        (lst1, lst2) => lst1 ++ lst2)
      .mapValues(_.distinct)
      .sortBy(_._2.size, false)

    res.saveAsTextFile("data/cdn/videoIPs")
  }

  /**
   * Traffic per hour of the day
   */
  def hourPoor(cdnRDD: RDD[String]) = {
    val httpSizePattern = ".*\\s([0-9]+)\\s([0-9]+)\\s.*".r
    val timePattern = ".*(2017):([0-9]{2}):[0-9]{2}:[0-9]{2}.*".r
    import scala.util.matching.Regex

    def isMatch(pattern: Regex, str: String) = {
      str match {
        case pattern(_*) => true
        case _ => false
      }
    }

    // Extract (hour, response size in bytes) from one log line
    def getTimeAndSize(line: String) = {
      var res = ("", 0L)
      try {
        val httpSizePattern(code, size) = line
        val timePattern(year, hour) = line
        res = (hour, size.toLong)
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
      res
    }

    cdnRDD
      .filter(x => isMatch(httpSizePattern, x))
      .filter(x => isMatch(timePattern, x))
      .map(x => getTimeAndSize(x))
      .groupByKey()
      .map(x => (x._1, x._2.sum))
      .sortByKey()
      .map(x => x._1 + " CDN traffic = " + x._2 / (1024 * 1024 * 1024) + "G")
      .saveAsTextFile("data/cdn/hourPoor")
  }
}

Screenshot of the running result:

4. Advertising exposure analysis

Suppose there is a click log (click.log) and an exposure log (imp.log), and each line has the following format:

// Click log
INFO 2019-09-01 00:29:53 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:30:31 requestURI:/click?app=2&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:31:03 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=32
INFO 2019-09-01 00:31:51 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=33

// Exposure log
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=34

Use Spark Core to count the number of exposures and clicks for each adid. The idea is simple, so we go straight to the code.

Code:

package com.hoult.work

import org.apache.spark.sql.SparkSession

object AddLog {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    val sc = spark.sparkContext

    val clickRDD = sc.textFile("data/click.log")
    val impRDD = sc.textFile("data/imp.log")

    // (adid, number of clicks); the adid is the value after the last '=' in the request URI
    val clickRes = clickRDD.map { line =>
      val arr = line.split("\\s+")
      val adid = arr(3).substring(arr(3).lastIndexOf("=") + 1)
      (adid, 1)
    }.reduceByKey(_ + _)

    // (adid, number of exposures)
    val impRes = impRDD.map { line =>
      val arr = line.split("\\s+")
      val adid = arr(3).substring(arr(3).lastIndexOf("=") + 1)
      (adid, 1)
    }.reduceByKey(_ + _)

    // Join both counts and save as "adid,clicks,exposures"
    clickRes.fullOuterJoin(impRes)
      .map(x => x._1 + "," + x._2._1.getOrElse(0) + "," + x._2._2.getOrElse(0))
      .repartition(1)
      // .saveAsTextFile("hdfs://linux121:9000/data/")
      .saveAsTextFile("data/add_log")

    sc.stop()
  }
}

Analysis: this version shuffles twice. Replacing fullOuterJoin with union + reduceByKey reduces this to a single shuffle.
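One way the union + reduceByKey variant might look is sketched below. This is an illustration under assumptions, not the author's code: the names `AdLogUnion`, `extractAdid`, and `countClicksAndImps`, and the output path `data/add_log_union`, are hypothetical. Each click is tagged as `(1, 0)` and each exposure as `(0, 1)`, so one `reduceByKey` sums both counters at once.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object AdLogUnion {
  // The request URI is the 4th whitespace-separated field;
  // the adid is whatever follows the last '=' in it.
  def extractAdid(line: String): String = {
    val uri = line.split("\\s+")(3)
    uri.substring(uri.lastIndexOf("=") + 1)
  }

  // Returns (adid, (clicks, exposures)); reduceByKey is the only wide operation here.
  def countClicksAndImps(clicks: RDD[String], imps: RDD[String]): RDD[(String, (Int, Int))] = {
    val tagged = clicks.map(l => (extractAdid(l), (1, 0)))
      .union(imps.map(l => (extractAdid(l), (0, 1))))
    tagged.reduceByKey { case ((c1, i1), (c2, i2)) => (c1 + c2, i1 + i2) }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("AdLogUnion")
      .getOrCreate()
    val sc = spark.sparkContext

    countClicksAndImps(sc.textFile("data/click.log"), sc.textFile("data/imp.log"))
      .map { case (adid, (clicks, imps)) => s"$adid,$clicks,$imps" }
      .saveAsTextFile("data/add_log_union") // hypothetical output path

    sc.stop()
  }
}
```

An adid that appears in only one of the two logs simply keeps a zero in the other counter, which matches what `getOrElse(0)` does after the full outer join. The sketch leaves out `repartition(1)`, since that would add another shuffle.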

5. Use Spark SQL to complete the following conversion

Table A has three fields: ID, startdate, enddate, and three pieces of data:

1 2019-03-04 2020-02-03

2 2020-04-05 2020-08-04

3 2019-10-09 2020-06-11

Write a query (both a SQL version and a DSL version are required) that changes the data above to:

2019-03-04 2019-10-09

2019-10-09 2020-02-03

2020-02-03 2020-04-05

2020-04-05 2020-06-11

2020-06-11 2020-08-04

2020-08-04 2020-08-04

Analysis: the first column is the sorted union of all startdate and enddate values, and the second column is the next value in that order, which the lead window function provides. The last row has no next value, so it falls back to its own date.

The code is as follows:

package com.hoult.work

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataExchange {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("DateSort")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("warn")

    // Original data
    val tab = List(
      (1, "2019-03-04", "2020-02-03"),
      (2, "2020-04-05", "2020-08-04"),
      (3, "2019-10-09", "2020-06-11"))
    val df: DataFrame = spark.createDataFrame(tab).toDF("ID", "startdate", "enddate")

    // Stack startdate and enddate into a single column
    val dateset: DataFrame = df.select("startdate").union(df.select("enddate"))
    dateset.createOrReplaceTempView("t")

    // lead() returns the next date in order; nvl() falls back to the row's own date on the last row
    val result: DataFrame = spark.sql(
      """
        |select tmp.startdate, nvl(lead(tmp.startdate) over(partition by col order by tmp.startdate), startdate) enddate from
        |(select "1" col, startdate from t) tmp
        |""".stripMargin)

    result.show()
  }
}
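The exercise also asks for a DSL version, which the listing does not show. A possible sketch using the DataFrame API is given below. The names `DataExchangeDsl` and `transform` are hypothetical, and the window is ordered without a partition key, which is an assumption that only suits small inputs like this one because Spark then moves all rows to a single partition.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lead}

object DataExchangeDsl {
  // Stack both date columns, then pair each date with the next one in order.
  def transform(df: DataFrame): DataFrame = {
    val dates = df.select("startdate").union(df.select("enddate"))
    val w = Window.orderBy(col("startdate"))
    dates.select(
      col("startdate"),
      // The last row has no successor, so fall back to its own date.
      coalesce(lead(col("startdate"), 1).over(w), col("startdate")).as("enddate")
    )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("DateSortDsl")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("warn")

    val tab = List(
      (1, "2019-03-04", "2020-02-03"),
      (2, "2020-04-05", "2020-08-04"),
      (3, "2019-10-09", "2020-06-11"))
    val df = spark.createDataFrame(tab).toDF("ID", "startdate", "enddate")

    transform(df).show()
    spark.stop()
  }
}
```

Here `coalesce` plays the role that `nvl` plays in the SQL version.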

Running result:

That concludes this walkthrough of Spark-RDD HTTP log analysis in big data development. I hope the content above is helpful. If you found the article useful, feel free to share it so that more people can see it.
