Abstract
In one of our Spark workflows we use the spark.sql() function to read data into memory as a Dataset[Row] (a DataFrame). Because Row cannot be encoded automatically by Spark, we have to supply an encoding for this dataset before we can use operators such as map on it. How to supply that encoding is the question summarized here. The error reported is: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
Generally, this error means that the return type of the function we pass to an operator is not one of the types Spark can encode automatically through its own reflection. For example, if the function passed to the map operator returns a Map, the error above appears, because the Map collection type is not among the basic types, String, case classes, and tuples, so Spark cannot derive an encoder for it through reflection.
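As a minimal sketch of the failure (the table and column names simply reuse those from the examples below, and the Map-returning function is purely illustrative), the commented line does not compile on the Spark 2.x releases discussed here, because no implicit encoder for Map is available:
import spark.implicits._
// This compiles: a tuple of basic types has an implicit encoder.
val ok = spark.sql("SELECT o.id, o.user_id FROM default.api_order o limit 100")
  .map(row => (row.getString(0), row.getString(1)))
// This does not compile, because there is no implicit encoder for Map here;
// the compiler reports "Unable to find encoder for type stored in a Dataset":
// val failing = spark.sql("SELECT o.id, o.user_id FROM default.api_order o limit 100")
//   .map(row => Map("id" -> row.getString(0), "user_id" -> row.getString(1)))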
The cause of this problem
Spark 2.0 and later adopt the new distributed dataset Dataset, where DataFrame is simply an alias for Dataset[Row]. The new dataset brings many optimizations; one of them is the Tungsten execution engine, which maintains its own memory manager, taking memory management out of the JVM and greatly improving memory efficiency. The new engine also stores data in memory in binary form, and most computations operate directly on that binary stream: there is no need to deserialize the binary data into Java objects, compute, and then serialize the results back into binary. This requires a mechanism that maps Java objects onto the binary data stream; otherwise Spark cannot know how many bytes a given data object occupies in the stream. Spark handles this through Encoders, and it can derive part of the encoding automatically through reflection: basic types, String, case classes, and tuples. For other collection types, or for our own custom classes, it cannot derive the encoding, so we have to define it ourselves, that is, provide a schema.
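Besides the methods listed below, one way to define such an encoding explicitly for a custom (non-case) class is Spark's generic Kryo-based encoder. This is only a minimal sketch; the UserOrder class is a hypothetical example, and a Kryo-encoded Dataset stores each object as a single opaque binary column rather than named fields:
import org.apache.spark.sql.{Encoder, Encoders}
// A hypothetical plain class that Spark cannot encode via reflection.
class UserOrder(val id: String, val userId: String)
// Encoders.kryo serializes the whole object as a binary blob, which gets past the
// "Unable to find encoder" error at the cost of losing a columnar schema.
implicit val userOrderEncoder: Encoder[UserOrder] = Encoders.kryo[UserOrder]
val kryoOrders = spark.sql("SELECT o.id, o.user_id FROM default.api_order o limit 100")
  .map(row => new UserOrder(row.getString(0), row.getString(1)))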
Ways to solve the problem
Method 1:
Convert the Dataset to an RDD and work on the RDD instead. This is not recommended: compared with RDD, Dataset benefits from many low-level optimizations and performs better.
val orderInfo1 = spark.sql(
  """
    | SELECT
    |   o.id,
    |   o.user_id
    | FROM default.api_order o
    | limit 100
  """.stripMargin).rdd.map(myfunction)
Method 2:
Define a case class and let Spark automatically convert the Dataset[Row] into a Dataset[P], which also works when complex types appear in the Row.
case class Orders(id: String, user_id: String)
// this case class must be defined outside our singleton object
object a {
  def main(args: Array[String]): Unit = {
    import spark.implicits._
    val orderInfo1 = spark.sql(
      """
        | SELECT
        |   o.id,
        |   o.user_id
        | FROM default.api_order o
        | limit 100
      """.stripMargin).as[Orders].map(myfunction)
  }
}
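The examples call a myfunction that is never defined in the post. Purely as a hypothetical placeholder compatible with Method 2, it could be any function from Orders to another encodable type, for example:
// Hypothetical placeholder for the myfunction used in these examples:
// it maps an Orders value to another automatically encodable type (here a tuple).
// In Method 1 the equivalent function would take a Row instead of an Orders.
def myfunction(o: Orders): (String, String) = (o.id, o.user_id)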
Method 3:
Define a schema yourself and then encode the rows with RowEncoder. The example below is only an illustration; all the types in it could in fact be encoded automatically by Spark through reflection.
import spark.implicits._
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val schema = StructType(Seq(
  StructField("id", StringType, true),
  StructField("user_id", StringType, true)))
val encoders = RowEncoder(schema)
val orderInfo1 = spark.sql(
  """
    | SELECT
    |   o.id,
    |   o.user_id
    | FROM default.api_order o
    | limit 100
  """.stripMargin).map(row => row)(encoders)
Method 4:
Directly use Scala's pattern matching with case Row(...). Because the pattern match on Row() states exactly which basic types the Row contains, Spark can encode the result automatically, and we can then process it as needed.
import spark.implicits._
import org.apache.spark.sql.Row

val orderInfo1 = spark.sql(
  """
    | SELECT
    |   o.id,
    |   o.user_id
    | FROM default.api_order o
    | limit 100
  """.stripMargin).map { case Row(id: String, user_id: String) => (id, user_id) }
The resulting schema is:
orderInfo1: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]
If it were changed like this:
val orderInfo1 = spark.sql(
  """
    | SELECT
    |   o.id,
    |   o.user_id
    | FROM default.api_order o
    | limit 100
  """.stripMargin).map { case Row(id: String, user_id: String) => List(id, user_id) }
The resulting schema is:
orderInfo1: org.apache.spark.sql.Dataset[List[String]] = [value: array<string>]
It can be seen that Spark treats a tuple as a special form of case class, and the field names in the resulting schema become _1, _2, and so on.
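If the auto-generated _1/_2 names are not wanted, the tuple Dataset from Method 4 can be renamed with toDF, as a small usage sketch reusing orderInfo1 from above:
// Rename the auto-generated tuple columns _1 and _2 to meaningful names.
val named = orderInfo1.toDF("id", "user_id")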