In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-02 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
The code is as follows:
Package com.dt.spark.streamingimport org.apache.spark.sql.SQLContextimport org.apache.spark. {SparkContext, SparkConf} import org.apache.spark.streaming. {StreamingContext, Duration} / * use SparkStreaming and SparkSQL to analyze the log. * assume that the click log format of the e-commerce website (simplified) is as follows: * userid,itemId,clickTime * requirements: the number of item clicks within 10 minutes of processing is sorted by Top10, and the product name is displayed. The corresponding relationship between commodity itemId and trade name is stored in MySQL database * Created by dinglq on 2016-5-4. * / object LogAnalyzerStreamingSQL {val WINDOW_LENGTH = new Duration (1000) val SLIDE_INTERVAL = new Duration (10 * 1000) def main (args: Array [String]) {val sparkConf = new SparkConf (). SetAppName ("LogAnalyzerStreamingSQL"). SetMaster ("local [4]") val sc = new SparkContext (sparkConf) val sqlContext = new SQLContext (sc) import sqlContext.implicits._ / / load the itemInfo table val itemInfoDF = sqlContext.read.format ("jdbc") .options (Map ("url"-> "jdbc:mysql://spark-master:3306/spark") from the database "driver"-> "com.mysql.jdbc.Driver", "dbtable"-> "iteminfo", "user"-> "root", "password"-> "vincent"). Load () itemInfoDF.registerTempTable ("itemInfo") val streamingContext = new StreamingContext (sc SLIDE_INTERVAL) val logLinesDStream = streamingContext.textFileStream ("D:/logs_incoming") val accessLogsDStream = logLinesDStream.map (AccessLog.parseLogLine). Cache () val windowDStream = accessLogsDStream.window (WINDOW_LENGTH, SLIDE_INTERVAL) windowDStream.foreachRDD (accessLogs = > {if (accessLogs.isEmpty ()) {println ("No logs received in this time interval")} else {accessLogs.toDF (). RegisterTempTable ("accessLogs") val sqlStr = "SELECT a.itemid" A.itemname.cnt FROM itemInfo a JOIN "+" (SELECT itemId,COUNT (*) cnt FROM accessLogs GROUP BY itemId) b "+" ON (a.itemid=b.itemId) ORDER BY cnt DESC LIMIT 10 "val topTenClickItemLast10Minus = sqlContext.sql (sqlStr) / / Persist top ten table for this window to HDFS as parquet file topTenClickItemLast10Minus.show ()}) streamingContext.start () streamingContext.awaitTermination ()}} case class AccessLog (userId: String ItemId: String, clickTime: String) {} object AccessLog {def parseLogLine (log: String): AccessLog = {val logInfo = log.split (",") if (logInfo.length = = 3) {AccessLog (logInfo (0), logInfo (1), logInfo (2))} else {AccessLog ("0", "0")}
The contents of the table in MySQL are as follows:
Mysql > select * from spark.iteminfo;+-+-+ | itemid | itemname | +-+-+ | 001 | phone | | 002 | computer | | 003 | TV | +-+-+ 3 rows in set (0.00 sec)
Create a directory logs_incoming in D
Run the Spark Streaming program.
Create a new file with the following contents:
0001memo 2016-05-04 22 purl 10purl 200002 beacon 001meme2016-05-04 22purl 10purl 210003LER 001pr 2016-05-04 22purl 10maze 230005 Mel 001MAH 2016-05-04 22Rd 10Rd 240006MAL 001Eng 2016-05-04 2222 purl 10Rd 250007ler 002 2016-05-04 22Rover 10Rod 260008ler 2016-05-04 22purl 10 2700090000314-2016 05-04 22purr 10here 270009003May-2016 05-04 22thought 10rel 280010thought 32016-04-22purl10blub 290011thought 12016-05-2204 Pluto 10thought 300012003no-2016 05-2204 10Rod 10Rod 130052016-2204-220410Rd 13052204
Save the file to the directory logs_incoming and observe the output of the Spark program:
+-- + | itemid | itemname | cnt | +-- + | 001 | phone | 6 | | 003 | TV | 4 | | 002 | computer | 3 | +-- +
Note:
1. DT big data DreamWorks Wechat official account DT_Spark
2. IMF 8: 00 p.m. Big data actual combat YY live broadcast channel number: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.