Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

RDD conversion DataSet of 11.spark sql

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

Brief introduction

   Spark SQL provides two ways to convert RDD to Dataset.

Using reflection mechanism to infer the data structure of RDD

   this approach can be used when spark applications can infer RDD data structures. This reflection-based approach can make the code more concise and efficient.

Construct a data structure through a programming interface and then map it to RDD

   this approach can be used when spark applications cannot infer RDD data structures.

Reflection mode scala// For implicit conversions from RDDs to DataFramesimport spark.implicits._// Create an RDD of Person objects from a text file, convert it to a Dataframeval peopleDF = spark.sparkContext .textFile ("examples/src/main/resources/people.txt") .map (_ .split (",") .map (attributes = > Person (attributes (0)) Attributes (1) .trim.toInt) .toDF () / / Register the DataFrame as a temporary viewpeopleDF.createOrReplaceTempView ("people") / / SQL statements can be run by using the sql methods provided by Sparkval teenagersDF = spark.sql ("SELECT name") Age FROM people WHERE age BETWEEN 13 AND 19 ") / / The columns of a row in the result can be accessed by field indexteenagersDF.map (teenager = >" Name: "+ teenager (0)). Show () / / +-+ / / | value | / / +-+ / / | Name: Justin | / / +-+ / / or by field nameteenagersDF.map (teenager = >" Name : "+ teenager.getAs [String] (" name "). Show () / / +-+ / / | value | / / +-+ / / | Name: Justin | / / +-+ / / No pre-defined encoders for data [Map [K] V]], define explicitlyimplicit val mapEncoder = org.apache.spark.sql.Encoders.kryo [Map [String, Any]] / / Primitive types and case classes can be also defined as// implicit val stringIntMapEncoder: Encoder [Map [String, Any]] = ExpressionEncoder () / / row.getValuesMap [T] retrieves multiple columns at once into a Map [String, T] teenagersDF.map (teenager = > teenager.getValuesMap [Any] (List ("name", "age")). Collect () / / Array (Map ("name"-> "Justin") "age"-> 19)) javaimport org.apache.spark.api.java.JavaRDD Import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.MapFunction;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.Encoder;import org.apache.spark.sql.Encoders / / Create an RDD of Person objects from a text fileJavaRDD peopleRDD = spark.read () .textFile ("examples/src/main/resources/people.txt") .javaRDD () .map (line-> {String [] parts = line.split (","); Person person = new Person (); person.setName (parts [0]); person.setAge (Integer.parseInt (parts [1]. Trim ()); return person;})) / / Apply a schema to an RDD of JavaBeans to get a DataFrameDataset peopleDF = spark.createDataFrame (peopleRDD, Person.class); / / Register the DataFrame as a temporary viewpeopleDF.createOrReplaceTempView ("people"); / / SQL statements can be run by using the sql methods provided by sparkDataset teenagersDF = spark.sql ("SELECT name FROM people WHERE age BETWEEN 13 AND 19"); / / The columns of a row in the result can be accessed by field indexEncoder stringEncoder = Encoders.STRING () Dataset teenagerNamesByIndexDF = teenagersDF.map ((MapFunction) row-> "Name:" + row.getString (0), stringEncoder); teenagerNamesByIndexDF.show () / / +-+ / / | value | / / +-+ / / | Name: Justin | / / +-+ / / or by field nameDataset teenagerNamesByFieldDF = teenagersDF.map ((MapFunction) row-> "Name:" + row.getAs ("name"), stringEncoder); teenagerNamesByFieldDF.show () / / +-+ / / | value | / / +-+ / / | Name: Justin | / / +-+ pythonfrom pyspark.sql import Rowsc = spark.sparkContext# Load a text file and convert each line to a Row.lines = sc.textFile ("examples/src/main/resources/people.txt") parts = lines.map (lambda l: l.split (" People = parts.map (lambda p: Row (name=p [0], age=int (p [1])) # Infer the schema, and register the DataFrame as a table.schemaPeople = spark.createDataFrame (people) schemaPeople.createOrReplaceTempView ("people") # SQL can be run over DataFrames that have been registered as a table.teenagers = spark.sql ("SELECT name FROM people WHERE age > = 13 AND age")

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report