[SPARK-10994] Add local clustering coefficient computation in GraphX #148

Open: wants to merge 7 commits into master
9 changes: 9 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -372,4 +372,13 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED] = {
StronglyConnectedComponents.run(graph, numIter)
}

/**
* Compute the local clustering coefficient for each vertex
*
* @see [[org.apache.spark.graphx.lib.LocalClusteringCoefficient#run]]
*/
def localClusteringCoefficient(): Graph[Double, ED] = {
LocalClusteringCoefficient.run(graph)
}
} // end of GraphOps
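
For reference, a minimal usage sketch of the new operator (hypothetical spark-shell session, not part of this PR; assumes a SparkContext named sc, and partitions the graph first because the implementation below requires Graph#partitionBy to have been called):

import org.apache.spark.graphx._

val rawEdges = sc.parallelize(Seq(0L -> 1L, 1L -> 2L, 2L -> 0L))
val graph = Graph.fromEdgeTuples(rawEdges, defaultValue = 1)
  .partitionBy(PartitionStrategy.RandomVertexCut)
val lcc: Graph[Double, Int] = graph.localClusteringCoefficient()
lcc.vertices.collect().foreach { case (id, c) => println(s"$id -> $c") }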
117 changes: 117 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/LocalClusteringCoefficient.scala
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.graphx.lib

import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag

import org.apache.spark.graphx._

/**
* Local clustering coefficient algorithm
*
* In a directed graph G = (V, E), we define the neighborhood N_i of a vertex v_i as
* N_i = {v_j : e_ij \in E or e_ji \in E}
*
* The local clustering coefficient C_i of a vertex v_i is then defined as
* C_i = |{e_jk : v_j, v_k \in N_i, e_jk \in E}| / (K_i * (K_i - 1))
* where K_i = |N_i| is the number of neighbors of v_i
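*
* For example, in a complete directed triangle every vertex v_i has K_i = 2 neighbors
* and both directed edges between them are present, so C_i = 2 / (2 * 1) = 1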
*
* Note that the input graph must have been partitioned using
* [[org.apache.spark.graphx.Graph#partitionBy]].
*/
object LocalClusteringCoefficient {
/**
* Compute the local clustering coefficient for each vertex and
* return a graph whose vertex values are the local clustering coefficients
*
* @param graph the graph for which to compute local clustering coefficients
*
* @return a graph with vertex attributes containing
* the local clustering coefficient of that vertex
*
*/
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Double, ED] = {
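// Algorithm sketch: (1) merge duplicate parallel edges; (2) build each vertex's
// neighbor set, ignoring self-loops; (3) for every edge, credit each common
// neighbor of its two endpoints with 1.0; (4) divide each vertex's total by
// K_i * (K_i - 1)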
// Remove redundant edges
val g = graph.groupEdges((a, b) => a).cache()

// Construct set representations of each vertex's neighborhood (self-loops excluded)
val nbrSets: VertexRDD[VertexSet] =
g.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
val set = new VertexSet(4)
var i = 0
while (i < nbrs.size) {
// exclude self-loops
if (nbrs(i) != vid) {
set.add(nbrs(i))
}
i += 1
}
set
}

// join the sets with the graph
val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
(vid, _, optSet) => optSet.getOrElse(null)
}

// Edge function: intersect the smaller neighbor set with the larger one; every
// common neighbor of the edge's endpoints is credited 1.0, since this edge
// connects two of its neighbors
def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexId, Double)] = {
assert(et.srcAttr != null)
assert(et.dstAttr != null)
// ignore self-loops: they would otherwise credit every neighbor of the looping vertex
if (et.srcId == et.dstId) {
Iterator.empty
} else {
val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) {
(et.srcAttr, et.dstAttr)
} else {
(et.dstAttr, et.srcAttr)
}
val iter = smallSet.iterator
val buf = new ListBuffer[(VertexId, Double)]
while (iter.hasNext) {
val vid = iter.next()
if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) {
buf += ((vid, 1.0))
}
}
buf.toIterator
}
}

// compute the intersection along edges
val counters: VertexRDD[Double] = setGraph.mapReduceTriplets(edgeFunc, _ + _)
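// counters(v) is now the number of directed edges running between v's neighbors,
// i.e. the numerator of C_v (vertices with no such edges are absent from counters)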

// Count the number of neighbors of each vertex. Note that this materializes one
// entry per vertex on the driver, so it assumes the vertex set fits in driver memory.
val nbNumMap = nbrSets.mapValues(_.size).collectAsMap()

// Merge the counters back into the graph and normalize each count by K_i * (K_i - 1)
g.outerJoinVertices(counters) {
(vid, _, optCounter: Option[Double]) =>
val dblCount: Double = optCounter.getOrElse(0.0)
val nbNum = nbNumMap(vid)
if (nbNum > 1) {
dblCount / (nbNum * (nbNum - 1))
} else {
0.0
}
}
}
}
76 changes: 76 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/lib/LocalClusteringCoefficientSuite.scala
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.graphx.lib

import org.apache.spark.graphx._
import org.scalatest.FunSuite

class LocalClusteringCoefficientSuite extends FunSuite with LocalSparkContext {

def approEqual(a: Double, b: Double): Boolean = {
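// far stricter than double precision in general, but the expected values in these
// tests (0.0, 0.5, 1.0) are exactly representable, so the comparison is effectively exact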
(a - b).abs < 1e-20
}

test("test in a complete graph") {
withSpark { sc =>
val edges = Array(0L -> 1L, 1L -> 2L, 2L -> 0L)
val revEdges = edges.map { case (a, b) => (b, a) }
val rawEdges = sc.parallelize(edges ++ revEdges, 2)
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
val lccCount = graph.localClusteringCoefficient()
val verts = lccCount.vertices
verts.collect.foreach { case (_, count) => assert(approEqual(count, 1.0)) }
}
}

test("test in a triangle with one bi-directed edges") {
withSpark { sc =>
val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 1L, 2L -> 0L), 2)
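// Neighborhoods: N(0) = {1, 2}, N(1) = {0, 2}, N(2) = {0, 1}. Vertex 0 sees both
// directed edges 1->2 and 2->1 between its neighbors (2 / 2 = 1.0), while vertices
// 1 and 2 each see only one directed edge between theirs (1 / 2 = 0.5).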
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
val lccCount = graph.localClusteringCoefficient()
val verts = lccCount.vertices
verts.collect.foreach { case (vid, count) =>
if (vid == 0) {
assert(approEqual(count, 1.0))
} else {
assert(approEqual(count, 0.5))
}
}
}
}

test("test in a graph with only self-edges") {
withSpark { sc =>
val rawEdges = sc.parallelize(Array(0L -> 0L, 1L -> 1L, 2L -> 2L, 3L -> 3L), 2)
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
val lccCount = graph.localClusteringCoefficient()
val verts = lccCount.vertices
verts.collect.foreach { case (vid, count) => assert(approEqual(count, 0.0)) }
}
}

test("test in a graph with duplicated edges") {
withSpark { sc =>
val edges = Array(0L -> 1L, 1L -> 2L, 2L -> 0L)
val rawEdges = sc.parallelize(edges ++ edges, 2)
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
val lccCount = graph.localClusteringCoefficient()
val verts = lccCount.vertices
verts.collect.foreach { case (vid, count) => assert(approEqual(count, 1.0)) }
}
}
}