In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
本篇内容介绍了"mapPartitions的简单介绍及使用方法"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
1. mappartition简介
首先,说到mapPartitions大家肯定想到的是map和MapPartitions的对比。大家都知道mapPartition算子是使用一个函数针对分区计算的,函数参数是一个迭代器。而map只针对每条数据调用的,所以存在访问外部数据库等情况时mapParititons更加高效。
mapPartitions函数: /** * Return a new RDD by applying a function to each partition of this RDD. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */ def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter), preservesPartitioning) }有代码可知mapPartitions的函数参数是传入一个迭代器,返回值是另一个迭代器。map函数:
/** * Return a new RDD by applying a function to all elements of this RDD. */ def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF)) }map函数就是将rdd的元素由T类型转化为U类型。综上可知,map和foreach这类的是针对一个元素调用一次我们的函数,也即是我们的函数参数是单个元素,假如函数内部存在数据库链接、文件等的创建及关闭,那么会导致处理每个元素时创建一次链接或者句柄,导致性能底下,很多初学者犯过这种毛病。而foreachpartition/mapPartitions是针对每个分区调用一次我们的函数,也即是我们函数传入的参数是整个分区数据的迭代器,这样避免了创建过多的临时链接等,提升了性能。下面的例子都是1-20这20个数字,经过map完成a*3的转换:val a = sc.parallelize(1 to 20, 2)
def mapTerFunc(a : Int) : Int = {a*3}
val mapResult = a.map(mapTerFunc)
println(mapResult.collect().mkString(","))
结果 3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60
3. mappartitions低效用法
大家通常的做法都是申请一个迭代器buffer,将处理后的数据加入迭代器buffer,然后返回迭代器。如下面的demo。val a = sc.parallelize(1 to 20, 2) def terFunc(iter: Iterator[Int]) : Iterator[Int] = { var res = List[Int]() while (iter.hasNext) { val cur = iter.next; res.::= (cur*3) ; } res.iterator}
val result = a.mapPartitions(terFunc)println(result.collect().mkString(","))结果乱序了,因为我的list是无序的,可以使用LinkList:30,27,24,21,18,15,12,9,6,3,60,57,54,51,48,45,42,39,36,33
4. mappartitions高效用法
注意,3中的例子,会在mappartition执行期间,在内存中定义一个数组并且将缓存所有的数据。假如数据集比较大,内存不足,会导致内存溢出,任务失败。对于这样的案例,Spark的RDD不支持像mapreduce那些有上下文的写方法。其实,浪尖有个方法是无需缓存数据的,那就是自定义一个迭代器类。如下例:
class CustomIterator(iter: Iterator[Int]) extends Iterator[Int] { def hasNext : Boolean = { iter.hasNext } def next : Int= { val cur = iter.next cur*3 } }
val result = a.mapPartitions(v => new CustomIterator(v)) println(result.collect().mkString(","))结果: 3,6,9,12,15,18,21,24,27,30,33,36,39,42,45,48,51,54,57,60"mapPartitions的简单介绍及使用方法"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.