Handle empty partition iterators #367

Status: Closed · wants to merge 1 commit
32 changes: 25 additions & 7 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx

 import scala.reflect.{classTag, ClassTag}

+import org.apache.spark.Logging
 import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
 import org.apache.spark.graphx.impl.EdgePartition
 import org.apache.spark.rdd.RDD
@@ -30,7 +31,8 @@ import org.apache.spark.storage.StorageLevel
  */
 class EdgeRDD[@specialized ED: ClassTag](
     val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
-  extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+  extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD)))
+  with Logging {

   partitionsRDD.setName("EdgeRDD")
@@ -46,7 +48,11 @@ class EdgeRDD[@specialized ED: ClassTag](

   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
     val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
-    p.next._2.iterator.map(_.copy())
+    if (p.hasNext) {
+      p.next._2.iterator.map(_.copy())
+    } else {
+      Iterator.empty
+    }
   }

   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
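
This guard is the pattern the whole patch applies: Scala's Iterator.next() throws NoSuchElementException when the iterator is empty, so every partition iterator is now checked with hasNext first. A minimal sketch of the idiom in plain Scala (no Spark; the helper name firstOrEmpty is hypothetical):

object EmptyIteratorGuard {
  // Apply f to the first element if one exists; otherwise return an empty
  // iterator instead of letting next() throw NoSuchElementException.
  def firstOrEmpty[A, B](iter: Iterator[A])(f: A => Iterator[B]): Iterator[B] =
    if (iter.hasNext) f(iter.next()) else Iterator.empty

  def main(args: Array[String]): Unit = {
    println(firstOrEmpty(Iterator(Seq(1, 2, 3)))(_.iterator).toList)          // List(1, 2, 3)
    println(firstOrEmpty(Iterator.empty: Iterator[Seq[Int]])(_.iterator).toList) // List()
  }
}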
@@ -70,8 +76,12 @@ class EdgeRDD[@specialized ED: ClassTag](
   private[graphx] def mapEdgePartitions[ED2: ClassTag](
       f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
     new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
-      val (pid, ep) = iter.next()
-      Iterator(Tuple2(pid, f(pid, ep)))
+      if (iter.hasNext) {
+        val (pid, ep) = iter.next()
+        Iterator(Tuple2(pid, f(pid, ep)))
+      } else {
+        Iterator.empty
+      }
     }, preservesPartitioning = true))
   }
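
For context, empty partitions arise easily in practice, for example when an RDD has more partitions than elements or after a filter. A short sketch of how mapPartitions encounters them, assuming a live SparkContext named sc:

// Two elements spread over four partitions leaves two partitions empty.
val rdd = sc.parallelize(Seq(1, 2), 4)
val safe = rdd.mapPartitions { iter =>
  // Guarded access: only call next() when the partition is non-empty.
  if (iter.hasNext) Iterator(iter.next()) else Iterator.empty
}
safe.collect() // succeeds; an unguarded iter.next() here would throw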

@@ -108,9 +118,17 @@ class EdgeRDD[@specialized ED: ClassTag](
     val ed3Tag = classTag[ED3]
     new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
       (thisIter, otherIter) =>
-        val (pid, thisEPart) = thisIter.next()
-        val (_, otherEPart) = otherIter.next()
-        Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
+        if (thisIter.hasNext && otherIter.hasNext) {
+          val (pid, thisEPart) = thisIter.next()
+          val (_, otherEPart) = otherIter.next()
+          Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
+        } else {
+          if (thisIter.hasNext != otherIter.hasNext) {
+            logError("innerJoin: Dropped non-empty edge partition from `%s`".format(
+              if (thisIter.hasNext) "this" else "other"))
+          }
+          Iterator.empty
+        }
     })
   }
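
The innerJoin guard also has to handle the asymmetric case: if exactly one side of zipPartitions has a partition, that partition's data is silently dropped, so the patch logs an error rather than failing. A generic sketch of the pattern in plain Scala (the helper zipGuarded is hypothetical):

// Pair up the single element of each side, or return empty; report a
// one-sided mismatch so dropped data is at least visible in the logs.
def zipGuarded[A, B, C](left: Iterator[A], right: Iterator[B])(f: (A, B) => C)(
    onMismatch: String => Unit): Iterator[C] =
  if (left.hasNext && right.hasNext) {
    Iterator(f(left.next(), right.next()))
  } else {
    if (left.hasNext != right.hasNext) {
      onMismatch(if (left.hasNext) "left" else "right")
    }
    Iterator.empty
  }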

143 changes: 84 additions & 59 deletions graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx.impl

 import scala.reflect.{classTag, ClassTag}

+import org.apache.spark.Logging
 import org.apache.spark.util.collection.PrimitiveVector
 import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.SparkContext._
@@ -47,7 +48,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     @transient val edges: EdgeRDD[ED],
     @transient val routingTable: RoutingTable,
     @transient val replicatedVertexView: ReplicatedVertexView[VD])
-  extends Graph[VD, ED] with Serializable {
+  extends Graph[VD, ED] with Serializable with Logging {

   /** Default constructor is provided to support serialization */
   protected def this() = this(null, null, null, null)
@@ -58,9 +59,17 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     val edTag = classTag[ED]
     edges.partitionsRDD.zipPartitions(
       replicatedVertexView.get(true, true), true) { (ePartIter, vPartIter) =>
-      val (pid, ePart) = ePartIter.next()
-      val (_, vPart) = vPartIter.next()
-      new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag)
+      if (ePartIter.hasNext && vPartIter.hasNext) {
+        val (pid, ePart) = ePartIter.next()
+        val (_, vPart) = vPartIter.next()
+        new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag)
+      } else {
+        if (ePartIter.hasNext != vPartIter.hasNext) {
+          logError("triplets: Dropped non-empty %s partition".format(
+            if (ePartIter.hasNext) "edge" else "vertex"))
+        }
+        Iterator.empty
+      }
     }
   }
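
A hedged reproduction of the failure this guards against, assuming a SparkContext sc and the GraphX API of this era (Graph.fromEdges and friends): a graph whose edge RDD contains only empty partitions.

import org.apache.spark.graphx._

// Four slices but no edges: every partition of the edge RDD is empty.
val edges = sc.parallelize(Seq.empty[Edge[Int]], 4)
val graph = Graph.fromEdges(edges, defaultValue = 0)
// Before this patch, computing triplets called next() on an empty partition
// iterator and threw NoSuchElementException; with the guard it returns 0.
graph.triplets.count()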

@@ -131,22 +140,30 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     val newEdgePartitions =
       edges.partitionsRDD.zipPartitions(replicatedVertexView.get(true, true), true) {
         (ePartIter, vTableReplicatedIter) =>
-          val (ePid, edgePartition) = ePartIter.next()
-          val (vPid, vPart) = vTableReplicatedIter.next()
-          assert(!vTableReplicatedIter.hasNext)
-          assert(ePid == vPid)
-          val et = new EdgeTriplet[VD, ED]
-          val inputIterator = edgePartition.iterator.map { e =>
-            et.set(e)
-            et.srcAttr = vPart(e.srcId)
-            et.dstAttr = vPart(e.dstId)
-            et
-          }
-          // Apply the user function to the vertex partition
-          val outputIter = f(ePid, inputIterator)
-          // Consume the iterator to update the edge attributes
-          val newEdgePartition = edgePartition.map(outputIter)
-          Iterator((ePid, newEdgePartition))
+          if (ePartIter.hasNext && vTableReplicatedIter.hasNext) {
+            val (ePid, edgePartition) = ePartIter.next()
+            val (vPid, vPart) = vTableReplicatedIter.next()
+            assert(!vTableReplicatedIter.hasNext)
+            assert(ePid == vPid)
+            val et = new EdgeTriplet[VD, ED]
+            val inputIterator = edgePartition.iterator.map { e =>
+              et.set(e)
+              et.srcAttr = vPart(e.srcId)
+              et.dstAttr = vPart(e.dstId)
+              et
+            }
+            // Apply the user function to the vertex partition
+            val outputIter = f(ePid, inputIterator)
+            // Consume the iterator to update the edge attributes
+            val newEdgePartition = edgePartition.map(outputIter)
+            Iterator((ePid, newEdgePartition))
+          } else {
+            if (ePartIter.hasNext != vTableReplicatedIter.hasNext) {
+              logError("mapTriplets: Dropped non-empty %s partition".format(
+                if (ePartIter.hasNext) "edge" else "ReplicatedVertexView"))
+            }
+            Iterator.empty
+          }
       }
     new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), routingTable, replicatedVertexView)
   }
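
A brief usage sketch of mapTriplets itself, again assuming sc and the GraphX API of this era; with the guard in place, empty edge partitions simply produce empty output:

import org.apache.spark.graphx._

val g = Graph.fromEdges(sc.parallelize(Seq(Edge(1L, 2L, 10)), 4), defaultValue = 1)
// Rewrite each edge attribute from its triplet; the three empty edge
// partitions map to empty iterators instead of throwing.
val g2 = g.mapTriplets(et => et.srcAttr + et.dstAttr + et.attr)
g2.edges.collect() // Array(Edge(1,2,12))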
@@ -216,50 +233,58 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (

     // Map and combine.
     val preAgg = edges.partitionsRDD.zipPartitions(vs, true) { (ePartIter, vPartIter) =>
-      val (ePid, edgePartition) = ePartIter.next()
-      val (vPid, vPart) = vPartIter.next()
-      assert(!vPartIter.hasNext)
-      assert(ePid == vPid)
-      // Choose scan method
-      val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
-      val edgeIter = activeDirectionOpt match {
-        case Some(EdgeDirection.Both) =>
-          if (activeFraction < 0.8) {
-            edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
-              .filter(e => vPart.isActive(e.dstId))
-          } else {
-            edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
-          }
-        case Some(EdgeDirection.Either) =>
-          // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
-          // the index here. Instead we have to scan all edges and then do the filter.
-          edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
-        case Some(EdgeDirection.Out) =>
-          if (activeFraction < 0.8) {
-            edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
-          } else {
-            edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
-          }
-        case Some(EdgeDirection.In) =>
-          edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
-        case _ => // None
-          edgePartition.iterator
-      }
-
-      // Scan edges and run the map function
-      val et = new EdgeTriplet[VD, ED]
-      val mapOutputs = edgeIter.flatMap { e =>
-        et.set(e)
-        if (mapUsesSrcAttr) {
-          et.srcAttr = vPart(e.srcId)
-        }
-        if (mapUsesDstAttr) {
-          et.dstAttr = vPart(e.dstId)
-        }
-        mapFunc(et)
-      }
-      // Note: This doesn't allow users to send messages to arbitrary vertices.
-      vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
+      if (ePartIter.hasNext && vPartIter.hasNext) {
+        val (ePid, edgePartition) = ePartIter.next()
+        val (vPid, vPart) = vPartIter.next()
+        assert(!vPartIter.hasNext)
+        assert(ePid == vPid)
+        // Choose scan method
+        val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
+        val edgeIter = activeDirectionOpt match {
+          case Some(EdgeDirection.Both) =>
+            if (activeFraction < 0.8) {
+              edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
+                .filter(e => vPart.isActive(e.dstId))
+            } else {
+              edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
+            }
+          case Some(EdgeDirection.Either) =>
+            // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
+            // the index here. Instead we have to scan all edges and then do the filter.
+            edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
+          case Some(EdgeDirection.Out) =>
+            if (activeFraction < 0.8) {
+              edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
+            } else {
+              edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
+            }
+          case Some(EdgeDirection.In) =>
+            edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
+          case _ => // None
+            edgePartition.iterator
+        }
+
+        // Scan edges and run the map function
+        val et = new EdgeTriplet[VD, ED]
+        val mapOutputs = edgeIter.flatMap { e =>
+          et.set(e)
+          if (mapUsesSrcAttr) {
+            et.srcAttr = vPart(e.srcId)
+          }
+          if (mapUsesDstAttr) {
+            et.dstAttr = vPart(e.dstId)
+          }
+          mapFunc(et)
+        }
+        // Note: This doesn't allow users to send messages to arbitrary vertices.
+        vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
+      } else {
+        if (ePartIter.hasNext != vPartIter.hasNext) {
+          logError("mapReduceTriplets: Dropped non-empty %s partition".format(
+            if (ePartIter.hasNext) "edge" else "vertex"))
+        }
+        Iterator.empty
+      }
     }

     // do the final reduction reusing the index map
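
Finally, a usage sketch of mapReduceTriplets, the operator this last hunk hardens; it computes in-degrees by sending one message per edge. Assumes sc and the GraphX API of this era:

import org.apache.spark.graphx._

val graph = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(1L, 3L, 0)), 4), defaultValue = 0)
val inDegrees = graph.mapReduceTriplets[Int](
  et => Iterator((et.dstId, 1)), // map: one message per edge, sent to the destination
  _ + _)                         // reduce: sum the messages per vertex
inDegrees.collect() // Array((2,1), (3,1)); empty edge partitions contribute nothing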