This article shares what Spark SQL has to offer. The editor finds it quite practical and shares it here as a reference; follow along and take a look.
1. Read files in JSON format and create a DataFrame
Java (spark 1.6)
public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setMaster("local").setAppName("javaSpark01");
    SparkContext sc = new SparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    // DataFrame df = sqlContext.read().format("json").load("G:/idea/scala/spark02/json");
    DataFrame df2 = sqlContext.read().json("G:/idea/scala/spark02/json");
    df2.show();
    // display schema information in tree form
    df2.printSchema();
    // register the DataFrame as a temporary table; it is registered in memory as a logical table and is not materialized to disk
    df2.registerTempTable("baidukt_table");
    DataFrame sql = sqlContext.sql("select * from baidukt_table");
    sql.show();
    DataFrame sql1 = sqlContext.sql("select age, count(1) from baidukt_table group by age");
    sql1.show();
}
Scala (spark 1.6)
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.setMaster("local").setAppName("Spark08 1.6")
  val sc = new SparkContext(conf)
  val sqlContext: SQLContext = new SQLContext(sc)
  val df = sqlContext.read.format("json").load("G:/idea/scala/spark02/json")
  // val df1 = sqlContext.read.json("G:/idea/scala/spark02/json")
  // display the first 50 rows of data
  df.show(50)
  // display schema information
  df.printSchema()
  // register a temporary table
  df.registerTempTable("baidukt_com_table")
  val result = sqlContext.sql("select age, count(1) from baidukt_com_table group by age")
  result.show()
  val result1 = sqlContext.sql("select * from baidukt_com_table")
  result1.show()
  sc.stop()
}
Java (spark 2.0 +)
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("Spark 2.0+");
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
    Dataset<Row> df = spark.read().json("G:/idea/scala/spark02/json");
    // Dataset<Row> df1 = spark.read().format("json").load("G:/idea/scala/spark02/json");
    df.show();
    df.printSchema();
    // a global temporary view lives in the global_temp database
    df.createOrReplaceGlobalTempView("baidu_com_spark2");
    Dataset<Row> result = spark.sql("select * from global_temp.baidu_com_spark2");
    result.show();
    spark.stop();
}
Scala (spark 2.0 +)
def main(args: Array[String]): Unit = {
  // the user's current working directory
  // val location = System.setProperty("user.dir", "spark_2.0")
  val conf = new SparkConf().setAppName("Spark08 2.0+").setMaster("local[3]")
  val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  // load the data
  val df: DataFrame = spark.read.json("G:/idea/scala/spark02/json")
  // val df1: DataFrame = spark.read.format("json").load("G:/idea/scala/spark02/json")
  // view the data
  df.show()
  // view the schema
  df.printSchema()
  // to query with Spark SQL, first register the DataFrame as a temporary view
  // createOrReplaceTempView: creates a temporary view whose lifecycle is tied to the SparkSession that created this Dataset
  // createGlobalTempView: creates a global temporary view whose lifecycle is tied to the Spark application
  df.createOrReplaceTempView("people")
  val result: DataFrame = spark.sql("select * from people")
  result.show()
  spark.stop()
}
2. Create a DataFrame from an RDD in JSON format
Java (spark 1.6)
public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setMaster("local").setAppName("jsonRDD");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    JavaRDD<String> data = sc.parallelize(Arrays.asList(
        "{\"name\":\"zhangsan\",\"score\":\"100\"}",
        "{\"name\":\"lisi\",\"score\":\"200\"}",
        "{\"name\":\"wangwu\",\"score\":\"300\"}"));
    DataFrame df = sqlContext.read().json(data);
    df.show();
    df.printSchema();
    df.registerTempTable("baidu_com_spark2");
    DataFrame result = sqlContext.sql("select * from baidu_com_spark2");
    result.show();
    sc.stop();
}
Scala (spark 1.6)
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("spark10")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  val data: RDD[String] = sc.parallelize(Array(
    "{\"name\":\"zhangsan\",\"age\":18}",
    "{\"name\":\"lisi\",\"age\":19}",
    "{\"name\":\"wangwu\",\"age\":20}"))
  val df = sqlContext.read.json(data)
  df.show()
  df.printSchema()
  df.registerTempTable("baidukt_com_spark")
  val result = sqlContext.sql("select * from baidukt_com_spark")
  result.show()
  result.printSchema()
  sc.stop()
}
3. Create a DataFrame from an RDD in non-JSON format
3.1 Convert a non-JSON RDD to a DataFrame by reflection (not recommended, so the original code is not included; a sketch is shown below)
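Since the original omits the code, here is a minimal, self-contained Scala sketch of the reflection approach (Spark 1.6 style). The person.txt path, the "name,age" line format, and the Person case class are hypothetical and only for illustration; the case class field names become the DataFrame columns.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// hypothetical record type; its fields become the DataFrame columns via reflection
case class Person(name: String, age: Int)

object ReflectionDF {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("reflectionDF"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    // parse each "name,age" line into a Person and let reflection infer the schema
    val personDF = sc.textFile("G:/idea/scala/spark02/person.txt")
      .map(_.split(","))
      .map(p => Person(p(0), p(1).trim.toInt))
      .toDF()
    personDF.registerTempTable("person")
    sqlContext.sql("select name, age from person where age >= 18").show()
    sc.stop()
  }
}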
3.2 Dynamically create a Schema to convert a non-JSON RDD to a DataFrame (see the sketch below)
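The original does not include code for this item either; the sketch below shows the dynamic-schema idea under the same hypothetical assumptions (person.txt path and column names). An RDD[Row] is paired with a StructType built at runtime, which is useful when the columns are not known at compile time.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object DynamicSchemaDF {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("dynamicSchemaDF"))
    val sqlContext = new SQLContext(sc)
    // build an RDD of generic Rows from the raw text
    val rowRDD: RDD[Row] = sc.textFile("G:/idea/scala/spark02/person.txt")
      .map(_.split(","))
      .map(p => Row(p(0), p(1).trim.toInt))
    // describe the columns at runtime instead of inferring them from a case class
    val schema = StructType(Array(
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)))
    val personDF = sqlContext.createDataFrame(rowRDD, schema)
    personDF.registerTempTable("person")
    sqlContext.sql("select * from person").show()
    sc.stop()
  }
}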
4. Read parquet files to create a DataFrame (not always recommended, since it introduces extra I/O; see the sketch below)
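No code is given for parquet in the original, so the following is a small Scala sketch of the round trip (Spark 1.6 style); the input JSON path and the parquet output path are hypothetical. The extra write and read of the parquet files is the additional I/O mentioned above.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

object ParquetDF {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("parquetDF"))
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("G:/idea/scala/spark02/json")
    // write the DataFrame out as parquet files
    df.write.mode(SaveMode.Overwrite).parquet("G:/idea/scala/spark02/parquet")
    // read the parquet files back into a DataFrame
    val parquetDF = sqlContext.read.parquet("G:/idea/scala/spark02/parquet")
    parquetDF.show()
    parquetDF.printSchema()
    sc.stop()
  }
}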
5. Read data through JDBC and create a DataFrame (MySQL as an example)
Java (spark 1.6)
public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setMaster("local").setAppName("mysql");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    /**
     * The first way to read a MySQL table and load it as a DataFrame
     */
    Map<String, String> options = new HashMap<>();
    options.put("url", "jdbc:mysql://localhost:3306/spark");  // connection URL and database name
    options.put("driver", "com.mysql.jdbc.Driver");           // driver
    options.put("user", "root");                              // user name
    options.put("password", "admin");                         // password
    options.put("dbtable", "person");                         // table
    DataFrame person = sqlContext.read().format("jdbc").options(options).load();
    person.show();
    // register a temporary table
    person.registerTempTable("person");
    /**
     * The second way to read a MySQL table and load it as a DataFrame
     */
    DataFrameReader reader = sqlContext.read().format("jdbc");
    reader.option("url", "jdbc:mysql://localhost:3306/spark");
    reader.option("driver", "com.mysql.jdbc.Driver");
    reader.option("user", "root");
    reader.option("password", "admin");
    reader.option("dbtable", "score");
    DataFrame score = reader.load();
    score.show();
    score.registerTempTable("score");
    DataFrame result = sqlContext.sql(
        "select person.id, person.name, score.score from person, score where person.name = score.name");
    result.show();
    /**
     * Save the DataFrame result back to MySQL
     */
    Properties properties = new Properties();
    properties.setProperty("user", "root");
    properties.setProperty("password", "admin");
    result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties);
    sc.stop();
}
Scala (spark 1.6)
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.setMaster("local").setAppName("mysql")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  /**
   * The first way to read a MySQL table and create a DataFrame
   */
  val options = new mutable.HashMap[String, String]()
  options.put("url", "jdbc:mysql://localhost:3306/spark")
  options.put("driver", "com.mysql.jdbc.Driver")
  options.put("user", "root")
  options.put("password", "admin")
  options.put("dbtable", "person")
  val person = sqlContext.read.format("jdbc").options(options).load()
  person.show()
  person.registerTempTable("person")
  /**
   * The second way to read a MySQL table and create a DataFrame
   */
  val reader = sqlContext.read.format("jdbc")
  reader.option("url", "jdbc:mysql://localhost:3306/spark")
  reader.option("driver", "com.mysql.jdbc.Driver")
  reader.option("user", "root")
  reader.option("password", "admin")
  reader.option("dbtable", "score")
  val score = reader.load()
  score.show()
  score.registerTempTable("score")
  val result = sqlContext.sql(
    "select person.id, person.name, score.score from person, score where person.name = score.name")
  result.show()
  /**
   * Write the result back to a MySQL table
   */
  val properties = new Properties()
  properties.setProperty("user", "root")
  properties.setProperty("password", "admin")
  result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties)
  sc.stop()
}
6. Read data from Hive and load it into a DataFrame
HiveContext is a subclass of SQLContext, and it is recommended to use HiveContext to connect to Hive.
Java (spark 1.6)
public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setAppName("hive");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // HiveContext is a subclass of SQLContext
    HiveContext hiveContext = new HiveContext(sc);
    hiveContext.sql("USE spark");
    hiveContext.sql("DROP TABLE IF EXISTS student_infos");
    // create the student_infos table in Hive
    hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING, age INT) row format delimited fields terminated by '\t'");
    hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos");
    hiveContext.sql("DROP TABLE IF EXISTS student_scores");
    hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'");
    hiveContext.sql("LOAD DATA LOCAL INPATH '/root/test/student_scores' INTO TABLE student_scores");
    /**
     * Query the tables and generate a DataFrame
     */
    DataFrame goodStudentsDF = hiveContext.sql(
        "SELECT si.name, si.age, ss.score FROM student_infos si JOIN student_scores ss ON si.name = ss.name WHERE ss.score >= 80");
    hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
    goodStudentsDF.registerTempTable("goodstudent");
    DataFrame result = hiveContext.sql("select * from goodstudent");
    result.show();
    result.printSchema();
    /**
     * Save the result to the Hive table good_student_infos
     */
    goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
    Row[] goodStudentRows = hiveContext.table("good_student_infos").collect();
    for (Row goodStudentRow : goodStudentRows) {
        System.out.println(goodStudentRow);
    }
    sc.stop();
}
Scala (spark 1.6)
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.setAppName("HiveSource")
  val sc = new SparkContext(conf)
  /**
   * HiveContext is a subclass of SQLContext.
   */
  val hiveContext = new HiveContext(sc)
  hiveContext.sql("use spark")
  hiveContext.sql("drop table if exists student_infos")
  hiveContext.sql("create table if not exists student_infos (name string, age int) row format delimited fields terminated by '\t'")
  hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos")
  hiveContext.sql("drop table if exists student_scores")
  hiveContext.sql("create table if not exists student_scores (name string, score int) row format delimited fields terminated by '\t'")
  hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores")
  val df = hiveContext.sql(
    "select si.name, si.age, ss.score from student_infos si, student_scores ss where si.name = ss.name")
  hiveContext.sql("drop table if exists good_student_infos")
  /**
   * Write the result to a Hive table
   */
  df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
  sc.stop()
}
7. Custom UDF
Scala (spark 2.0 +)
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("spark13")
  val spark = SparkSession.builder().config(conf).getOrCreate()
  // rdd to df
  val rdd: RDD[String] = spark.sparkContext.parallelize(Array("zhangsan", "wangwu", "masi"))
  val rowRDD: RDD[Row] = rdd.map(RowFactory.create(_))
  val schema = DataTypes.createStructType(Array(StructField("name", StringType, true)))
  val df: DataFrame = spark.sqlContext.createDataFrame(rowRDD, schema)
  df.show(50)
  df.printSchema()
  df.createOrReplaceTempView("test")
  // Custom UDF named StrLen. The (String, Int) version caused problems in this environment
  // (reportedly the java.lang.String type has to be used), so it is left commented out:
  // spark.sqlContext.udf.register("StrLen", (s: String, i: Int) => s.length + i)
  // spark.sql("select name, StrLen(name, 10) as length from test").show(20)
  spark.sqlContext.udf.register("StrLen", (i: Int) => i)
  spark.sql("select name, StrLen(10) as length from test").show(20)
  spark.stop()
}
Java (spark 1.6)
public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setMaster("local");
    conf.setAppName("udf");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan", "lisi", "wangwu"));
    JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Row call(String s) throws Exception {
            return RowFactory.create(s);
        }
    });
    List<StructField> fields = new ArrayList<>();
    fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
    StructType schema = DataTypes.createStructType(fields);
    DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
    df.registerTempTable("user");
    /**
     * Choose UDF1, UDF2, ... according to the number of arguments the UDF takes.
     */
    sqlContext.udf().register("StrLen", new UDF1<String, Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Integer call(String t1) throws Exception {
            return t1.length();
        }
    }, DataTypes.IntegerType);
    sqlContext.sql("select name, StrLen(name) as length from user").show();
    sc.stop();
}
Thank you for reading! This concludes the article on what Spark SQL has to offer. Hopefully the content above is of some help; if you found it useful, feel free to share it so more people can see it.