In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-06 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Database >
Share
Shulou(Shulou.com)06/01 Report--
Abstract
If we want to really master sparkSQL programming, we must first have an overall understanding of the overall framework of sparkSQL and what problems sparkSQL can help us solve, and then have a clear understanding of the relationship at various levels before we can really master it. The overall framework of sparkSQL has been introduced in the previous blog. If you have any questions about this area, you can see my previous blog: http://9269309.blog.51cto.com/9259309/1845525. This blog is mainly to explain and summarize the actual combat of sparkSQL, but not to explain the source code of sparkSQL. If you want to see the source code, please make a detour.
To say a little more, for beginners, I adhere to the view is not to look at the source code, this effect is not very great, but also a waste of time, do not have a general grasp of this thing, do not know what it is, look at the source code, the threshold is too high, and look at the source code for personal promotion is not very high. We do software development, our development order is also, the first is the requirements, have a detailed understanding of the requirements, what problems need to be solved, and then the design of the software, the writing of the code. Similarly, the learning framework is also, we only have the requirements of this framework, what problems it needs to solve, what it needs to do, have a very good understanding, and then look at the source code, so that the effect can be greatly improved. For reading the source code of this part, is my opinion, said right or wrong, welcome to complain.
1. SparkSQL level
When we want to use sparkSQL to solve our needs, it is actually simple and simple. We have gone through three steps: read the data-> process the data-> write the final result. Then the main classes used in these three steps are actually three: read in the data and write the final result using two classes, HiveContext and SQLContext, and use the DataFrame class to process the data, which is after you read the data into memory from the outside. The basic data structure in which the data is stored in memory will also be explained by some intermediate classes when processing the data. As shown in the following figure:
2. HiveContext and SQLContext
Put HiveContext and SQLContext together to explain because they are similar, because HiveContext inherits from SQLContext, why there are two such classes, in fact, and hive and sql have something to do with, although hive has HQL language, but it is a sql-like language, and sql language is still different, some sql syntax, HQL is not supported. So there are differences between them. Choose different classes, and the driver of the query engine executed is different. But if you don't elaborate on how the underlying layer is different, you just know that if you use different classes that read data, the bottom layer will mark it, automatically identify which class is used for data manipulation, and then use different execution plans to perform operations, which was introduced in the previous sparkSQL overall framework, but not here. When reading data from the hive library, you must use HiveContext to read the data, otherwise there will be some strange errors in the query. You can choose from both other data sources, but it's best to use SQLContext to do it. Because it supports more sql syntax. Since HiveContext inherits from SQLContext, only SQLContext is described in detail here, but the following methods can be used in HiveContext. In fact, the HiveContext class extends SQLContext's two methods that we can use (we cannot use the methods that start with protected and private when looking at the source code, this is the control logic of scala, on the contrary, the methods that are not marked with these two keywords are methods that we can use directly): analyze (tableName:String) and refreshTable (tableName:String).
Method usage analyze method we generally do not use, it is used to analyze the sql query statement we wrote, generally not used. RefreshTable method
When the storage location of a table we are dealing with in sparkSQL changes, but we cache the table in the in-memory metaData, we need to call this method to invalidate the cache and need to reload.
2.1 read data
We solve our needs, the first is to read the data, need to read the data into memory, read data SQLContext provides two methods, we provide two data tables, in order to facilitate the demonstration, I used to use JSON format for storage, written in this format, but can be saved as a .txt format file.
1. The first kind of data reading: this is the operation on the data source file.
Import org.apache.spark.sql.SQLContextval sql = new SQLContext (sc) / / declare an object of SQLContext to manipulate the data val peopleInfo = sql.read.json ("file path") / / where the result returned by peopleInfo is: org.apache.spark.sql.DataFrame = / / [age: bigint, id: bigint, name: string], so the data is read into memory
After writing what happens after writing these lines of code, sparkSQL first finds the file and parses it in the form of parsing json. At the same time, the order in which the fields of schema,scheam are formed through the key of json is not in the default order when we read in the data. As mentioned above, the order of the fields is reorganized by the order of strings. By default, integers are parsed into bigint and strings are parsed into string. When the data is read in through this method, the result is a DataFrame data type.
What is DataFrame? In fact, it is the basic and core data structure for sparkSQL to deal with big data. It is the basic data structure in which the sparkSQL reads the data into memory and the data is stored in memory. The storage it uses is in the form of tables similar to the database. Let's think about it, a data table consists of several parts: 1, data, this data is stored in a row, a record is a row, 2, the data dictionary of the data table, including the name of the table, table fields and field types and other metadata information. Then DataFrame is also stored on a row-by-row basis, and this class is Row, which stores data one by one. In general, the processing granularity is row granularity, and there is no need to operate on the intra-row data. It is also possible to operate the intra-row data alone, but you should be careful when dealing with the data in the row, because it is easy to make mistakes when dealing with the data in the row, such as selecting the wrong data, array out of bounds and so on. The form of data storage is available, and where are the fields and field types of the data table stored, that is, in schema. We can call schema to see what it stores.
The result returned by peopleInfo.schema// is: org.apache.spark.sql.types.StructType = / / StructType (StructField (age,LongType,true), StructField (id,LongType,true), / / StructField (name,StringType,true))
You can see that peopleInfo stores data, and schema stores information about these fields. Note that the field type of the table corresponds to the scala data type: bigint- > Long,int-> Int,Float-> Float,double-> Double,string-> String, and so on. A DataFrame consists of two parts: data stored in rows and scheam,schema of type StructType. When we have data but no schema, we can construct it in this form to form a DataFrame.
The read function also provides other interfaces for reading data:
Function use
Json (path:String)
Read json file use this method table (tableName:String) to read the table JDBC in the database (url: String,table: String,predicates:Array [String], connectionProperties:Properties)
Read the table in the database through jdbc Orc (path:String) read the file stored in orc format parquet (path:String) read the file stored in parquet format schema (schema:StructType) this is an optimization, when we specify its schema when we read the data, the underlying layer will not parse the schema again and optimize it. Generally, there is no need for such optimization, without this optimization, time efficiency is still acceptable.
2. The second method of reading data: this method of reading data mainly deals with selecting some fields from a data table, rather than selecting all the fields in the table. Well, for this kind of demand, it is more advantageous to use this data reading method. This way is to write sql's query statement directly. Save the data in the above json format to the table format in the database. It is important to note that this can only deal with database table data.
Val peopleInfo = sql.sql ("" | select | id, | name, | age | from peopleInfo ".stripMargin) / / where the stripMargin method is used to parse the sql statement we wrote. / / the result returned is the same as that returned by read: / / org.apache.spark.sql.DataFrame = / / [age: bigint, id: bigint, name: string]
It is important to note that the order of the fields returned in the schmea is not consistent with the order of our query.
2.2 write data
It is relatively easy to write data, because it has a certain mode, according to which data is written. In general, the data we need to write is of type DataFrame, and if it is not of type DataFrame, we need to convert it to
DataFrame type, some people may have doubts, the data is read into memory, its type is DataFrame type, we use methods in the DataFrame class when dealing with the data, but the method in DataFrame does not necessarily return the value of DataFrame type, and sometimes we need to build our own type, so we need to build DataFrame type for our data. There are two ways to build schema types from data without schema, as far as I know.
1. Build schema through classes, and take the peopleInfo above as an example.
Val sql = new SQLContext (sc) / / create a SQLContext object import sql.implicits._ / / this sql is the sql we defined above, not a jar package There are a lot of / / import sqlContext.implicits._, on the Internet, which they define as / / sqlContext = SQLContext (sc), which is a feature of scala val people = sc.textFile ("people.txt") / / We use the type of spark to read the data, because if we read it with / / SQLContext They automatically have schemacase clase People (id:Int,name:String,age:Int) / / define a class val peopleInfo = people.map (lines = > lines.split (","). Map (p = > People (p (0) .toInt, p (1), p (2) .toInt). ToDF / / such a toDF creates a DataFrame You can't use the toDF method without importing / / sql.implicits._,.
The above example uses scala's reflection technology to generate a DataFrame type. You can see that we convert RDD to DataFrame.
2. Construct schema directly, taking peopelInfo as an example. To construct directly, we need to convert our data type to Row type, otherwise an error will be reported.
Val sql = new SQLContext (sc) / / create a SQLContext object val people = sc.textFile ("people.txt") .map (lines = > lines.split (",")) val peopleRow = sc.map (p = > Row (p (0), p (1), (2)) / / convert RDD to RDD (Row) type val schema = StructType (StructFile ("id", IntegerType,true):: StructFile ("name", StringType) True):: StructFile ("age", IntegerType,true):: Nil) val peopleInfo = sql.createDataFrame (peopleRow,schema) / / the data / / type of each row of peopleRow must be the same as that of schema / / otherwise an error will be reported It is said that the type does not match / / and the length of each line of peopleRow should also be the same as that of schema, otherwise / / will also report an error.
Two classes StructType and StructFile are used to construct schema. The three parameters of StructFile class are (field name, type, whether the data can be filled with null).
Using direct construction is very restrictive, and it is OK to have fewer fields. if there are dozens or even more than a hundred fields, this method is more time-consuming. It is not only necessary to ensure that the type of data in Row is the same as our defined schema type, but also the same length, otherwise errors will be reported, so if you want to construct schema directly, you must be careful and more careful, I will be abused by my own carelessness, and the processed fields will be nearly a hundred. As the definition of schema and my data type is inconsistent, I need every field to confirm, field more in the right time is easy to fatigue, such a mistake, because I am relatively stupid, spent an afternoon, so more fields, in the direct construction of schema, must be careful, important things said three times, or else will die miserably.
Okay, now that we've converted our data to DataFrame, let's write our data to the database.
Write data operations:
Val sql = new SQLContext (sc) val people = sc.textFile ("people.txt") .map (lines = > lines.split (",")) val peopleRow = sc.map (p = > Row (p (0), p (1), (2)) val schema = StructType (StructFile ("id", IntegerType,true):: StructFile ("name", StringType,true):: StructFile ("age", IntegerType) True):: Nil) val peopleInfo = sql.createDataFrame (peopleRow,schema) peopleInfo.registerTempTable ("tempTable") / / only the table tempTable with this registration We / / can query through sql.sql (") / / this is to register a temporary table in memory and the user queries sql.sql.sql ("| insert overwrite table tagetTable | select | id, | name | | age | from tempTable ".stripMargin) / / thus writes the data to the database target table tagetTable |
As you can see above, sparkSQL's sql () is actually used to execute the sql statement we wrote.
Well, the read and write operations have been introduced above, and now we need to operate on the most important places.
2.3 manipulate the data through the methods in DataFrame
Before we introduce DataFrame, let's make it clear what sparkSQL is used for, what convenience it mainly provides for us, and why we use it. It is so that we can write code to deal with sql, this may be a bit inaccurate, if it is so simple, just a simple replacement of sql, if I, I will not learn it, because I already know sql, will deal with data warehouse etl through sql, I also learn why sparkSQL, and the cost of learning is so high. SparkSQL must be good, otherwise there wouldn't be this blog. We all know that the processing of data logic by writing sql is limited, and it is very flexible to write programs to deal with data logic, so sparkSQL is used to deal with data logic that cannot be processed by sql or data logic that is more complex to deal with with sql. The general principle is to use sql to deal with, try to use sql to deal with, after all, simple to develop, sql can not deal with, and then choose to use sparkSQL by writing code to deal with. All right, cut the crap and start your journey to DataFrame.
SparkSQL is so powerful that it provides all the functions that are being deleted and changed in our sql, each of which corresponds to a way to achieve this function.
Operation on schema
Val sql = new SQLContext (sc) val people = sql.read.json ("people.txt") / / people is an object of type DataFrame / / data is read in Let's take a look at its schema. The type returned by people.schema// is / / org.apache.spark.sql.types.StructType = / / StructType (StructField (age,LongType,true), / / StructField (id,LongType,true), / / StructField (name,StringType,true)) / / the result returned by schemapeople.dtypes// in an array: / / Array [(String, String)] = / / Array ((age,LongType), (id,LongType)) (name,StringType)) / / return the result returned by the field people.columns// in schema: / / Array [String] = Array (age, id, name) / / print out the result returned by schemapeople.printSchema// in the form of tree: / / root// |-- age: long (nullable = true) / / |-- id: long (nullable = true) / / |-- name: string (nullable = true)
The operation of the table, the operation statement of the table is generally not commonly used, because although sparkSQL encapsulates every function of sql lookup into a method, it is not flexible to deal with it. In general, we use the sql () method to write sql directly, which is more practical and more flexible, and the readability of the code is also very high. Then let's give a brief description of the methods that can be used.
Method (sql causes us to define sql = new SQLContext (sc)) df is a DataFrame object instance sql.read.table (tableName)
Read the data of a table df.where (), df.filter ()
Filter condition, which is equivalent to the where part of sql
Usage: select the fields that are older than 20 in the age field.
Return value type: DataFrame
Df.where ("age > = 20"), df.filter ("age > = 20")
Df.limit ()
Limit the number of rows output, corresponding to the limit of sql
Usage: limit output to 100 lines
Return value type: DataFrame
Df.limit (100)
Df.join ()
Link operation, which is equivalent to join of sql
For join operations, the following will be described separately
Df.groupBy ()
Aggregation operation, which is equivalent to groupBy of sql
Usage: aggregate a few lines
Return value type: DataFrame
Df.groupBy ("id")
Df.agg () is used to find the relevant functions for aggregation. Df.intersect (other:DataFrame) is described in detail below.
Find the intersection of two DataFrame df.except (other:DataFrame) find the line df.withcolumn (colName:String,col:Column) in df but not in other
Add a column
Df.withColumnRenamed (exName,newName) renames the name of a column
Df.map ()
Df.flatMap
Df.mapPartitions ()
Df.foreach ()
Df.foreachPartition ()
Df.collect ()
Df.collectAsList ()
Df.repartition ()
Df.distinct ()
Df.count ()
These methods are the basic operations of spark's RDD, and these methods are also encapsulated in the DataFrame class. It should be noted that the return value of these methods is of type RDD, not of type DataFrame. In the use of these methods, you must remember the return type clearly, otherwise it will be prone to error df.select ().
Select a few columns of elements, which is equivalent to the function of sql's select
Usage: returns selected columns of data
Return value type: DataFrame
Df.select ("id", "name")
Both of the above are basic methods for writing. Here's a detailed description of join and agg,na,udf operations.
2.4join operation of sparkSQL
Spark's join operation is not as flexible as writing sql's join operation directly. When linking, fields in two tables cannot be renamed, so two identical fields appear in the same table. The following two tables are used to expand bit by bit, one is the user information table, and the other is the user's income and salary scale:
1. Inner join, equivalent link, which will merge the linked columns into one column.
Val sql = new SQLContext (sc) val pInfo = sql.read.json ("people.txt") val pSalar = sql.read.json ("salary.txt") val info_salary = pInfo.join (pSalar, "id") / / single field internal connection val info_salary1 = pInfo.join (pSalar,Seq ("id", "name")) / / Multi-field link
The returned result is as follows:
A single id is linked (two name fields appear in a table) and two fields are linked
2, join also supports left join and right link, but its left link and right link and our sql link is the same, also in the link when the field can not be renamed, if the two tables have the same field, it will appear in the same join table, colleagues left and right link, will not merge the fields used for the link. Key words used in the link: outer,inner,left_outer,right_outer
/ / single-field link val left = pInfo.join (pSalar,pInfo ("id") = pSalar ("id"), "left_outer") / / Multi-field link val left2 = pInfo.join (pSalar,pInfo ("id") = = pSalar ("id") and pInfo ("name") = = pSalar ("name"), "left_outer")
The returned result:
Single field link multi-field link
From the above, it can be found that the join operation of sparkSQL is still not as flexible as sql's join, and it is easy to have duplicate fields in the same table. Generally, when we link, we first use the registerTempTable () function to register this DataFrame as an internal table, and then link through the method of sql.sql ("") to write sql, which can better solve the problem of duplicate fields.
Agg operation of 2.5 sparkSQL
Where the agg of sparkSQL is an expression of sparkSQL aggregation operation. When we call agg, it is usually used together with groupBy (). The data table of the select operation is:
Val pSalar = new SQLContext (sc) .read.json ("salary.txt") val group = pSalar.groupBy ("name"). Agg ("salary"-> "avg") val group2 = pSalar.groupBy ("id", "name"). Agg ("salary"-> "avg") val group3 = pSalar.groupBy ("name") .agg (Map ("id"-> "avg", "salary"-> "max"))
The results are as follows:
Result group2 group3 of group
When using agg, it should be noted that the same field cannot be operated twice, for example: agg (Map ("salary"-> "avg", "salary"-> "max"). He can only calculate the operation of max. The reason is very simple. The parameter connected to agg is a key-value pair of Map type. When the key is the same, the previous value will be overwritten. You can also use agg directly, which is true for all lines. The calculation parameters used for aggregation are: avg,max,min,sum,count, not avg, which is only used in the example
2.6na operation of sparkSQL
The na method of sparkSQL returns a DataFrameFuctions object, which is mainly an operation on a row with a value of null in DataFrame, providing only three methods: drop () to delete the row, fill () to populate the row, and replace () to replace the row operation. It's simple. Don't make too many introductions.
3. Summary
The purpose of using sparkSQL is to solve the problems that can not be solved or difficult to solve with writing sql. In the usual development process, we can't use sparkSQL for all kinds of sql problems with high pressure, which is not the most efficient. Using sparkSQL mainly takes advantage of the flexibility of writing code to deal with data logic, but we can't completely use only the sql method provided by sparkSQL, which also goes to the other extreme. As can be seen from the above discussion, when using join operation, if you use sparkSQL's join operation, there are a lot of disadvantages. In order to combine the advantages of the sql statement, we can first register the DataFrame object to be linked as an internal intermediate table, and then execute the sql we write by writing the sql statement and using the sql () method provided by SQLContext, which is more reasonable and efficient. In the working development process, we have to combine the respective strengths of writing code and writing sql to deal with our problems, which will be more efficient.
It took me two weeks to write this blog. Because I am busy with my work, I can only think and sum up in my spare time. It can also be regarded as an account of his own study. With regard to the two classes of sparkSQL, HiveContext and the udf method provided by SQLContext, if we make good use of the udf method, we can make the development of our code more concise and efficient, and the readability is also very strong. Due to registering the udf method in the code, there are still many detailed knowledge points to pay attention to, so I am going to write another blog to introduce it in detail.
I'm exhausted. I've been at home for two days. It's time to go for a walk!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.