Scala 高级算子
==> mapPartitionsWithIndex
--->定义:def mapPartitionsWithIndex[U](f:(Int, Iterator[T]) => Iterator[U], preserversPartitioning: Boolean = false)
--->作用:对RDD每个分区进行操作,带有分区号
--->示例:输出分区号和内容
//创建一个RDDvalrdd1=sc.parallelize(List(1,2,3,4,5,6,7,8,9))//创建一个函数,作为f的值deffunc(index:Int,iter:Iterator[Int]):Iterator[String]={iter.toList.map(x=>"[PartID:"+index+",value="+x+"]").iterator}//调用rdd1.mapPartitionsWithIndex(func).colect//结果res15:Array[String]=Array([PartitionID:0,value=1],[PartitionID:0,value=2],[PartitionID:0,value=3],[PartitionID:0,value=4],[PartitionID:1,value=5],[PartitionID:1,value=6],[PartitionID:1,value=7],[PartitionID:1,value=8],[PartitionID:1,value=9])
==>aggregate
--->定义:def aggregate[U: ClassTag](zeroValue: U)(seqOp:(U, T) => U, combOp: (U, U) => U): U
---- (zeroValue: U)初始值
---- seqOp:(U, T) => U局部操作
---- combOp:(U, U) => U全局操作
--->作用:先对局部进行操作,再对全局进行操作
--->示例:
//求两个分区最大值的和,初始值为0valrdd1=sc.parallelize(List(1,2,3,4,5,6,7,8,9))rdd1.aggregate(0)(math.max(_,_),_+_)//结果为:res16:Int=13
==>aggregateByKey
--->定义:
--->作用:对key-value格式的数据进行aggregate操作
--->示例:
//准备一个key-value格式的RDDvalparRDD=sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)//计算每个分区中的动物最多的个数求和parRDD.aggregateByKey(0)(math.max(_,_),_+_)//结果为:Array[(String,Int)]=Array((dog,12),(cat,17),(mouse,6))//计算每种动物的总数量parRDD.aggregateByKey(0)(_+_,_+_).collect//方法一parRDD.reduceByKey(_+_).collect
==> coalesce与 repartition
--->作用:将RDD中的分区进行重分区
--->区别: coalesce默认不会进行 shuffle(false)
repartition会进行 shuffle(true),会将数据真正通过网络进行重分区
--->示例:
//定义一个RDDvalrdd=sc.parallelize(List(1,2,3,4,5,6,7,8),2)//显示分区中的分区号和分区号中的内容deffunc(index:Int,iter:Iterator[Int]):Iterator[String]={iter.toList.map(x=>"[PartID:"+index+",value="+x+"]").iterator}//查看rdd中的分区情况rdd.mapPartitionsWithIndex(func).collect//结果为:Array[String]=Array(//[PartID:0,value=1],[PartID:0,value=2],[PartID:0,value=3],[PartID:0,value=4],//[PartID:1,value=5],[PartID:1,value=6],[PartID:1,value=7],[PartID:1,value=8])//使用repartition将分区数改为3valrdd2=rdd1.repartition(3)valrdd3=rdd1.coalesce(3,true)//查看rdd2与rdd3的分区情况rdd2.mapPartitionsWithIndex(func).collectrdd3.mapPartitionsWithIndex(func).collect//结果为:Array[String]=Array(//[PartID:0,value=3],[PartID:0,value=6],//[PartID:1,value=1],[PartID:1,value=4],[PartID:1,value=7],//[PartID:2,value=2],[PartID:2,value=5])
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。