Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

An example of converting spark rdd to dataframe and writing to mysql

2025-02-27 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Database >

Share

Shulou(Shulou.com)06/01 Report--

Dataframe is a new api introduced in spark1.3.0, which gives spark the ability to handle large-scale structured data, and is said to be twice as fast as the original RDD conversion method. Spark can convert rdd to dataframe in offline batch processing or real-time computing and manipulate data through simple sql commands. For those who are familiar with sql, it is very convenient in the conversion and filtering process, and there can even be higher-level applications. For example, in real-time, the topic name and sql statement of kafka are passed in. The background reads the configured content field and reflects it into a class and uses the incoming and outgoing sql to calculate the real-time data. In this case, people who do not know how to spark streaming can easily enjoy the benefits of real-time computing.

The following example is the process of reading a local file into rdd and implicitly converting it to dataframe to query the data, and finally writing to the mysql table as an append. the scala code example is as follows

Import java.sql.Timestampimport org.apache.spark.sql. {SaveMode, SQLContext} import org.apache.spark. {SparkContext, SparkConf} object DataFrameSql {case class memberbase (data_date:Long,memberid:String,createtime:Timestamp,sp:Int) extends Serializable {override def toString: String= "% d\ t% s\ t% s\ t% d" .format (data_date,memberid,createtime) Sp)} def main (args: array [string]): Unit = {val conf = new SparkConf () conf.setMaster ("local [2]") / /-- / / Parameter spark.sql.autoBroadcastJoinThreshold sets whether a table should be broadcast. Default 10m, set to-1 indicates whether to disable / / spark.sql.codegen whether to precompile sql into java bytecode, long-term or frequent sql has optimization effect / / number of row processed by spark.sql.inMemoryColumnarStorage.batchSize at a time Be careful if oom / / spark.sql.inMemoryColumnarStorage.compressed sets whether column storage in memory needs to be compressed / /-- conf.set ("spark.sql.shuffle.partitions", "20") / / default partition is 200x200 conf.setAppName ("dataframe test") val sc = new SparkContext (conf) val sqc = new SQLContext (sc) val ac = sc.accumulator (0 "fail nums") val file = sc.textFile ("src\\ main\\ resources\\ 000000room0") val log = file.map (lines = > lines.split (")) .filter (line = > if (line.length! = 4) {/ / do a simple filter ac.add (1) false} else true) .map (line = > memberbase (line (0). ToLong, line (1), Timestamp.valueOf (line (2) Line (3) .toInt) / / method 1. Using implicit conversion import sqc.implicits._ val dftemp = log.toDF () / / conversion / * method 2. Using createDataFrame method, internal reflection is used to obtain fields and their types val dftemp = sqc.createDataFrame (log) * / val df = dftemp.registerTempTable ("memberbaseinfo") / * val sqlcommand = "select date_format (createtime,'yyyy-MM') as mm Count (1) as nums "+" from memberbaseinfo group by date_format (createtime,'yyyy-MM') "+" order by nums desc,mm asc "* / val sqlcommand=" select * from memberbaseinfo "val sel = sqc.sql (sqlcommand) val prop = new java.util.Properties prop.setProperty (" user "," etl ") prop.setProperty (" password " "xxx") / / call DataFrameWriter to write data to mysql val dataResult = sqc.sql (sqlcommand) .write.mode (SaveMode.Append) .JDBC ("jdbc:mysql://localhost:3306/test", "t_spark_dataframe_test", prop) / / Table may not exist println (ac.name.get+ "" + ac.value) sc.stop ()}}

The sample data in the above code textFile is as follows. The data comes from hive. The field information is partition number, user id, registration time, and third party number.

20160309 45386477 2012-06-12 20:13:15 90143820160309 453909772012-06-12 22:38:06 90103620160309 45446677 2012-06-14 21:57:39 90143820160309 45464977 2012-06-15 13:42:55 90143820160309 455723772012-06-18 14:55:03 9026020160309 45620577 2012-06-20 00:21:09 90260620160309 45628377 2012-06-20 10:48:05 9011820160309 45628877 2012-06-20 1111 1014 90260620160309 45628877 2012-06-20 1111 15 90260620160309 66667777 2012-06-1821 20160309 45687077 2012-06-22 11:23:22 902607

Note the field type mapping, that is, the mapping from case class class to dataframe. The screenshot from the official website is as follows

For more details, please see the official document Spark SQL and DataFrame Guide.

The above spark rdd to dataframe into mysql example explanation is the editor to share with you all the content, I hope to give you a reference, but also hope that you support more.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Database

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report