diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index 6d04bf790e3a5..8f26baeea170b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -19,6 +19,7 @@ package org.apache.spark.graphx import scala.reflect.{classTag, ClassTag} +import org.apache.spark.Logging import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext} import org.apache.spark.graphx.impl.EdgePartition import org.apache.spark.rdd.RDD @@ -30,7 +31,8 @@ import org.apache.spark.storage.StorageLevel */ class EdgeRDD[@specialized ED: ClassTag]( val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])]) - extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { + extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) + with Logging { partitionsRDD.setName("EdgeRDD") @@ -46,7 +48,11 @@ class EdgeRDD[@specialized ED: ClassTag]( override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = { val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context) - p.next._2.iterator.map(_.copy()) + if (p.hasNext) { + p.next._2.iterator.map(_.copy()) + } else { + Iterator.empty + } } override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect() @@ -70,8 +76,12 @@ class EdgeRDD[@specialized ED: ClassTag]( private[graphx] def mapEdgePartitions[ED2: ClassTag]( f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = { new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter => - val (pid, ep) = iter.next() - Iterator(Tuple2(pid, f(pid, ep))) + if (iter.hasNext) { + val (pid, ep) = iter.next() + Iterator(Tuple2(pid, f(pid, ep))) + } else { + Iterator.empty + } }, preservesPartitioning = true)) } @@ -108,9 +118,17 @@ class EdgeRDD[@specialized ED: ClassTag]( val ed3Tag = classTag[ED3] new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) { (thisIter, otherIter) => - val (pid, thisEPart) = thisIter.next() - val (_, otherEPart) = otherIter.next() - Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag))) + if (thisIter.hasNext && otherIter.hasNext) { + val (pid, thisEPart) = thisIter.next() + val (_, otherEPart) = otherIter.next() + Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag))) + } else { + if (thisIter.hasNext != otherIter.hasNext) { + logError("innerJoin: Dropped non-empty edge partition from `%s`".format( + if (thisIter.hasNext) "this" else "other")) + } + Iterator.empty + } }) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index c2b510a31ee3f..0ee0b612a0cc2 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -19,6 +19,7 @@ package org.apache.spark.graphx.impl import scala.reflect.{classTag, ClassTag} +import org.apache.spark.Logging import org.apache.spark.util.collection.PrimitiveVector import org.apache.spark.{HashPartitioner, Partitioner} import org.apache.spark.SparkContext._ @@ -47,7 +48,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( @transient val edges: EdgeRDD[ED], @transient val routingTable: RoutingTable, @transient val replicatedVertexView: ReplicatedVertexView[VD]) - extends Graph[VD, ED] with Serializable { + extends Graph[VD, ED] with Serializable with Logging { /** Default constructor is provided to support serialization */ protected def this() = this(null, null, null, null) @@ -58,9 +59,17 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( val edTag = classTag[ED] edges.partitionsRDD.zipPartitions( replicatedVertexView.get(true, true), true) { (ePartIter, vPartIter) => - val (pid, ePart) = ePartIter.next() - val (_, vPart) = vPartIter.next() - new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag) + if (ePartIter.hasNext && vPartIter.hasNext) { + val (pid, ePart) = ePartIter.next() + val (_, vPart) = vPartIter.next() + new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag) + } else { + if (ePartIter.hasNext != vPartIter.hasNext) { + logError("triplets: Dropped non-empty %s partition".format( + if (ePartIter.hasNext) "edge" else "vertex")) + } + Iterator.empty + } } } @@ -131,22 +140,30 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( val newEdgePartitions = edges.partitionsRDD.zipPartitions(replicatedVertexView.get(true, true), true) { (ePartIter, vTableReplicatedIter) => - val (ePid, edgePartition) = ePartIter.next() - val (vPid, vPart) = vTableReplicatedIter.next() - assert(!vTableReplicatedIter.hasNext) - assert(ePid == vPid) - val et = new EdgeTriplet[VD, ED] - val inputIterator = edgePartition.iterator.map { e => - et.set(e) - et.srcAttr = vPart(e.srcId) - et.dstAttr = vPart(e.dstId) - et - } - // Apply the user function to the vertex partition - val outputIter = f(ePid, inputIterator) - // Consume the iterator to update the edge attributes - val newEdgePartition = edgePartition.map(outputIter) - Iterator((ePid, newEdgePartition)) + if (ePartIter.hasNext && vTableReplicatedIter.hasNext) { + val (ePid, edgePartition) = ePartIter.next() + val (vPid, vPart) = vTableReplicatedIter.next() + assert(!vTableReplicatedIter.hasNext) + assert(ePid == vPid) + val et = new EdgeTriplet[VD, ED] + val inputIterator = edgePartition.iterator.map { e => + et.set(e) + et.srcAttr = vPart(e.srcId) + et.dstAttr = vPart(e.dstId) + et + } + // Apply the user function to the vertex partition + val outputIter = f(ePid, inputIterator) + // Consume the iterator to update the edge attributes + val newEdgePartition = edgePartition.map(outputIter) + Iterator((ePid, newEdgePartition)) + } else { + if (ePartIter.hasNext != vTableReplicatedIter.hasNext) { + logError("mapTriplets: Dropped non-empty %s partition".format( + if (ePartIter.hasNext) "edge" else "ReplicatedVertexView")) + } + Iterator.empty + } } new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), routingTable, replicatedVertexView) } @@ -216,50 +233,58 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( // Map and combine. val preAgg = edges.partitionsRDD.zipPartitions(vs, true) { (ePartIter, vPartIter) => - val (ePid, edgePartition) = ePartIter.next() - val (vPid, vPart) = vPartIter.next() - assert(!vPartIter.hasNext) - assert(ePid == vPid) - // Choose scan method - val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat - val edgeIter = activeDirectionOpt match { - case Some(EdgeDirection.Both) => - if (activeFraction < 0.8) { - edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId)) - .filter(e => vPart.isActive(e.dstId)) - } else { - edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId)) + if (ePartIter.hasNext && vPartIter.hasNext) { + val (ePid, edgePartition) = ePartIter.next() + val (vPid, vPart) = vPartIter.next() + assert(!vPartIter.hasNext) + assert(ePid == vPid) + // Choose scan method + val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat + val edgeIter = activeDirectionOpt match { + case Some(EdgeDirection.Both) => + if (activeFraction < 0.8) { + edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId)) + .filter(e => vPart.isActive(e.dstId)) + } else { + edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId)) + } + case Some(EdgeDirection.Either) => + // TODO: Because we only have a clustered index on the source vertex ID, we can't filter + // the index here. Instead we have to scan all edges and then do the filter. + edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId)) + case Some(EdgeDirection.Out) => + if (activeFraction < 0.8) { + edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId)) + } else { + edgePartition.iterator.filter(e => vPart.isActive(e.srcId)) + } + case Some(EdgeDirection.In) => + edgePartition.iterator.filter(e => vPart.isActive(e.dstId)) + case _ => // None + edgePartition.iterator + } + + // Scan edges and run the map function + val et = new EdgeTriplet[VD, ED] + val mapOutputs = edgeIter.flatMap { e => + et.set(e) + if (mapUsesSrcAttr) { + et.srcAttr = vPart(e.srcId) } - case Some(EdgeDirection.Either) => - // TODO: Because we only have a clustered index on the source vertex ID, we can't filter - // the index here. Instead we have to scan all edges and then do the filter. - edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId)) - case Some(EdgeDirection.Out) => - if (activeFraction < 0.8) { - edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId)) - } else { - edgePartition.iterator.filter(e => vPart.isActive(e.srcId)) + if (mapUsesDstAttr) { + et.dstAttr = vPart(e.dstId) } - case Some(EdgeDirection.In) => - edgePartition.iterator.filter(e => vPart.isActive(e.dstId)) - case _ => // None - edgePartition.iterator - } - - // Scan edges and run the map function - val et = new EdgeTriplet[VD, ED] - val mapOutputs = edgeIter.flatMap { e => - et.set(e) - if (mapUsesSrcAttr) { - et.srcAttr = vPart(e.srcId) + mapFunc(et) } - if (mapUsesDstAttr) { - et.dstAttr = vPart(e.dstId) + // Note: This doesn't allow users to send messages to arbitrary vertices. + vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator + } else { + if (ePartIter.hasNext != vPartIter.hasNext) { + logError("mapReduceTriplets: Dropped non-empty %s partition".format( + if (ePartIter.hasNext) "edge" else "vertex")) } - mapFunc(et) + Iterator.empty } - // Note: This doesn't allow users to send messages to arbitrary vertices. - vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator } // do the final reduction reusing the index map diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala index a8154b63ce5fb..cfda66dfd1c84 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala @@ -19,6 +19,7 @@ package org.apache.spark.graphx.impl import scala.reflect.{classTag, ClassTag} +import org.apache.spark.Logging import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet} @@ -42,7 +43,7 @@ class ReplicatedVertexView[VD: ClassTag]( updatedVerts: VertexRDD[VD], edges: EdgeRDD[_], routingTable: RoutingTable, - prevViewOpt: Option[ReplicatedVertexView[VD]] = None) { + prevViewOpt: Option[ReplicatedVertexView[VD]] = None) extends Logging { /** * Within each edge partition, create a local map from vid to an index into the attribute @@ -103,9 +104,13 @@ class ReplicatedVertexView[VD: ClassTag]( // Update the view with shippedActives, setting activeness flags in the resulting // VertexPartitions get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) => - val (pid, vPart) = viewIter.next() - val newPart = vPart.replaceActives(shippedActivesIter.flatMap(_._2.iterator)) - Iterator((pid, newPart)) + if (viewIter.hasNext) { + val (pid, vPart) = viewIter.next() + val newPart = vPart.replaceActives(shippedActivesIter.flatMap(_._2.iterator)) + Iterator((pid, newPart)) + } else { + Iterator.empty + } } } @@ -126,57 +131,73 @@ class ReplicatedVertexView[VD: ClassTag]( // VertexPartitions prevView.get(includeSrc, includeDst).zipPartitions(shippedVerts) { (prevViewIter, shippedVertsIter) => - val (pid, prevVPart) = prevViewIter.next() - val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator)) - Iterator((pid, newVPart)) + if (prevViewIter.hasNext) { + val (pid, prevVPart) = prevViewIter.next() + val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator)) + Iterator((pid, newVPart)) + } else { + Iterator.empty + } }.cache().setName("ReplicatedVertexView delta %s %s".format(includeSrc, includeDst)) case None => // Within each edge partition, place the shipped vertex attributes into the correct // locations specified in localVertexIdMap localVertexIdMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) => - val (pid, vidToIndex) = mapIter.next() - assert(!mapIter.hasNext) - // Populate the vertex array using the vidToIndex map - val vertexArray = vdTag.newArray(vidToIndex.capacity) - for ((_, block) <- shippedVertsIter) { - for (i <- 0 until block.vids.size) { - val vid = block.vids(i) - val attr = block.attrs(i) - val ind = vidToIndex.getPos(vid) - vertexArray(ind) = attr + if (mapIter.hasNext) { + val (pid, vidToIndex) = mapIter.next() + assert(!mapIter.hasNext) + // Populate the vertex array using the vidToIndex map + val vertexArray = vdTag.newArray(vidToIndex.capacity) + for ((_, block) <- shippedVertsIter) { + for (i <- 0 until block.vids.size) { + val vid = block.vids(i) + val attr = block.attrs(i) + val ind = vidToIndex.getPos(vid) + vertexArray(ind) = attr + } } + val newVPart = new VertexPartition( + vidToIndex, vertexArray, vidToIndex.getBitSet)(vdTag) + Iterator((pid, newVPart)) + } else { + Iterator.empty } - val newVPart = new VertexPartition( - vidToIndex, vertexArray, vidToIndex.getBitSet)(vdTag) - Iterator((pid, newVPart)) }.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst)) } } } -private object ReplicatedVertexView { +private object ReplicatedVertexView extends Logging { protected def buildBuffer[VD: ClassTag]( pid2vidIter: Iterator[Array[Array[VertexId]]], vertexPartIter: Iterator[VertexPartition[VD]]) = { - val pid2vid: Array[Array[VertexId]] = pid2vidIter.next() - val vertexPart: VertexPartition[VD] = vertexPartIter.next() - - Iterator.tabulate(pid2vid.size) { pid => - val vidsCandidate = pid2vid(pid) - val size = vidsCandidate.length - val vids = new PrimitiveVector[VertexId](pid2vid(pid).size) - val attrs = new PrimitiveVector[VD](pid2vid(pid).size) - var i = 0 - while (i < size) { - val vid = vidsCandidate(i) - if (vertexPart.isDefined(vid)) { - vids += vid - attrs += vertexPart(vid) + if (pid2vidIter.hasNext && vertexPartIter.hasNext) { + val pid2vid: Array[Array[VertexId]] = pid2vidIter.next() + val vertexPart: VertexPartition[VD] = vertexPartIter.next() + + Iterator.tabulate(pid2vid.size) { pid => + val vidsCandidate = pid2vid(pid) + val size = vidsCandidate.length + val vids = new PrimitiveVector[VertexId](pid2vid(pid).size) + val attrs = new PrimitiveVector[VD](pid2vid(pid).size) + var i = 0 + while (i < size) { + val vid = vidsCandidate(i) + if (vertexPart.isDefined(vid)) { + vids += vid + attrs += vertexPart(vid) + } + i += 1 } - i += 1 + (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array)) + } + } else { + if (pid2vidIter.hasNext != vertexPartIter.hasNext) { + logError("buildBuffer: Dropped non-empty %s partition".format( + if (pid2vidIter.hasNext) "pid2vid" else "vertex")) } - (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array)) + Iterator.empty } } @@ -184,22 +205,30 @@ private object ReplicatedVertexView { pid2vidIter: Iterator[Array[Array[VertexId]]], activePartIter: Iterator[VertexPartition[_]]) : Iterator[(Int, Array[VertexId])] = { - val pid2vid: Array[Array[VertexId]] = pid2vidIter.next() - val activePart: VertexPartition[_] = activePartIter.next() - - Iterator.tabulate(pid2vid.size) { pid => - val vidsCandidate = pid2vid(pid) - val size = vidsCandidate.length - val actives = new PrimitiveVector[VertexId](vidsCandidate.size) - var i = 0 - while (i < size) { - val vid = vidsCandidate(i) - if (activePart.isDefined(vid)) { - actives += vid + if (pid2vidIter.hasNext && activePartIter.hasNext) { + val pid2vid: Array[Array[VertexId]] = pid2vidIter.next() + val activePart: VertexPartition[_] = activePartIter.next() + + Iterator.tabulate(pid2vid.size) { pid => + val vidsCandidate = pid2vid(pid) + val size = vidsCandidate.length + val actives = new PrimitiveVector[VertexId](vidsCandidate.size) + var i = 0 + while (i < size) { + val vid = vidsCandidate(i) + if (activePart.isDefined(vid)) { + actives += vid + } + i += 1 } - i += 1 + (pid, actives.trim().array) + } + } else { + if (pid2vidIter.hasNext != activePartIter.hasNext) { + logError("buildBuffer: Dropped non-empty %s partition".format( + if (pid2vidIter.hasNext) "pid2vid" else "active vertex")) } - (pid, actives.trim().array) + Iterator.empty } } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala index fe44e1ee0c391..35245971912d7 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala @@ -49,24 +49,28 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = { // Determine which vertices each edge partition needs by creating a mapping from vid to pid. val vid2pid: RDD[(VertexId, PartitionID)] = edges.partitionsRDD.mapPartitions { iter => - val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next() - val numEdges = edgePartition.size - val vSet = new VertexSet - if (includeSrcAttr) { // Add src vertices to the set. - var i = 0 - while (i < numEdges) { - vSet.add(edgePartition.srcIds(i)) - i += 1 + if (iter.hasNext) { + val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next() + val numEdges = edgePartition.size + val vSet = new VertexSet + if (includeSrcAttr) { // Add src vertices to the set. + var i = 0 + while (i < numEdges) { + vSet.add(edgePartition.srcIds(i)) + i += 1 + } } - } - if (includeDstAttr) { // Add dst vertices to the set. - var i = 0 - while (i < numEdges) { - vSet.add(edgePartition.dstIds(i)) - i += 1 + if (includeDstAttr) { // Add dst vertices to the set. + var i = 0 + while (i < numEdges) { + vSet.add(edgePartition.dstIds(i)) + i += 1 + } } + vSet.iterator.map { vid => (vid, pid) } + } else { + Iterator.empty } - vSet.iterator.map { vid => (vid, pid) } } val numPartitions = vertices.partitions.size