-->

2016-01-02

Extracting ego networks with Apache Spark GraphX

Suppose you want to do graph computations on a social network with tens of millions of vertices, but all your data are stored in a Hortonworks secure Hadoop cluster and are supposed to stay there: what are your options? Having installed Neo4j on the cluster is a no-go: the devops team is having a good time getting the cluster and its associated services in production and distractions cannot be allowed. The first thing that comes to mind is applying Apache Spark GraphX, which is part of the Hortonworks Data Platform. It provides a scalable solution to graph computing based on Google's Pregel algorithm. Is Spark GraphX a suitable tool for devops teams to develop and maintain graph-based information services? Let's see.

What I will do here is the following. In a large, but simple network of persons with "knows" relations, I want to store each person's ego network as a vertex property. The ego network is simply the network of friends of friends ... to a certain limit.
A first observation: most or all of the public domain tutorials on Spark GraphX restrict themselves to API descriptions or very basic usage scenarios. If you want to design your own graph algorithm, you are on your own. Luckily, the Spark GraphX API has a library with a few higher level algorithms from which the connected components algorithm serves as a good example for what we want.

Below, you find the Scala code for extracting the egonetworks that I ended up with. The code can be run in a Jupyter notebook with the IBM Spark kernel.


// Create egonetworks for all vertices with Pregel

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
 

// Creates egonetworks as a Vector((id1, id2), (id1, id3), (id2, id4), etc.) in the vertex attribute
// VDego should be serializable (so cannot be an Iterator)
type VDego = Vector[(VertexId, VertexId)]  // Both vertex atttribute type and Pregel message type A

def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], maxIter: Int): Graph[VDego, ED] = {
  val egoGraph = graph.mapVertices { case (vid, _) => Vector[(VertexId, VertexId)]() }
    
  def sendMessage(edge: EdgeTriplet[VDego, ED]) = {
    // interpret edges in both directions
    if (edge.srcAttr.isEmpty) { // First iteration
      Iterator(
        (edge.srcId, Vector((edge.srcId, edge.dstId))),              // one direction
        (edge.dstId, Vector((edge.srcId, edge.dstId)))               // other direction
      )
    } else { // Second and later iterations
      def directionIterator(nbAttr: VDego, tgtAttr: VDego, tgtId: VertexId) = (
        for ((x1, x2) <- font=""> nbAttr; if !tgtAttr.contains((x1, x2))
        ) yield (tgtId, Vector((x1, x2)))
      ).toIterator // Pregel requires an Iterator, not a Vector
      directionIterator(edge.dstAttr, edge.srcAttr, edge.srcId) ++   // one direction
        directionIterator(edge.srcAttr, edge.dstAttr, edge.dstId)    // other direction
    }
  }

  val initialMessage: VDego = Vector()  // For first run of vprog
  Pregel(egoGraph, initialMessage, maxIter, activeDirection = EdgeDirection.Either)(
    vprog = (id, attr, msg) => attr ++ msg,
    sendMsg = sendMessage,
    mergeMsg = (a, b) => a ++ b
  )
}
 
// Simple testcase
val ingraph1 = Graph.fromEdgeTuples(
  sc.parallelize(Vector((1L,2L), (2L,3L), (3L,4L), (4L,5L))),
  Vector[(VertexId, VertexId)]()
)
 
// Large generated testcase
val ingraph2 = GraphGenerators.logNormalGraph(sc, numVertices=100*1000, mu=0.2, sigma=0.1)
 
 
val maxIter = 3  // Number of rings to be constructed in egonetworks
val graph = run(ingraph1, maxIter)
println("Egonetworks:\n" + graph.vertices.take(100).mkString("\n\n"))
 

So, what is going on, here?


Pregel iteratively runs a vertex program (vprog) on all vertices that have incoming messages, in this case a simple Vector addition. The vertex properties are initialized with an empty Vector. Apache Spark GraphX' Pregel implementation provides the message generation function sendMsg with the graph's edge triplets, consisting of the edge attribute and the attributes of the source and destination vertices. The algorithm stops when no messages are generated anymore or after a predefined number of iterations.
The sendMsg functions I defined discriminates between the first and later iterations. In the first iteration it generates edge tuples from the vertex ID's. In the later iterations it generate edge tuples from the vector of edge tuples in the vertex attributes. In hindsight, the purpose of the first iteration could also have been achieved with a separate Graph.mapTriplets call prior to the Graph.Pregel call.
EdgeDirection and mergeMsg are obligatory code items intended to optimize the computation. EdgeDirection.In means that an edge triplet only generates messages if the invertex received a message in the previous iteration, etc. The mergeMsg function merges messages originating from one Spark executor to the same destination vertex.

Conclusion

Playing with Spark GraphX is fun, but the abstraction level needed for developing new algorithms is simply too high to be useful to ordinary devops teams. I will keep you posted with alternatives that I intend to explore.


No comments:

Post a Comment