aggregateByKey
aggregateByKey is used the same way as combineByKey. combineByKey takes three function parameters:
createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C
aggregateByKey replaces createCombiner: V => C with an initial value of type C, so its three parameters are effectively:
zeroValue: C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C
Note: zeroValue should normally be a neutral element such as 0, "", or Nil, because otherwise the final result depends on the number of partitions. mergeValue merges values within each partition, and every partition starts from its own copy of zeroValue; if zeroValue is not neutral, it gets folded in once per partition, so the result produced by mergeCombiners changes whenever the partitioning changes.
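A minimal sketch of that pitfall (assuming a spark-shell session where sc is a live SparkContext; the data and partition counts here are made up for illustration):

// sc is an existing SparkContext; three values for key "a", one per partition.
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3)), 3)

// Neutral zeroValue: the sum is 6 no matter how the data is partitioned.
pairs.aggregateByKey(0)(_ + _, _ + _).collect()   // Array((a,6))

// Non-neutral zeroValue: 10 is folded in once per partition holding key "a",
// so the result is 36 with 3 partitions but would be 16 with 1 partition.
pairs.aggregateByKey(10)(_ + _, _ + _).collect()  // Array((a,36))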
Source code
/**
 * Internally this also delegates to combineByKeyWithClassTag.
 */
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)
    (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  ...
  combineByKeyWithClassTag[U](
    (v: V) => cleanedSeqOp(createZero(), v),
    cleanedSeqOp, combOp, partitioner)
}
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
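The three overloads differ only in how the partitioner is chosen; as a sketch, assuming a hypothetical pairs: RDD[(String, Int)], they might be invoked like this:

import org.apache.spark.HashPartitioner

pairs.aggregateByKey(0)(_ + _, _ + _)                          // default partitioner
pairs.aggregateByKey(0, 4)(_ + _, _ + _)                       // 4 partitions
pairs.aggregateByKey(0, new HashPartitioner(4))(_ + _, _ + _)  // explicit partitioner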
The example mirrors the one for combineByKey; see the earlier post: Spark算子[08]:combineByKey详解.
Scala example
def avgScore(): Unit = {
  // Accumulate a (score sum, count) pair per key; zeroValue is the neutral (0.0f, 0).
  // Note that aggregateByKey is curried: the zero value and the two functions
  // go in separate argument lists.
  val avgscoreRdd = studentDetailRdd.aggregateByKey((0.0f, 0))(
    (acc: (Float, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1),
    (acc1: (Float, Int), acc2: (Float, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  )
}
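The RDD above holds (sum, count) pairs rather than averages; a sketch of the finishing step, assuming it is appended inside avgScore before the closing brace:

// Sketch: derive the per-key average from the (sum, count) accumulator.
val avgRdd = avgscoreRdd.mapValues { case (sum, count) => sum / count }
avgRdd.collect().foreach(println)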
Java example
public static void avgScore() {
    // seqOp: fold one ScoreDetail003 into the per-partition (sum, count) accumulator.
    Function2<Tuple2<Float, Integer>, ScoreDetail003, Tuple2<Float, Integer>> mergeValue =
        new Function2<Tuple2<Float, Integer>, ScoreDetail003, Tuple2<Float, Integer>>() {
            @Override
            public Tuple2<Float, Integer> call(Tuple2<Float, Integer> v1, ScoreDetail003 v2) throws Exception {
                return new Tuple2<>(v1._1() + v2.score, v1._2() + 1);
            }
        };

    // combOp: merge (sum, count) accumulators coming from different partitions.
    Function2<Tuple2<Float, Integer>, Tuple2<Float, Integer>, Tuple2<Float, Integer>> mergeCombiners =
        new Function2<Tuple2<Float, Integer>, Tuple2<Float, Integer>, Tuple2<Float, Integer>>() {
            @Override
            public Tuple2<Float, Integer> call(Tuple2<Float, Integer> v1, Tuple2<Float, Integer> v2) throws Exception {
                return new Tuple2<>(v1._1() + v2._1(), v1._2() + v2._2());
            }
        };

    // In the Java API the partition count is the second argument:
    // aggregateByKey(zeroValue, numPartitions, seqFunc, combFunc).
    JavaPairRDD<String, Float> res = pairRDD
        .aggregateByKey(new Tuple2<Float, Integer>(0.0f, 0), 2, mergeValue, mergeCombiners)
        .mapToPair(x -> new Tuple2<>(x._1(), x._2()._1() / x._2()._2()));
}
aggregate
/**
 * Aggregate the elements of each partition, and then the results of all partitions,
 * using the given combine functions and a neutral "zero value".
 *
 * @param zeroValue the initial value for the accumulated result of each partition for the
 *                  seqOp operator, and also the initial value for combining results from
 *                  different partitions for the combOp operator; this will typically be the
 *                  neutral element (e.g. Nil for list concatenation or 0 for summation)
 * @param seqOp an operator used to accumulate results within a partition
 * @param combOp an associative operator used to combine results from different partitions
 */
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope { ... }
Note: seqOp folds zeroValue in once for every partition; combOp then folds zeroValue in once more when merging the per-partition results.
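A non-neutral zeroValue therefore also makes aggregate sensitive to the partition count. A minimal sketch, assuming sc is a live SparkContext (note that even an empty partition contributes its zeroValue):

// One partition: seqOp gives 2 + 5 = 7, then combOp gives 2 + 7 = 9.
sc.parallelize(List(5), 1).aggregate(2)(_ + _, _ + _)  // 9

// Two partitions: one is empty but still yields its zeroValue 2,
// so combOp computes 2 + 2 + (2 + 5) = 11.
sc.parallelize(List(5), 2).aggregate(2)(_ + _, _ + _)  // 11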
Scala example
def aggregateOp(): Unit = {
  val conf = new SparkConf().setAppName("aggregateOp").setMaster("local")
  val sc = new SparkContext(conf)

  val list = List("aa", "bb", "cc")
  val rdd = sc.parallelize(list, 2)

  // seqOp: concatenate within a partition using "-".
  def seqOp(a: String, b: String): String = a + "-" + b

  // combOp: concatenate the partition results using ":".
  def combOp(a: String, b: String): String = a + ":" + b

  val res = rdd.aggregate("oo")(seqOp, combOp)
  println(res)
}
Result: oo:oo-aa:oo-bb-cc. The two partitions are ["aa"] and ["bb", "cc"]; seqOp turns them into "oo-aa" and "oo-bb-cc", and combOp folds these into the driver-side zeroValue: "oo" + ":" + "oo-aa" + ":" + "oo-bb-cc".
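To check how the elements were split, one could add a glom() call inside the function above (a sketch reusing its rdd):

// Print each partition's contents: [aa] and [bb,cc] in this setup.
rdd.glom().collect().foreach(p => println(p.mkString("[", ",", "]")))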
def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("aggregateOp").setMaster("local")
  val sc = new SparkContext(conf)

  val list = List(5)
  val rdd = sc.parallelize(list)

  def seqOp(a: Int, b: Int): Int = a + b
  def combOp(a: Int, b: Int): Int = a + b

  val res = rdd.aggregate(2)(seqOp, combOp)
  println(res)
}
Result: 9. With master "local" the RDD has a single partition, so seqOp computes 2 + 5 = 7 and combOp computes 2 + 7 = 9. This shows that zeroValue serves once as the initial value inside each partition's seqOp, and once more as the initial value of combOp.
Java example
public static void aggregateOp() {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("aggregateOp");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9), 2);

    // seqOp: take the maximum within each partition (starting from zeroValue 0).
    Function2<Integer, Integer, Integer> seqOp = new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return Math.max(v1, v2);
        }
    };

    // combOp: take the minimum across partition results (again starting from 0).
    Function2<Integer, Integer, Integer> combOp = new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return Math.min(v1, v2);
        }
    };

    Integer res = rdd1.aggregate(0, seqOp, combOp);
    System.out.println(res);
}
Result: 0. The per-partition maxima are 4 and 9, but combOp starts from the zeroValue 0, so min(min(0, 4), 9) = 0; the zeroValue participates in the cross-partition merge as well.