From d4e56b930b6be0748ac1b1367b3e6f1992901d4c Mon Sep 17 00:00:00 2001
From: Mark Hamstra
Date: Sat, 22 Jun 2013 18:51:14 -0700
Subject: [PATCH 1/4] Made combOp of aggregate() work as a reduce instead of
 an implicit fold

---
 core/src/main/scala/spark/RDD.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index f336c2ea1e..f359477415 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -635,13 +635,14 @@ abstract class RDD[T: ClassManifest](
    */
   def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
     // Clone the zero value since we will also be serializing it as part of tasks
-    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
+    var jobResult = Utils.clone(Option.empty[U], sc.env.closureSerializer.newInstance())
     val cleanSeqOp = sc.clean(seqOp)
     val cleanCombOp = sc.clean(combOp)
-    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
-    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
+    def optCombOp(a: Option[U], b: Option[U]): Option[U] = for (u <- b) yield a.fold(u)(cleanCombOp(_, _))
+    val aggregatePartition = (it: Iterator[T]) => Option(it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp))
+    val mergeResult = (index: Int, taskResult: Option[U]) => jobResult = optCombOp(jobResult, taskResult)
     sc.runJob(this, aggregatePartition, mergeResult)
-    jobResult
+    jobResult.get
   }

   /**

From 67b66f2dc53ee121cfcaf3c13c181a090268e41f Mon Sep 17 00:00:00 2001
From: Mark Hamstra
Date: Sat, 22 Jun 2013 22:33:57 -0700
Subject: [PATCH 2/4] RDD.aggregate:

- preserve sequential order within partitions
- reformat code
---
 core/src/main/scala/spark/RDD.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index f359477415..708d84a85c 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -638,9 +638,12 @@ abstract class RDD[T: ClassManifest](
     var jobResult = Utils.clone(Option.empty[U], sc.env.closureSerializer.newInstance())
     val cleanSeqOp = sc.clean(seqOp)
     val cleanCombOp = sc.clean(combOp)
-    def optCombOp(a: Option[U], b: Option[U]): Option[U] = for (u <- b) yield a.fold(u)(cleanCombOp(_, _))
-    val aggregatePartition = (it: Iterator[T]) => Option(it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp))
-    val mergeResult = (index: Int, taskResult: Option[U]) => jobResult = optCombOp(jobResult, taskResult)
+    def optCombOp(a: Option[U], b: Option[U]): Option[U] =
+      for (u <- b) yield a.fold(u)((u1, u2) => cleanCombOp(u2, u1))
+    val aggregatePartition =
+      (it: Iterator[T]) => Option(it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp))
+    val mergeResult =
+      (index: Int, taskResult: Option[U]) => jobResult = optCombOp(jobResult, taskResult)
     sc.runJob(this, aggregatePartition, mergeResult)
     jobResult.get
   }
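Note on PATCH 1/4 and 2/4: the driver-side merge now starts from None rather than zeroValue, so zeroValue is folded in once per partition (inside it.aggregate) but never an extra time on the driver. A minimal standalone sketch of the difference, in plain Scala with no SparkContext; the partition results, zeroValue, and combOp below are hypothetical stand-ins, not values from the patch:

    // Sketch only: models the driver-side merge over already-computed
    // partition results. All concrete values are hypothetical.
    object AggregateMergeSketch {
      def main(args: Array[String]): Unit = {
        val partitionResults = Seq(3, 4, 5)  // as if each partition already ran seqOp/combOp
        val zeroValue = 2                    // deliberately NOT a neutral element for combOp
        val combOp = (a: Int, b: Int) => a * b

        // Old merge: jobResult starts at zeroValue, so the zero is combined
        // in one extra time on the driver: 2 * 3 * 4 * 5 = 120.
        val foldStyle = partitionResults.foldLeft(zeroValue)(combOp)

        // Patched merge: acts as a reduce over the partition results only:
        // 3 * 4 * 5 = 60, independent of zeroValue.
        val reduceStyle = partitionResults.reduceLeft(combOp)

        println(s"fold-style: $foldStyle, reduce-style: $reduceStyle")
      }
    }

With a zeroValue that is not neutral for combOp, the old fold-style merge yields 120 where the patched reduce-style merge yields the expected 60.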
From 2330e605b6a4c7e1fcc614be881e2fd5fdc23a1a Mon Sep 17 00:00:00 2001
From: Mark Hamstra
Date: Sun, 23 Jun 2013 11:40:44 -0700
Subject: [PATCH 3/4] RDD.aggregate:

- no need to clone None for jobResult
- use combOp instead of cleanCombOp
---
 core/src/main/scala/spark/RDD.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 708d84a85c..ad02bbd1d1 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -634,12 +634,11 @@ abstract class RDD[T: ClassManifest](
    * allocation.
    */
   def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
-    // Clone the zero value since we will also be serializing it as part of tasks
-    var jobResult = Utils.clone(Option.empty[U], sc.env.closureSerializer.newInstance())
+    var jobResult = Option.empty[U]
     val cleanSeqOp = sc.clean(seqOp)
     val cleanCombOp = sc.clean(combOp)
     def optCombOp(a: Option[U], b: Option[U]): Option[U] =
-      for (u <- b) yield a.fold(u)((u1, u2) => cleanCombOp(u2, u1))
+      for (u <- b) yield a.fold(u)((u1, u2) => combOp(u2, u1))
     val aggregatePartition =
       (it: Iterator[T]) => Option(it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp))
     val mergeResult =

From 75d8681f575893cb90273c45e0eac33aa66aba4b Mon Sep 17 00:00:00 2001
From: Mark Hamstra
Date: Tue, 25 Jun 2013 10:23:00 -0700
Subject: [PATCH 4/4] Handle RDDs with no partitions in aggregate()

---
 core/src/main/scala/spark/RDD.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index ad02bbd1d1..c1636e25a4 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -637,8 +637,12 @@ abstract class RDD[T: ClassManifest](
     var jobResult = Option.empty[U]
     val cleanSeqOp = sc.clean(seqOp)
     val cleanCombOp = sc.clean(combOp)
-    def optCombOp(a: Option[U], b: Option[U]): Option[U] =
-      for (u <- b) yield a.fold(u)((u1, u2) => combOp(u2, u1))
+    def optCombOp(a: Option[U], b: Option[U]): Option[U] = (a, b) match {
+      case (None, None) => Option(zeroValue)
+      case (None, _) => b
+      case (Some(u1), Some(u2)) => Option(combOp(u1, u2))
+      case (_, _) => throw new SparkException("Have a jobResult but no taskResult in aggregate()")
+    }
     val aggregatePartition =
       (it: Iterator[T]) => Option(it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp))
     val mergeResult =
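Note on PATCH 4/4: the pattern match in optCombOp can be exercised on its own. A minimal self-contained sketch in plain Scala, no SparkContext required; Int, zeroValue = 0, and addition are illustrative stand-ins for the generic U and the user's combOp, and sys.error stands in for SparkException:

    // Sketch only: mirrors the optCombOp logic from PATCH 4/4 with
    // hypothetical concrete types and operators.
    object OptCombOpSketch {
      val zeroValue: Int = 0                 // stand-in for the user's zeroValue
      val combOp: (Int, Int) => Int = _ + _  // stand-in for the user's combOp

      def optCombOp(a: Option[Int], b: Option[Int]): Option[Int] = (a, b) match {
        case (None, None)         => Option(zeroValue)      // no result on either side: fall back to zeroValue
        case (None, _)            => b                      // first task result becomes jobResult as-is
        case (Some(u1), Some(u2)) => Option(combOp(u1, u2)) // reduce; zeroValue is never folded back in
        case (_, _)               => sys.error("Have a jobResult but no taskResult in aggregate()")
      }

      def main(args: Array[String]): Unit = {
        println(optCombOp(None, None))       // Some(0)
        println(optCombOp(None, Some(3)))    // Some(3)
        println(optCombOp(Some(3), Some(4))) // Some(7)
      }
    }

The (None, None) branch is what maps an absent task result back to zeroValue, matching the patch's stated goal of handling RDDs with no partitions.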