
Lesson 96: Writing processed data to an external storage system with Spark Streaming's foreachRDD

Updated: 2025-01-19. Source: SLTechnology News&Howtos (shulou)


Contents of this lesson

Technical analysis

Hands-on implementation

Spark Streaming's DStream provides the dstream.foreachRDD method, a powerful primitive that allows data to be sent to external systems. It is important to understand how to use this primitive correctly and efficiently. Some common mistakes to avoid are described below.

Writing data to an external system usually requires creating a connection object (for example, a TCP connection to a remote server) and using it to send the data. A developer may inadvertently create the connection on the driver and then try to use it on the workers to save records, as in the following Scala code:

    dstream.foreachRDD { rdd =>
      val connection = createNewConnection()  // executed at the driver
      rdd.foreach { record =>
        connection.send(record)  // executed at the worker
      }
    }

This code is incorrect: the connection is created on the driver but used on the workers, so the connection object would have to be serialized and shipped to each worker. Connection objects are rarely serializable, and even if one could be serialized it would not be usable elsewhere, because a connection is bound to sockets and ports on the machine that opened it. In practice the job fails with a serialization error or, on the workers, with an initialization error.
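The serialization failure can be reproduced without Spark. The following is a minimal Java sketch, assuming a hypothetical `DemoConnection` class that stands in for a real connection and a hypothetical `SendTask` that stands in for the closure shipped to the workers. It uses plain Java serialization only to illustrate the underlying cause; it is not how Spark itself reports the problem.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a real connection: like a socket, it is NOT Serializable.
class DemoConnection {
    void send(String record) { /* a real connection would write to the network */ }
}

// Hypothetical stand-in for the closure that is shipped from the driver to a worker.
class SendTask implements Serializable {
    private static final long serialVersionUID = 1L;
    private final DemoConnection connection; // object captured on the driver side

    SendTask(DemoConnection connection) { this.connection = connection; }
}

class SerializationDemo {
    // Returns true if the object can be written with Java serialization.
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false; // some captured field is not serializable
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new SendTask(new DemoConnection()))); // prints false
        System.out.println(canSerialize("plain record"));                     // prints true
    }
}
```

The task itself is marked Serializable, yet it still cannot be written because the connection it captured is not. This is the same situation as a foreachRDD closure that references a driver-side connection.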

The fix is to create the connection on the worker, as follows:

    dstream.foreachRDD { rdd =>
      rdd.foreach { record =>
        val connection = createNewConnection()
        connection.send(record)
        connection.close()
      }
    }

However, this version opens and closes a connection for every record. Creating a connection costs time and resources, so doing it per record severely reduces throughput. This is still not a good implementation.

We can improve the code as follows:

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val connection = createNewConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        connection.close()
      }
    }

Now each partition connects to the external storage only once, and the connection cost is amortized over all records in the partition, which improves performance considerably. However, the connection still cannot be reused across partitions or batches. A connection pool solves this by letting partitions share connections. The code is as follows:

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      }
    }
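The difference between the three worker-side variants can be made concrete by counting how many connections each one opens. The sketch below is plain Java without Spark. `CountingConnection`, `SimplePool`, and the partition layout are hypothetical names used for illustration only, and the pool is deliberately simplistic (no size limit, no validation, and an instance rather than a static singleton so that repeated runs give the same counts).

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical connection that counts how many instances were ever opened.
class CountingConnection {
    static int opened = 0;
    CountingConnection() { opened++; }
    void send(String record) { /* a real connection would write to external storage */ }
    void close() { /* a real connection would release its socket */ }
}

// Deliberately minimal pool: hand out an idle connection if there is one, else open a new one.
class SimplePool {
    private final Deque<CountingConnection> idle = new ArrayDeque<>();

    synchronized CountingConnection getConnection() {
        CountingConnection c = idle.poll();
        return c != null ? c : new CountingConnection();
    }

    synchronized void returnConnection(CountingConnection c) { idle.push(c); }
}

class ConnectionCostDemo {
    // Variant 1: a new connection for every record.
    static int perRecord(List<List<String>> partitions) {
        CountingConnection.opened = 0;
        for (List<String> partition : partitions) {
            for (String record : partition) {
                CountingConnection c = new CountingConnection();
                c.send(record);
                c.close();
            }
        }
        return CountingConnection.opened;
    }

    // Variant 2: one connection per partition.
    static int perPartition(List<List<String>> partitions) {
        CountingConnection.opened = 0;
        for (List<String> partition : partitions) {
            CountingConnection c = new CountingConnection();
            for (String record : partition) c.send(record);
            c.close();
        }
        return CountingConnection.opened;
    }

    // Variant 3: partitions borrow from a shared pool (processed one after another here).
    static int pooled(List<List<String>> partitions) {
        CountingConnection.opened = 0;
        SimplePool pool = new SimplePool();
        for (List<String> partition : partitions) {
            CountingConnection c = pool.getConnection();
            for (String record : partition) c.send(record);
            pool.returnConnection(c);
        }
        return CountingConnection.opened;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b", "c"), Arrays.asList("d", "e"), Arrays.asList("f"));
        System.out.println(perRecord(partitions));    // prints 6
        System.out.println(perPartition(partitions)); // prints 3
        System.out.println(pooled(partitions));       // prints 1
    }
}
```

The pooled count of 1 holds only because the partitions are processed sequentially in this sketch. When several tasks run at the same time in one executor, a pool opens roughly as many connections as there are concurrent tasks and then reuses them across later partitions and batches.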

Next, we use Spark Streaming to write data to MySQL.

Add the following dependencies to pom.xml:

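    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <dependency>
        <groupId>commons-dbcp</groupId>
        <artifactId>commons-dbcp</artifactId>
        <version>1.4</version>
    </dependency>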

Create the database and table in MySQL:

    mysql> create database spark;
    Query OK, 1 row affected (0.01 sec)

    mysql> use spark;
    Database changed
    mysql> show tables;
    Empty set (0.01 sec)

    mysql> create table searchKeyWord (insert_time date, keyword varchar(30), search_count integer);
    Query OK, 0 rows affected (0.05 sec)

Write a database connection pool class in Java:

    package com.dt.spark.common;

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    import org.apache.commons.dbcp.BasicDataSource;
    import org.apache.log4j.Logger;

    /**
     * Description: database connection pool class
     * @author dinglq
     */
    public class ConnectPool {
        private static Logger log = Logger.getLogger(ConnectPool.class);
        private static BasicDataSource bs = null;

        /**
         * Create the data source on first use.
         * Synchronized so that concurrent tasks in one executor do not create several pools.
         * @return the shared data source
         */
        public static synchronized BasicDataSource getDataSource() throws Exception {
            if (bs == null) {
                BasicDataSource ds = new BasicDataSource();
                ds.setDriverClassName("com.mysql.jdbc.Driver");
                ds.setUrl("jdbc:mysql://spark-master:3306/spark");
                ds.setUsername("root");
                ds.setPassword("vincent");
                ds.setMaxActive(200);   // maximum number of active connections
                ds.setInitialSize(30);  // connections created when the pool is initialized
                ds.setMinIdle(50);      // minimum number of idle connections
                ds.setMaxIdle(200);     // maximum number of idle connections
                ds.setMaxWait(1000);    // maximum wait for a connection, in milliseconds
                ds.setMinEvictableIdleTimeMillis(60 * 1000);        // release a connection after it has been idle for 60 seconds
                ds.setTimeBetweenEvictionRunsMillis(5 * 60 * 1000); // check for idle connections every 5 minutes
                ds.setTestOnBorrow(true);
                bs = ds; // publish only after the pool is fully configured
            }
            return bs;
        }

        /**
         * Release the data source
         */
        public static synchronized void shutDownDataSource() throws Exception {
            if (bs != null) {
                bs.close();
            }
        }

        /**
         * Obtain a database connection
         * @return a pooled connection, or null if none could be obtained
         */
        public static Connection getConnection() {
            Connection con = null;
            try {
                con = getDataSource().getConnection();
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
            return con;
        }

        /**
         * Close the result set, the statement, and the connection.
         * Closing a pooled connection returns it to the pool.
         */
        public static void closeCon(ResultSet rs, PreparedStatement ps, Connection con) {
            if (rs != null) {
                try {
                    rs.close();
                } catch (Exception e) {
                    log.error("Failed to close ResultSet! " + e.getMessage(), e);
                }
            }
            if (ps != null) {
                try {
                    ps.close();
                } catch (Exception e) {
                    log.error("Failed to close PreparedStatement! " + e.getMessage(), e);
                }
            }
            if (con != null) {
                try {
                    con.close();
                } catch (Exception e) {
                    log.error("Failed to close Connection! " + e.getMessage(), e);
                }
            }
        }
    }

Write the Spark code:

    package com.dt.spark.streaming

    import com.dt.spark.common.ConnectPool
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
     * Takes the ranking of a website's hot search words as an example
     * and writes the processing result to MySQL.
     * Created by dinglq on 2016-5-3.
     */
    object WriteDataToMySQL {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("WriteDataToMySQL")
        val ssc = new StreamingContext(conf, Seconds(5))
        // Assume the socket input format is: searchKeyword,time
        val ItemsStream = ssc.socketTextStream("spark-master", 9999)
        // Map each input line to (searchKeyword, 1)
        val ItemPairs = ItemsStream.map(line => (line.split(",")(0), 1))
        // Count each keyword over a 60-second window that slides every 10 seconds
        val ItemCount = ItemPairs.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(60), Seconds(10))
        // Alternative with an inverse reduce function (requires checkpointing):
        // ssc.checkpoint("/user/checkpoints/")
        // val ItemCount = ItemPairs.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, (v1: Int, v2: Int) => v1 - v2, Seconds(60), Seconds(10))

        /**
         * Next, sort the hot words by frequency. DStream does not provide a sort method,
         * so we use transform and sort with RDD's sortByKey.
         */
        val hottestWord = ItemCount.transform(itemRDD => {
          val top3 = itemRDD.map(pair => (pair._2, pair._1))
            .sortByKey(false)
            .map(pair => (pair._2, pair._1))
            .take(3)
          ssc.sparkContext.makeRDD(top3)
        })

        hottestWord.foreachRDD(rdd => {
          rdd.foreachPartition(partitionOfRecords => {
            val conn = ConnectPool.getConnection
            conn.setAutoCommit(false) // commit manually
            // Bind parameters instead of concatenating strings, so keywords containing quotes cannot break the SQL
            val stmt = conn.prepareStatement(
              "insert into searchKeyWord (insert_time, keyword, search_count) values (now(), ?, ?)")
            try {
              partitionOfRecords.foreach(record => {
                stmt.setString(1, record._1)
                stmt.setInt(2, record._2)
                stmt.addBatch()
              })
              stmt.executeBatch()
              conn.commit() // commit the transaction
            } finally {
              ConnectPool.closeCon(null, stmt, conn) // return the connection to the pool
            }
          })
        })

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
      }
    }
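The ranking step inside transform (sort the pairs by count in descending order, then take the first three) can be tried out on a local collection. This is a minimal Java sketch that assumes hypothetical in-memory counts rather than an RDD. `TopWords` and `topN` are illustrative names, not Spark APIs.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class TopWords {
    // Returns the n entries with the highest counts, highest first.
    static List<Map.Entry<String, Integer>> topN(Map<String, Integer> counts, int n) {
        return counts.entrySet().stream()
                // Sorting by value in reverse order plays the role of
                // swapping each pair and calling sortByKey(false).
                .sorted(Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder()))
                .limit(n) // corresponds to take(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical window counts, for illustration only.
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("hadoop", 2);
        counts.put("spark", 4);
        counts.put("hive", 1);
        counts.put("kafka", 3);
        for (Map.Entry<String, Integer> e : topN(counts, 3)) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
        // prints:
        // spark 4
        // kafka 3
        // hadoop 2
    }
}
```

In the Spark version, take(3) brings the three rows back to the driver and makeRDD turns them into a small RDD again, so the database write that follows handles at most three records per batch.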

Start netcat and send data:

    root@spark-master:~# nc -lk 9999
    hadoop,1111
    spark,2222
    spark,3333
    hadoop,1111
    spark,2222
    spark,3333
    hadoop,1111
    spark,2222
    spark,3333
    hadoop,1111
    spark,2222
    spark

Run the Spark code:

    root@spark-master:~# /usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.streaming.WriteDataToMySQL --jars=mysql-connector-java-5.1.38.jar,commons-dbcp-1.4.jar ./spark.jar

View the results in the database:

    mysql> select * from searchKeyWord;
    +-------------+---------+--------------+
    | insert_time | keyword | search_count |
    +-------------+---------+--------------+
    | 2016-05-03  | spark   |            4 |
    | 2016-05-03  | hadoop  |            2 |
    | 2016-05-03  | spark   |            4 |
    | 2016-05-03  | hadoop  |            2 |
    +-------------+---------+--------------+
    4 rows in set

Notes:

1. DT Big Data Dream Factory WeChat official account: DT_Spark

2. IMF big data hands-on course, live on YY at 8:00 p.m., channel number: 68917580

3. Sina Weibo: http://www.weibo.com/ilovepains
