Skip to content

Local and Distributed Traversal Engines

April 19, 2011

In the graph database space, there are two types of traversal engines: local and distributed. Local traversal engines are typically for single-machine graph databases and are used for real-time production applications. Distributed traversal engines are typically for multi-machine graph databases and are used for batch processing applications. This divide is quite sharp in the community, but there is nothing that prevents the unification of both models. A discussion of this divide and its unification is presented in this post.

Local Traversal Engines

In a local traversal engine, there is typically a single processing agent that obeys a program. The agent is called a traverser and the program it follows is called a path description. In Gremlin, a friend-of-a-friend path description is defined as such:

g.v(1).outE('friend').inV.outE('friend').inV

When this path description is interpreted by a traverser over a graph, an instance of the description is realized as actual paths in the graph that match that description. For example, the Gremlin traverser starts at vertex 1 and then steps to its outgoing friend-edges. Next, it will move to the head/target vertices of those edges (i.e. vertices 3 and 4). After that, it will go to the friend-edges of those vertices and finally, to the head of the previous edges (i.e. vertices 6 and 7). In this way, a single traverser is following all the paths that are exposed with each new atomic graph operation (i.e. each new step after a .). The abstract syntax being: step.step.step. What is returned by this friend-of-a-friend path description, on the example graph diagrammed, is vertices 6 and 7.

In many situations, its not the end of the path that is desired, but some side-effect of the traversal. For example, as the traverser walks it can update some global data structure such as a ranking of the vertices. This idea is presented in the path description below, where the outE.inV path is looped over 1000 times. Each time a vertex is traversed over, the map m is updated. This global map m maintains keys that are vertices and values that denote the number of times that each vertex has been touched (groupCount‘s behavior).

m = [:]
g.v(1).outE.inV.groupCount(m).loop(3){it.loops < 1000}

The local traversal engine pattern is abstractly diagrammed on the right, where a single traverser is obeying some path description (a.b.c) and in doing so, moving around on a graph and updating a global data structure (the red boxed map).

Given the need for traversers to move from element to element, graph databases of this form tend to support strong data locality by means of a direct-reference graph data structure (i.e. vertices have pointers to edges and edges to vertices). A few examples of such graph databases include Neo4j, OrientDB, and DEX.

Distributed Traversal Engines

In a distributed traversal engine, a traversal is represented as a flow of messages between the elements of the graph. Generally, each element (e.g. vertex) is operating independently of the other elements. Each element is seen as its own processor with its own (usually homogenous) program to execute. Elements communicate with each other via message passing. When no more messages have been passed, the traversal is complete and the results of the traversal are typically represented as a distributed data structure over the elements. Graph databases of this nature tend to use the Bulk Synchronous Parallel model of distributed computing. Each step is synchronized in a manner analogous to a clock cycle in hardware. Instances of this model include Agrapa, Pregel, Trinity, and GoldenOrb.

An example of distributed graph traversing is now presented using a ranking algorithm in Java. [NOTE: In this example, edges are not first class citizens. This is typical of the state of the art in distributed traversal engines. They tend to be for single-relational, unlabeled-edge graphs.]

public void evaluateStep(int step) {
  if(!this.inbox.isEmpty() && step < 1000) {
    this.rank = this.rank + this.inbox.size();
    for(Vertex vertex : this.adjacentVertices()) {
      for(int i=0; i<this.inbox.size(); i++) {
        this.sendMessage(vertex);
      }
    }
    this.inbox.clear();
  }
}

Each vertex is provided the above piece of code. This code is executed by every vertex at each step (in the Bulk Synchronous Parallel sense — “clock cycle”). For a concrete example, a “start message” to, lets say, vertex 1 initiates the process. Vertex 1 will increment its rank by 1. It will then send messages to its adjacent vertices. On the next step, the vertices adjacent to 1 update their rank by the number of messages they received in the previous step. This process continues until no more messages are found in the system. The aggregation of all the rank values is the result of the algorithm. The general pattern of a Bulk Synchronous Parallel step is: 1.) get messages 2.) process messages 3.) send messages.

When edges are labeled (e.g. friend, created, purchased, etc. — multi-relational graphs), its necessary to have greater control over how messages are passed. In other words, its necessary to filter out particular paths emanating from the start vertex. For instance, in a friend-of-a-friend traversal, created-edges should be ignored. To do so, one can imagine using the Gremlin language within a distributed graph traversal engine. In this model, a step in Gremlin is a step in the Bulk Synchronous Parallel model.

Given the running example of a.b.c in the local engine model, a Gremlin-esque distributed engine example is as follows. A “start message” of a.b.c is sent to vertex 1. Vertex 1 will “pop off” the first step a of the path description and then will pass a message of b.c to those vertices that satisfy a. In this way, at step 2, vertices 2 and 3 have b.c waiting for them in their inbox. Finally, at step 3, vertex 6 receives the message c. If a side-effect is desired (e.g. vertex rankings), a global data structure should be avoided. Given the parallel nature of distributed traversal engines, it is best to avoid thread blocking when writing to a global data structure. Instead, each vertex can maintain a portion of the data structure (the red boxes). Moreover, for non-regular path descriptions (require memory), local data structures can serve as a “scratch pad.” Finally, when the computation is complete a “reduce”-phase can yield the rank results as an aggregation of the data structure components (the union of the red boxes).

There is much work to be done in this area and members of the TinkerPop team are currently researching a unification of the local and distributed models of graph traversal. In this way, certain steps of a path description may be local and then others may be “fanned out” to run in parallel. We are on the edge of our seats to see what will come out of this thought traversal.

Acknowledgements

The contents of this post have been directly inspired by conversations with Alex Averbuch, Peter Neubauer, Andreas Kollegger, and Ricky Ho. Indirect inspiration has come from my various collaborators and all the contributors to the TinkerPop space.

About these ads
One Comment

Trackbacks & Pingbacks

  1. Local and Distributed Traversal Engines « Another Word For It

Comments are closed.

Follow

Get every new post delivered to your Inbox.

Join 141 other followers

%d bloggers like this: