How to pass functions to Spark through RDDs

Updated 2025-01-19. From: SLTechnology News&Howtos, Shulou (Shulou.com), 05/31.

This article explains how RDD operations pass functions to Spark. Many people run into trouble with this in practice, so we walk through lazy evaluation, function passing, persistence, and groupByKey in turn.

Lazy evaluation

RDD transformations are lazily evaluated.

Lazy evaluation means that when we call a transformation (for example map) on an RDD, Spark does not run it immediately. Instead, Spark records internally what operation was requested.

Reading data into an RDD is lazy too. When we call sc.textFile(), the data is not read immediately, only when it is needed. Like a transformation, a read may therefore be executed more than once. Pay special attention to this when writing code.

Lazy evaluation can feel counterintuitive to beginners. Those who have used a functional language such as Haskell will find it familiar.

When we first came into contact with Spark, we had the same doubts, and I have taken part in discussions like this one:

    val sc = new SparkContext("local[2]", "test")
    val f: Int => Int = (x: Int) => x + 1
    val g: Int => Int = (x: Int) => x + 1
    val rdd = sc.parallelize(Seq(1, 2, 3, 4), 1)
    // 1
    val res1 = rdd.map(x => g(f(x))).collect
    // 2
    val res2 = rdd.map(f).map(g).collect

Operations 1 and 2 both produce the result we want, but which is better?

Intuitively, the first looks better: it needs only one pass over the data to get the result, while the second seems to need two passes.

Is that really the case? Let's add a print statement to f and g. If the assumption above held, the output of 1 and 2 would be:

    1: f g f g f g f g
    2: f f f f g g g g

Code:

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local[2]", "test")
    // f and g print their name on every call so the call order is visible
    val f: Int => Int = (x: Int) => { print("f\t"); x + 1 }
    val g: Int => Int = (x: Int) => { print("g\t"); x + 1 }
    val rdd = sc.parallelize(Seq(1, 2, 3, 4), 1)
    // 1
    val res1 = rdd.map(x => g(f(x))).collect()
    // 2
    val res2 = rdd.map(f).map(g).collect()

Copy the code above and try it out. The console shows something like this:

f g f g f g f gf g f g f g f g

Surprised? What does this mean? Spark evaluates lazily. When we call map(f), Spark does not actually compute map(f); we are only telling Spark how the data should be computed. map(f).map(g) tells Spark that each element goes through f first and then through g. Only when collect() is called does Spark apply f and g to the data, in a single pass.

Back to the original question: since the two styles perform the same, which is better? We recommend the second. Besides giving a clearer API, a long call chain lets us use Spark's checkpoint mechanism to add checkpoints at intermediate steps, which makes recovery after a failure cheaper. With the first style, a failure means recomputing everything from scratch.
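As a minimal sketch of the checkpoint idea, assuming a local Spark installation on the classpath: the object name CheckpointDemo, the temporary directory, and the choice to checkpoint after f are all illustrative assumptions, not part of the original discussion. setCheckpointDir, checkpoint() and isCheckpointed are standard SparkContext and RDD calls.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointDemo {
  /** Returns (whether the intermediate RDD was checkpointed, final result). */
  def run(): (Boolean, Seq[Int]) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-demo")
    val sc = new SparkContext(conf)
    try {
      // Assumption: a local temp directory. A real cluster would point this
      // at reliable shared storage instead.
      sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoint").toString)

      val f: Int => Int = _ + 1
      val g: Int => Int = _ + 1

      // Because the chain is split into steps, the intermediate RDD has a name
      // and can be marked for checkpointing.
      val afterF = sc.parallelize(Seq(1, 2, 3, 4), 1).map(f)
      afterF.checkpoint() // only marks the RDD; nothing is written yet

      // The action runs the job and then materializes the checkpoint.
      val result = afterF.map(g).collect().toSeq
      (afterF.isCheckpointed, result)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With the single-lambda form rdd.map(x => g(f(x))) there is no intermediate RDD to mark, which is the practical difference the paragraph above points at.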

So how does Spark pull this off? The best way to find out is to read the source code. Take map as an example:

The map method of RDD

    /**
     * Return a new RDD by applying a function to all elements of this RDD.
     */
    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    }

The compute method of MapPartitionsRDD

    override def compute(split: Partition, context: TaskContext): Iterator[U] =
      f(context, split.index, firstParent[T].iterator(split, context))

The key is iter.map(cleanF): calling map on an RDD ends up calling map on the iter object, and iter is an instance of scala.collection.Iterator.

Take a look at Iterator's map method.

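    def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
      def hasNext = self.hasNext
      def next() = f(self.next())
    }

Recall what we said earlier: calling map on an RDD only tells Spark how the data should be computed, it does not compute anything. Iterator.map works the same way. It returns a new iterator that applies f only when next() is called, so chained maps are applied element by element in a single pass. Does it make sense now?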
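The same behavior can be observed without Spark at all. The sketch below uses only the Scala standard library; the object name LazyIteratorDemo and the logging buffer are illustrative assumptions. It chains two maps on a plain Iterator and records when f and g actually run.

```scala
import scala.collection.mutable.ArrayBuffer

object LazyIteratorDemo {
  /** Returns (calls made before consuming, full call order, results). */
  def run(): (Int, Vector[String], Vector[Int]) = {
    val log = ArrayBuffer.empty[String]
    val f: Int => Int = x => { log += "f"; x + 1 }
    val g: Int => Int = x => { log += "g"; x + 1 }

    // Chaining map only wraps the iterator; neither f nor g has run yet.
    val chained = Iterator(1, 2, 3, 4).map(f).map(g)
    val callsBeforeConsuming = log.size

    // Consuming the iterator pulls each element through f and then g.
    val results = chained.toVector
    (callsBeforeConsuming, log.toVector, results)
  }

  def main(args: Array[String]): Unit = {
    val (before, calls, results) = run()
    println(s"calls before consuming: $before")
    println(calls.mkString(" "))
    println(results.mkString(", "))
  }
}
```

The call order alternates between f and g rather than running all of f first, which mirrors what the Spark console output showed for map(f).map(g).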

Passing functions to Spark

We can pass inline functions, references to methods, or static methods to Spark, just as with Scala's other functional APIs. There are some details to consider, such as whether the function we pass and the variables it references are serializable (that is, implement Java's Serializable interface). In addition, passing a method or field of an object brings along a reference to the entire object. To avoid shipping the whole object, we can copy the field into a local variable first.

Passing functions in Scala

    import org.apache.spark.rdd.RDD

    class SearchFunctions(val query: String) {
      def isMatch(s: String): Boolean = s.contains(query)

      def getMatchFuncRef(rdd: RDD[String]): RDD[Boolean] = {
        // isMatch means this.isMatch, so the whole of this is passed
        rdd.map(isMatch)
      }

      def getMatchFieldRef(rdd: RDD[String]): RDD[Array[String]] = {
        // query means this.query, so the whole of this is passed
        rdd.map(x => x.split(query))
      }

      def getMatchsNoRef(rdd: RDD[String]): RDD[Array[String]] = {
        // Safe: copy only the field we need into a local variable
        val q = this.query
        rdd.map(x => x.split(q))
      }
    }

If a NotSerializableException occurs in Scala, the cause is usually that we passed a function or field belonging to a non-serializable class. Passing a local serializable variable, or a function that is a member of a top-level object, is always safe.
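The capture problem can be reproduced with plain Java serialization, which is what Spark uses for closures by default. This is a sketch under assumptions: ClosureCaptureDemo, Search, viaField, viaLocal and serializes are hypothetical names, and the behavior described relies on how Scala 2.12 and later compile function literals.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureDemo {
  // Deliberately not Serializable, like the search class in the article.
  class Search(val query: String) {
    // References this.query, so the function object captures `this`.
    def viaField: String => Boolean = s => s.contains(query)

    // Copies the field first, so the function object captures only a String.
    def viaLocal: String => Boolean = {
      val q = query
      s => s.contains(q)
    }
  }

  /** True if plain Java serialization accepts the function object. */
  def serializes(fn: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(fn) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val search = new Search("spark")
    println(s"viaField serializes: ${serializes(search.viaField)}")
    println(s"viaLocal serializes: ${serializes(search.viaLocal)}")
  }
}
```

Serializing viaField has to serialize the enclosing Search instance as well, which fails because Search is not Serializable. viaLocal avoids this by capturing only the local string.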

Persistence

As mentioned earlier, Spark RDDs are lazily evaluated, and sometimes we want to use the same RDD more than once. By default, Spark recomputes the RDD and all of its dependencies every time an action is called on it. This is especially costly in iterative algorithms. Call RDD.persist() to have Spark cache the RDD.
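A rough way to see the recomputation is to count how often the map function runs across two actions. The sketch below assumes a local Spark installation. PersistDemo and countCalls are hypothetical names, and counting with an accumulator inside a transformation is only dependable here because a local run has no task retries.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistDemo {
  /** Runs two actions on one mapped RDD and returns how often the map function ran. */
  def countCalls(persist: Boolean): Long = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("persist-demo")
    val sc = new SparkContext(conf)
    try {
      val calls = sc.longAccumulator("calls")
      val squares = sc.parallelize(1 to 4, 1).map { x => calls.add(1); x * x }
      if (persist) squares.persist(StorageLevel.MEMORY_ONLY)

      squares.count()   // first action: computes the partition
      squares.collect() // second action: recomputes unless the RDD is cached
      calls.sum
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = {
    println(s"without persist: ${countCalls(persist = false)} calls")
    println(s"with persist:    ${countCalls(persist = true)} calls")
  }
}
```

In a local run with enough memory, the unpersisted version should invoke the function once per element per action, while the persisted version should invoke it once per element in total.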

Avoid groupByKey

Let's look at two ways to implement word count, one using reduceByKey and the other using groupByKey.

    val words = Array("one", "two", "two", "three", "three", "three")
    val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

    val wordCountsWithReduce = wordPairsRDD
      .reduceByKey(_ + _)
      .collect()

    val wordCountsWithGroup = wordPairsRDD
      .groupByKey()
      .map(t => (t._1, t._2.sum))
      .collect()

Both approaches produce the correct result, but reduceByKey works better on large datasets. Spark runs a combine step on each partition before shuffling, merging values that share a key, which greatly reduces the amount of data that has to be shuffled.

[Figure: the reduceByKey process]

groupByKey shuffles all of the data, which greatly increases the amount of data sent over the network. If one key has many values, this can also cause an out-of-memory error.
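The difference in shuffle volume can be illustrated with plain Scala collections. This is a simulation, not Spark's actual shuffle code: ShuffleVolumeDemo and its methods are hypothetical names, and the two-partition layout is an assumption about how the six words might be split.

```scala
object ShuffleVolumeDemo {
  type Pair = (String, Int)

  // Assumed layout: the six (word, 1) pairs split across two partitions.
  val partitions: Seq[Seq[Pair]] = Seq(
    Seq("one", "two", "two").map(_ -> 1),
    Seq("three", "three", "three").map(_ -> 1)
  )

  /** groupByKey style: every record crosses the shuffle unchanged. */
  def shuffledWithoutCombine(parts: Seq[Seq[Pair]]): Seq[Pair] = parts.flatten

  /** reduceByKey style: each partition sums per key before the shuffle. */
  def shuffledWithCombine(parts: Seq[Seq[Pair]]): Seq[Pair] =
    parts.flatMap { part =>
      part.groupBy(_._1).map { case (word, ps) => word -> ps.map(_._2).sum }.toSeq
    }

  /** Final per-key sum on the receiving side, identical for both styles. */
  def counts(shuffled: Seq[Pair]): Map[String, Int] =
    shuffled.groupBy(_._1).map { case (word, ps) => word -> ps.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val plain = shuffledWithoutCombine(partitions)
    val combined = shuffledWithCombine(partitions)
    println(s"records shuffled without combine: ${plain.size}")
    println(s"records shuffled with combine:    ${combined.size}")
    println(counts(combined))
  }
}
```

Both paths end with the same counts, but the combined path sends one record per key per partition instead of one record per word occurrence. On real data with many repeated keys the gap grows accordingly.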

[Figure: the groupByKey process]

That concludes this introduction to how RDD operations pass functions to Spark.
