In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
As we said in the last article, RDD persistence is a must in spark optimization, and, in the case of insufficient memory, we can choose the persistence type to MEMORY_ONLY_SER, reduce memory footprint, persist more partition, and different serialization methods can also affect serialization performance.
Next, let's test the impact of the persistence level and serialization method choice on the RDD persistence size.
I chose a log file of 170.9MB and transferred it to Baidu network disk.
Extraction code: ffae
The test environment is windows
IDEA parameter configuration
MEMORY_ONLY
The code is
Case class CleanedLog (cdn:String,region:String,level:String,date:String,ip:String, domain:String, url:String, traffic:String) object KyroTest {def main (args: Array [String]) {val inputPath=new Path (args (0) val outputPath=new Path (args (1)) val fsConf=new Configuration () val fs= FileSystem.get (fsConf) if (fs.exists (outputPath)) {fs.delete (outputPath) True) val path=args (1) .toString println (s "deleted existing path $path")} val conf = new SparkConf () .setMaster ("local [2]") .setAppName ("KyroTest") / / conf.set ("spark.serializer", "org.apache.spark.serializer.KryoSerializer") / / conf.set ("spark.kryo.registrationRequired") "true") val sc = new SparkContext (conf) val logs = sc.textFile (args (0)) / / logs.filter (_ .split ("\ t") .length = = 8) .take (10) .foreach (println (_)) val logsCache=logsCahe (logs) / / serialize rdd to saveAtLocal (logsCache) Args (1)) Thread.sleep (100000)} def logsCahe (logs: RDD [string]): RDD [CleanedLog] = {logs.filter (_ .split ("\ t") .length = = 8) .map (x = > {val fields=x.split ("\ t") CleanedLog (fields (0), fields (1), fields (2), fields (3), fields (4), fields (5), fields (6) Fields (7)}) .cache ()} def saveAtLocal (logsCache: RDD [CleanedLog], outputPath: String) = {logsCache.map (x = > {x.cdn+ "\ t" + x.region+ "\ t" + x.level + "\ t" + x.date+ "\ t" + x.ip+ "\ t" + x.domain+ "\ t" + x.url + "\ t" + x.traffic}) .repartition (1) .saveAsTextFile (outputPath)}
The code logic is what the input is, and the input is what the input is. In the middle, I persist the input text RDD with memory_only. Let's see how much memory this persistence takes.
Obviously, the size of the input size is 170.9 MB, but after persistence, it is 908.5 MB, which obviously takes up several times the memory space. In the case of insufficient memory resources in production, this method obviously cannot cache a lot of partition.
It takes 14 seconds.
MEMORY_ONLY_SER did not use kryo to serialize def logsCahe (logs: RDD [string]): RDD [CleanedLog] = {logs.filter (_ .split ("\ t") .length = = 8) .map (x = > {val fields=x.split ("\ t") CleanedLog (fields (0), fields (1), fields (2), fields (3), fields (4), fields (5), fields (6), fields (7)})
The code only changes persist (StorageLevel.MEMORY_ONLY_SER)
Obviously, the input size size is 170.9 MB, but there is 204.9MB after persistence, so serialization is very helpful in saving memory space.
The time takes 11s.
MEMORY_ONLY_SER uses kryo to serialize unregistered val conf = new SparkConf () .setMaster ("local [2]") .setAppName ("KyroTest") conf.set ("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Compared with the previous code, SparkConf is set to enable kryo serialization, not the default java serialization, but no specific class registration!
Obviously, the input size size is 170.9 MB, but there is 230.8MB after persistence, and using unregistered kryo serialization is even more bloated than using java serialization! The reason is: the serialization result of each object instance will contain a complete class name, resulting in a lot of space waste!
The time is 9s, which is a little faster than java serialization.
MEMORY_ONLY_SER uses kryo to serialize and register val conf = new SparkConf (). SetMaster ("local [2]"). SetAppName ("KyroTest") conf.set ("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.registerKryoClasses (Array (classOf [CleanedLog], classOf [string]))
Added kryo registration for String class and custom sample class
Obviously, the input size size is 170.9 MB, and after serializing with the registered kryo, there is only 175.7MB, and the time is only 9 seconds, which is very comfortable!
So by far, using kryo to serialize and register is the best performance!
If CPU is still so leisurely, we have another further optimization point!
Register kryo serialization and turn on RDD compression
Note: RDD compression can only exist in serialization cases
Val conf = new SparkConf (). SetMaster ("local [2]"). SetAppName ("KyroTest") conf.set ("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.registerKryoClasses (Array (classOf [CleanedLog], classOf [string]) conf.set ("spark.rdd.compress", "true")
The size of persistence is only 45.6 MBytes!
Spark.rdd.compress
This parameter determines whether RDD data is further compressed and stored in memory or disk during the RDD Cache process after serialization. Of course, in order to further reduce the size of Cache data, the absolute size of Cache on disk probably does not matter much, mainly considering the IO bandwidth of Disk. As for Cache in memory, it mainly considers the influence of size, whether it can Cache more data, whether it can reduce the pressure of Cache data on GC and so on.
Of the two, the former is not usually the main problem, especially if the purpose of RDD Cache itself is to pursue speed, reduce recalculation steps, and trade IO for CPU. And the latter, the GC problem of course needs to be considered, the amount of data is small, occupies less space, the problem of GC will probably be alleviated, but whether it really needs to go to the step of RDD Cache compression, it may be more effective to solve it in other ways.
So this value is off by default, but if disk IO does become a problem or there is really no better solution to the GC problem, consider enabling RDD compression.
Compare table type input size persistence size MEMORY_ONLY170.9 MB908.5 MB14sMEMORY_ONLY_SER170.9 MB204.9MB11skyro serialization unregistered 170.9 MB230.8MB9skyro serialization registration 170.9 MB175.7MB9s register kryo serialization and turn on RDD compression 170.9 MB45.6MB9s
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.