Here is what I set out to do. In a large but simple network of persons connected by "knows" relations, I want to store each person's ego network as a vertex property. The ego network is simply the network of friends, friends of friends, and so on, up to a certain number of steps.
A first observation: most, if not all, of the publicly available tutorials on Spark GraphX restrict themselves to API descriptions or very basic usage scenarios. If you want to design your own graph algorithm, you are on your own. Luckily, GraphX ships with a library of a few higher-level algorithms, of which the connected components algorithm serves as a good example for what we want.
Below, you find the Scala code for extracting the egonetworks that I ended up with. The code can be run in a Jupyter notebook with the IBM Spark kernel.
// Create egonetworks for all vertices with Pregel
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

// Creates egonetworks as a Vector((id1, id2), (id1, id3), (id2, id4), etc.) in the vertex attribute
// VDego should be serializable (so cannot be an Iterator)
type VDego = Vector[(VertexId, VertexId)]  // Both vertex attribute type and Pregel message type A

def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], maxIter: Int): Graph[VDego, ED] = {

  val egoGraph = graph.mapVertices { case (vid, _) => Vector[(VertexId, VertexId)]() }

  def sendMessage(edge: EdgeTriplet[VDego, ED]) = {
    // interpret edges in both directions
    if (edge.srcAttr.isEmpty) {
      // First iteration
      Iterator(
        (edge.srcId, Vector((edge.srcId, edge.dstId))),  // one direction
        (edge.dstId, Vector((edge.srcId, edge.dstId)))   // other direction
      )
    } else {
      // Second and later iterations
      def directionIterator(nbAttr: VDego, tgtAttr: VDego, tgtId: VertexId) = (
        for ((x1, x2) <- nbAttr; if !tgtAttr.contains((x1, x2))) yield (tgtId, Vector((x1, x2)))
      ).toIterator  // Pregel requires an Iterator, not a Vector
      directionIterator(edge.dstAttr, edge.srcAttr, edge.srcId) ++  // one direction
      directionIterator(edge.srcAttr, edge.dstAttr, edge.dstId)     // other direction
    }
  }

  val initialMessage: VDego = Vector()  // For first run of vprog
  Pregel(egoGraph, initialMessage, maxIter, activeDirection = EdgeDirection.Either)(
    vprog = (id, attr, msg) => attr ++ msg,
    sendMsg = sendMessage,
    mergeMsg = (a, b) => a ++ b
  )
}

// Simple testcase
val ingraph1 = Graph.fromEdgeTuples(
  sc.parallelize(Vector((1L,2L), (2L,3L), (3L,4L), (4L,5L))),
  Vector[(VertexId, VertexId)]()
)
// Large generated testcase
val ingraph2 = GraphGenerators.logNormalGraph(sc, numVertices=100*1000, mu=0.2, sigma=0.1)

val maxIter = 3  // Number of rings to be constructed in egonetworks
val graph = run(ingraph1, maxIter)
println("Egonetworks:\n" +
  graph.vertices.take(100).mkString("\n\n"))
So, what is going on here?
Pregel iteratively runs a vertex program (vprog) on all vertices that have incoming messages; in this case vprog simply appends the incoming Vector to the one already stored. The vertex properties are initialized with an empty Vector. GraphX's Pregel implementation passes the graph's edge triplets to the message generation function sendMsg; each triplet consists of the edge attribute and the attributes of the source and destination vertices. The algorithm stops when no more messages are generated or after a predefined number of iterations.
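To make this loop concrete, here is a minimal plain-Scala model of it that runs without Spark. It is only a sketch of the idea, not GraphX's implementation: the names PregelSketch and run are made up for illustration, the message logic is simplified compared to the sendMessage function above, and duplicates are removed when messages are merged.

```scala
// Simplified, single-machine model of the Pregel loop (assumption: illustrative only,
// this is not how GraphX executes the computation)
object PregelSketch {
  type Edge = (Long, Long)
  type Ego = Vector[Edge]

  def run(edges: Seq[Edge], maxIter: Int): Map[Long, Ego] = {
    val ids = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    // Every vertex starts with an empty ego network
    var attrs: Map[Long, Ego] = ids.map(id => id -> Vector.empty[Edge]).toMap
    var iter = 0
    var active = true
    while (active && iter < maxIter) {
      // "sendMsg": each edge offers both endpoints the edges they do not know yet
      val messages: Seq[(Long, Ego)] = edges.flatMap { case (s, d) =>
        val known = (s, d) +: (attrs(s) ++ attrs(d))
        Seq(
          s -> known.filter(e => !attrs(s).contains(e)),
          d -> known.filter(e => !attrs(d).contains(e))
        )
      }.filter { case (_, msg) => msg.nonEmpty }
      // "mergeMsg": combine all messages addressed to the same vertex
      val merged: Map[Long, Ego] = messages.groupBy(_._1).map {
        case (id, ms) => id -> ms.flatMap(_._2).distinct.toVector
      }
      // "vprog": append the merged message; vertices without a message stay unchanged
      attrs = attrs.map { case (id, a) => id -> (a ++ merged.getOrElse(id, Vector.empty[Edge])) }
      // Stop early when no messages were generated
      active = merged.nonEmpty
      iter += 1
    }
    attrs
  }
}
```

Running PregelSketch.run on the chain 1-2-3-4-5 from the simple testcase with maxIter = 3 gives vertex 1 the three edges within three steps, while vertex 3 in the middle already sees all four edges.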
The sendMsg function I defined distinguishes between the first and later iterations. In the first iteration it generates edge tuples from the vertex IDs. In the later iterations it generates edge tuples from the vector of edge tuples in the vertex attributes. In hindsight, the purpose of the first iteration could also have been achieved with a separate Graph.mapTriplets call prior to the Graph.Pregel call.
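I have not tried the mapTriplets route mentioned above. As a rough sketch of the same idea with a different technique, the vertices could be seeded with their incident edges using aggregateMessages and outerJoinVertices before Pregel is called. The names EgoSeed and seedWithIncidentEdges are hypothetical, and the sketch assumes a Spark version that provides Graph.aggregateMessages.

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx._

object EgoSeed {
  type VDego = Vector[(VertexId, VertexId)]

  // Hypothetical helper: gives every vertex the edges it takes part in (the first ring)
  def seedWithIncidentEdges[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VDego, ED] = {
    val incident: VertexRDD[VDego] = graph.aggregateMessages[VDego](
      ctx => {
        // Send the edge tuple to both endpoints, as the first Pregel iteration does
        val e = Vector((ctx.srcId, ctx.dstId))
        ctx.sendToSrc(e)
        ctx.sendToDst(e)
      },
      (a, b) => a ++ b,    // concatenate the edges collected for one vertex
      TripletFields.None   // only the vertex IDs are read, no attributes
    )
    // Vertices without any edge get an empty Vector
    graph.outerJoinVertices(incident) { (_, _, edgesOpt) =>
      edgesOpt.getOrElse(Vector.empty[(VertexId, VertexId)])
    }
  }
}
```

With the vertices seeded this way, the first-iteration branch of sendMessage would presumably no longer be needed, and one Pregel iteration fewer would build the same number of rings.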
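The activeDirection and mergeMsg arguments are there to keep the computation efficient. activeDirection takes an EdgeDirection (EdgeDirection.Either if you leave it out) and restricts the edges on which sendMsg runs. EdgeDirection.In, for example, means that sendMsg runs only on the in-edges of vertices that received a message in the previous iteration, that is, on edge triplets whose destination vertex received a message. The mergeMsg function is required; it combines messages bound for the same destination vertex into a single message and should be commutative and associative, so that messages can already be combined on the executor where they originate.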
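One thing to be aware of with a merge function that only concatenates: when two neighbours forward the same edge tuple in the same iteration, the tuple ends up in the vertex attribute twice. The plain-Scala sketch below shows this for vertex 1 of a triangle 1-2-3 and compares it with a de-duplicating variant. The names MergeSketch, concatMerge and distinctMerge are made up for illustration; either function has the (A, A) => A shape that mergeMsg expects.

```scala
object MergeSketch {
  type Edge = (Long, Long)
  type Ego = Vector[Edge]

  // The merge used in the code above: plain concatenation, keeps duplicates
  val concatMerge: (Ego, Ego) => Ego = (a, b) => a ++ b

  // Hypothetical variant: concatenation followed by de-duplication
  val distinctMerge: (Ego, Ego) => Ego = (a, b) => (a ++ b).distinct

  // Messages for vertex 1 of the triangle 1-2-3 in the second iteration:
  // both neighbours, 2 and 3, forward the edge (2, 3)
  val inbox: Seq[Ego] = Seq(Vector((2L, 3L)), Vector((2L, 3L)))

  val concatenated: Ego = inbox.reduce(concatMerge)    // holds (2, 3) twice
  val deduplicated: Ego = inbox.reduce(distinctMerge)  // holds (2, 3) once
}
```

Whether the extra distinct call is worth its cost depends on the graph; calling distinct once on the final vertex attributes would be another option.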