In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-23 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
Abstract
When sparkSQL uses cache caching, sometimes the cache may not work and may send out the feeling that the cache is fake. Now let's talk about this problem.
problem
Scene description
When we count and process data through spark, we find that it is delayed. If there are multiple action in an application, and when the data source processes data from the same data source, the data source takes time to filter the data. Because there are multiple action operations, each action is a job, and each action will perform the operation of the data source to obtain data, because there is a time difference between the two action operations. The data obtained by the two action may be inconsistent.
For example, the following example
Data in the test1 table
2018-07-01 10:10:03
2pm 2018-07-01 11:12:04
The code is as follows
Val odsData = spark.sql ("
Select
From default.test1
Where time < "2018-07-02"
"")
Val targetData = odsData.map (fun _)
Val targetData.createOrReplaceTempView ("data1")
/ / the first Action operation
Val spark.sql ("
Insert overwrite table default.test2
*
From data1
"")
Val targetData1 = odsData.map (fun2 _) / / reference the same data source
TargetData1.createOrReplaceTempView ("data2")
/ / second action operation
Val spark.sql ("
Insert table default.test2
*
From data2
"")
If a record is added to the test1 table before the second Action operation is run, it is 13:12:04.
That is, the record is still two 1s and 2s when the first Action is executed, and after the first Action and before the second Action is executed.
A new list has been added: 3BI 2018-07-01 13:12:04
So what is the data like in the test2 table?
The first case (because the second action is insert, not insert overwrite)
2018-07-01 10:10:03
2pm 2018-07-01 11:12:04
2018-07-01 10:10:03
2pm 2018-07-01 11:12:04
The second case
2018-07-01 10:10:03
2pm 2018-07-01 11:12:04
2018-07-01 10:10:03
2pm 2018-07-01 11:12:04
3pm 2018-07-01 13:12:04
Result analysis
The result is the second case. If you think it is the first case, you are still not familiar with the implementation plan of spark. First of all, the spark is calculated by lazy, that is, the action operation is not triggered and the job is not submitted. In this application, there are two action, and these two aciton use the rdd of the same data source, which should be called variable odsData. When you encounter the first action, it will execute all the rdd on the execution chain, including odsData, and when it encounters the second aciton, it will also execute the data on its execution chain again, including odsData, and re-fetch the data from the data source. Some people will wonder, when the first action is executed, the odsData has already been executed, so shouldn't the results of this RDD be cached? Personally, spark is not so intelligent, and the division of job,stage,rdd,task that is often said on the Internet should be carried out in the same job. However, the stage split of boast job in the same application does not exist. So what should we do with this result?
The appearance of cache
When this happens, my application will leak dozens of pieces of data every day, which is very annoying. I finally found the above problem. When I wanted to solve the problem, the first thing I thought of was cache. I cached the odsData the first time I performed the Action operation, so there shouldn't be any problem. Thus, the data consistency of the two action operations and the same data source can be guaranteed. Let's just say too young to sample. This will not solve the problems above. Again, take an example.
Data in the test table:
1 2017-01-01 01:00:00 2016-05-04 9999-12-31
2 2017-01-01 02:00:00 2016-01-01 9999-12-31
Code:
Val curentData = spark.sql (
"
| | select |
| *
| | from default.test |
"" .stripMargin)
CurentData.cache () / / cache our results
CurentData.createOrReplaceTempView ("dwData")
/ / the first Action
Spark.sql (
"
| | INSERT OVERWRITE TABLE default.test1 |
| | SELECT |
| |
| | FROM dwData |
"" .stripMargin)
/ / change the data of the data source table test table and be the second Action
Spark.sql (
"
| | INSERT OVERWRITE TABLE default.test |
| | SELECT |
| | 1 |
| | "2017"
| | "2018"
| | "2018"
| | FROM default.test |
"" .stripMargin)
/ / the third Action and the first Action have the same data source, and the result of the first run of cache.
Spark.sql (
"
| | INSERT OVERWRITE TABLE default.test1 |
| | SELECT |
| |
| | FROM dwData |
"" .stripMargin)
So the results in the test1 table
The first situation:
1 2017-01-01 01:00:00 2016-05-04 9999-12-31
2 2017-01-01 02:00:00 2016-01-01 9999-12-31
The second case
1 2017 2018 2018
1 2017 2018 2018
Result analysis
The result is the second case, that is, our cache doesn't work at all, or the third Action doesn't use our cache data at all. I typed all the logs this time.
The first Action declaration cycle:
Log of the third Action:
As you can see from these two logs, we set cache which can only take effect in the same job. However, the use of job to cache data does not exist.
If you want to understand the principle and function of cache in more detail, you can go to the Internet to search for a lot of information, but you must remember that what is said on the Internet should be limited to a condition that the cache of job does not exist in the same job.
Solution
What we finally want to solve is how to ensure the consistency of their data when two action want to use rdd from the same data source.
Scheme:
Write the data source used by the first Action operator to a temporary table
Then in the second Action, read the data from the temporary table directly instead of using odsData directly
A better plan has not been figured out yet, and it can be done according to the different business.
The second solution is now that we use the checkpoint mechanism provided by spark, and checkpoint will put our data
Automatically cached to hdfs, it will delete all the data of the previous parent rdd of this rdd, no matter which job's rdd in the future
When you need to use the data of this rdd, you will read the data from the directory of this checkpoin.
Spark.sparkContext.setCheckpointDir ("hdfs://hadoop-1:5000/hanfangfang")
CurentData.cache () checkpoint
This makes the data of different job and the same data source consistent.
At the same time, we should also remember that when the program is finished, it will not delete the checkpoint data, we need to delete it manually.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.