
Why Spark RDD's collect action is not suitable when a single element is too large: an example analysis


This article analyzes, through an example, why Spark RDD's collect action is not suitable when a single element is too large. The content is concise and easy to follow, and I hope you get something out of it.

collect is a very convenient Spark RDD action: with a single call you can get all the elements of an RDD back to the driver. When the elements are of type String, you can turn the entire RDD into a List in one line. It could hardly be easier to use.
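For instance, here is a minimal sketch of collect in action (this snippet is illustrative and not from the original article; it assumes an existing JavaSparkContext named sc):

// A minimal sketch, assuming an existing JavaSparkContext named sc.
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "bb", "ccc"));
// collect() ships every element of the RDD back to the driver as a local List.
List<String> all = rdd.collect();  // ["a", "bb", "ccc"]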

But wait a minute: such an easy-to-use action has a weakness. It is not suitable when individual elements are large. Let me give you an example. Take a look at the following code:

......

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet);

JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> strJavaRDD) throws Exception {
        // Pull every raw message in this batch back to the driver.
        List<String> messages = strJavaRDD.collect();
        List<String> sizeStrs = new ArrayList<String>();
        for (String message : messages) {
            if (message == null)
                continue;
            String logStr = "message size is " + message.length();
            sizeStrs.add(logStr);
        }
        saveToLog(outputLogPath, sizeStrs);
        return null;
    }
});

......

The above code works well when each Kafka message (that is, each element of the RDD) is very small, say 200 bytes. However, when a single message becomes large enough (for example, 10 MB), the following exception is thrown:

SparkDriver-akka.actor.default-dispatcher-18 2015-10-15 21:52:28,606 ERROR JobScheduler - Error running job streaming job 1444971120000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 238.0 failed 4 times, most recent failure: Lost task 0.3 in stage 238.0 (TID 421, 127.0.0.1): ExecutorLostFailure (executor 123 lost)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1215)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1404)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1365)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
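As a side note (not part of the original analysis): besides raw memory pressure, Spark also enforces spark.driver.maxResultSize, a cap on the total size of task results that a single action such as collect() may return to the driver (it defaults to 1g); exceeding it aborts the job. Raising such limits only postpones the failure, as sketched below:

// A sketch only: raising driver-side limits delays, but does not fix, the problem.
SparkConf conf = new SparkConf()
        .set("spark.driver.maxResultSize", "2g");  // cap on collect() result size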

The reason is simple: collect() cannot handle "big data", and with single messages around 10 MB in size, shipping every raw message back to the driver is exactly that. Instead, we can compute each message's size string on the executors and collect only those short strings. Replace the last part above with the following code:

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> strJavaRDD) throws Exception {
        // Compute each message's size string on the executors...
        JavaRDD<String> sizeRDD = strJavaRDD.map(new Function<String, String>() {
            @Override
            public String call(String message) throws Exception {
                if (message == null)
                    return null;
                String logStr = "Message size is " + message.length();
                return logStr;
            }
        });
        // ...so that only the short size strings are collected to the driver.
        List<String> sizeStrs = sizeRDD.collect();
        saveToLog(outputLogPath, sizeStrs);
        return null;
    }
});
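Going one step further (a hypothetical variant, not from the original article): if even the collected size strings might become too numerous, you can avoid collect() altogether and let the executors write the results themselves:

// Hypothetical alternative: no data returns to the driver at all.
// Each foreachRDD batch writes to its own directory (here keyed by timestamp).
sizeRDD.saveAsTextFile(outputLogPath + "_" + System.currentTimeMillis());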

The above is the example analysis of why Spark RDD's collect action is not suitable when a single element is too large. Did you pick up any knowledge or skills from it? If you want to learn more or enrich your knowledge reserve, you are welcome to follow the industry information channel.
