Adding and Removing Vertices

Description

Dynamic Addition and Removal of Vertexes in Graph API

...

Nowadays more and more people turn to distributed environments to store and analyze Big Data. As a result, there is a growing need for both efficiency and new features. Hama is an upcoming project that gives researchers and analysts a way to handle large amounts of data through the Bulk Synchronous Parallel (BSP) computation model.

...

This article presents some new features of the Hama Graph API that make it possible to create dynamic graphs at run time. It also serves as a technical document on the internals of those features.

...

Introduction

For the time being, we discuss only two operations, addition and deletion, from the point of view of both the end user and the internal architecture. We first give an example of how to use this feature and then describe the internal implementation.

Some points to keep in mind:

  1. Deletion and addition happen after a superstep. We provide methods inside a vertex instance that create or delete a vertex.
  2. The vertex API is built on top of BSP peers. That means that each node of your cluster contains a specific number of BSP peers, and each BSP peer in turn contains multiple vertices.
  3. New vertices need to be distributed through the partitioner so that they are placed on the right peers.
  4. Keep in mind that a new created vertex
  5. New and old vertices have the same superstep counter. Various algorithms change their behavior through time. In such a case, you need to develop your own state counter.
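
The last point can be illustrated with a small sketch that does not depend on Hama. The names used here (`StateCounterSketch`, `AgedVertex`, `localStep`) are hypothetical and not part of the Hama API. The idea is only that a vertex remembers the superstep in which it first became active, so that it can derive its own step count from the shared superstep counter.

```java
// Hypothetical sketch, not Hama API: a vertex records the superstep in which
// it first became active and derives a "local" step from the global counter.
class StateCounterSketch {

  /** Minimal stand-in for a vertex that tracks its own age. */
  static final class AgedVertex {
    final String id;
    final long createdAt; // global superstep in which the vertex first runs

    AgedVertex(String id, long createdAt) {
      this.id = id;
      this.createdAt = createdAt;
    }

    /** Number of supersteps this vertex has lived through so far. */
    long localStep(long globalSuperstep) {
      return globalSuperstep - createdAt;
    }
  }

  public static void main(String[] args) {
    AgedVertex old = new AgedVertex("1", 0);   // loaded from the input
    AgedVertex sum = new AgedVertex("sum", 1); // requested in superstep 0, active from superstep 1
    long global = 2;
    System.out.println(old.id + " local step: " + old.localStep(global)); // prints "1 local step: 2"
    System.out.println(sum.id + " local step: " + sum.localStep(global)); // prints "sum local step: 1"
  }
}
```

Both vertices see the same global superstep (2), but an algorithm that depends on how long a vertex has existed would use the local value instead.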

User Example

The following code is part of the Graph examples. You can find it in org.apache.hama.examples.DynamicGraph.

This is an example of how to manipulate graphs dynamically. The input of this example is a number in each row. We assume that there is a vertex with ID:1, which is responsible for creating a sum vertex that will aggregate the values of the other vertices. Before the aggregation, every other vertex sends its value to the sum vertex and then removes itself.

Input example:

  • 1
  • 2
  • 3
  • 4

Output example:

  • sum 12

(the sum also includes the number of vertices that exist in the last superstep, obtained by two different methods, which is why the output is 12 rather than 10)
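
The arithmetic behind this output can be checked with a few lines of plain Java. This is only a sketch of the calculation and uses no Hama classes. The `SumOutputSketch` class and its `expectedSum` helper are hypothetical, and the sketch assumes that both counting methods report a single remaining vertex (the sum vertex) in the last superstep.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the arithmetic only; no Hama classes are involved.
class SumOutputSketch {

  /** Sum of the input values plus the vertex count, taken twice. */
  static int expectedSum(List<Integer> inputValues, int remainingVertices) {
    int s = 0;
    for (int v : inputValues) {
      s += v;
    }
    // The example adds the vertex count obtained by two different methods.
    return s + remainingVertices + remainingVertices;
  }

  public static void main(String[] args) {
    // Assumption: only the "sum" vertex is left, so both counts are 1.
    System.out.println("sum " + expectedSum(Arrays.asList(1, 2, 3, 4), 1)); // prints "sum 12"
  }
}
```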

No Format

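import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.GraphJobRunner.GraphJobCounter;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;

public class DynamicGraph {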
  
  public static class GraphTextReader extends 
      VertexInputReader<LongWritable, Text, Text, NullWritable, IntWritable> {

    @Override
    public boolean parseVertex(LongWritable key, Text value,
            Vertex<Text, NullWritable, IntWritable> vertex) throws Exception {

        vertex.setVertexID(value);
        vertex.setValue(new IntWritable(Integer.parseInt(value.toString())));

        return true;
    }
  }

  public static class GraphVertex extends 
      Vertex<Text, NullWritable, IntWritable> {
    
    private void createSumVertex() throws IOException {
      if (this.getVertexID().toString().equals("1")) {
        Text new_id = new Text("sum");
        this.addVertex(new_id, new ArrayList<Edge<Text, NullWritable>>(), new IntWritable(0));
      }
    }

    private void sendAllValuesToSumAndRemove() throws IOException {
      if (!this.getVertexID().toString().equals("sum")) {
        this.sendMessage(new Text("sum"), this.getValue());
        this.remove();
      }
    }

    // this must run only on "sum" vertex
    private void calculateSum(Iterable<IntWritable> msgs) throws IOException {
      if (this.getVertexID().toString().equals("sum")) {
        int s = 0;
        for (IntWritable i : msgs) {
          s += i.get();
        }
        // Also add the vertex count, obtained in two different ways.
        s += this.getPeer().getCounter(GraphJobCounter.INPUT_VERTICES).getCounter();
        s += this.getNumVertices();
        this.setValue(new IntWritable(this.getValue().get() + s));
      } else {
        throw new UnsupportedOperationException("We have more vertices than we expected: " + this.getVertexID() + " " + this.getValue());
      }
    }

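    @Override
    public void compute(Iterable<IntWritable> msgs) throws IOException {
      if (this.getSuperstepCount() == 0) {
        // Superstep 0: vertex "1" requests the creation of the "sum" vertex.
        createSumVertex();
      } else if (this.getSuperstepCount() == 1) {
        // Superstep 1: every other vertex sends its value to "sum" and removes itself.
        sendAllValuesToSumAndRemove();
      } else if (this.getSuperstepCount() == 2) {
        // Superstep 2: only "sum" should be left; it adds up the received values.
        calculateSum(msgs);
      } else if (this.getSuperstepCount() == 3) {
        this.voteToHalt();
      }
    }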
  }

  public static void main(String[] args) throws IOException, 
        InterruptedException, ClassNotFoundException {
    if (args.length != 2) {
      printUsage();
    }
    HamaConfiguration conf = new HamaConfiguration(new Configuration());
    GraphJob graphJob = createJob(args, conf);
    long startTime = System.currentTimeMillis();
    if (graphJob.waitForCompletion(true)) {
      System.out.println("Job Finished in "
          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
    }
  }

  private static void printUsage() {
    System.out.println("Usage: <input> <output>");
    System.exit(-1);
  }

  private static GraphJob createJob(String[] args, HamaConfiguration conf) throws IOException {
    GraphJob graphJob = new GraphJob(conf, DynamicGraph.class);
    graphJob.setJobName("Dynamic Graph");
    graphJob.setVertexClass(GraphVertex.class);

    graphJob.setInputPath(new Path(args[0]));
    graphJob.setOutputPath(new Path(args[1]));

    graphJob.setVertexIDClass(Text.class);
    graphJob.setVertexValueClass(IntWritable.class);
    graphJob.setEdgeValueClass(NullWritable.class);

    graphJob.setInputFormat(TextInputFormat.class);

    graphJob.setVertexInputReaderClass(GraphTextReader.class);
    graphJob.setPartitioner(HashPartitioner.class);

    graphJob.setOutputFormat(TextOutputFormat.class);
    graphJob.setOutputKeyClass(Text.class);
    graphJob.setOutputValueClass(IntWritable.class);

    return graphJob;
  }  

}

We start by creating a class that serves as a container for our example. In this case, the class is DynamicGraph.

DynamicGraph contains three important parts.

The first component is the declaration of the input reader. This class overrides the parseVertex method and creates vertex instances from an input file. In our case the input is a text file in which each line holds a number. This number is assigned as both the ID and the value of the vertex.

No Format

  public static class GraphTextReader extends 
      VertexInputReader<LongWritable, Text, Text, NullWritable, IntWritable> {

    @Override
    public boolean parseVertex(LongWritable key, Text value,
            Vertex<Text, NullWritable, IntWritable> vertex) throws Exception {

        vertex.setVertexID(value);
        vertex.setValue(new IntWritable(Integer.parseInt(value.toString())));

        return true;
    }
  }

The second component is the standard boilerplate to create and submit a GraphJob.

No Format

  public static void main(String[] args) throws IOException, 
        InterruptedException, ClassNotFoundException {
    if (args.length != 2) {
      printUsage();
    }
    HamaConfiguration conf = new HamaConfiguration(new Configuration());
    GraphJob graphJob = createJob(args, conf);
    long startTime = System.currentTimeMillis();
    if (graphJob.waitForCompletion(true)) {
      System.out.println("Job Finished in "
          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
    }
  }

  private static void printUsage() {
    System.out.println("Usage: <input> <output>");
    System.exit(-1);
  }

  private static GraphJob createJob(String[] args, HamaConfiguration conf) throws IOException {
    GraphJob graphJob = new GraphJob(conf, DynamicGraph.class);
    graphJob.setJobName("Dynamic Graph");
    graphJob.setVertexClass(GraphVertex.class);

    graphJob.setInputPath(new Path(args[0]));
    graphJob.setOutputPath(new Path(args[1]));

    graphJob.setVertexIDClass(Text.class);
    graphJob.setVertexValueClass(IntWritable.class);
    graphJob.setEdgeValueClass(NullWritable.class);

    graphJob.setInputFormat(TextInputFormat.class);

    graphJob.setVertexInputReaderClass(GraphTextReader.class);
    graphJob.setPartitioner(HashPartitioner.class);

    graphJob.setOutputFormat(TextOutputFormat.class);
    graphJob.setOutputKeyClass(Text.class);
    graphJob.setOutputValueClass(IntWritable.class);

    return graphJob;
  }

The third and most important component is the GraphVertex class. This class is the heart of the program, as it contains the compute method.

No Format

  public static class GraphVertex extends 
      Vertex<Text, NullWritable, IntWritable> {
    
    private void createSumVertex() throws IOException {
      if (this.getVertexID().toString().equals("1")) {
        Text new_id = new Text("sum");
        this.addVertex(new_id, new ArrayList<Edge<Text, NullWritable>>(), new IntWritable(0));
      }
    }

    private void sendAllValuesToSumAndRemove() throws IOException {
      if (!this.getVertexID().toString().equals("sum")) {
        this.sendMessage(new Text("sum"), this.getValue());
        this.remove();
      }
    }

    // this must run only on "sum" vertex
    private void calculateSum(Iterable<IntWritable> msgs) throws IOException {
      if (this.getVertexID().toString().equals("sum")) {
        int s = 0;
        for (IntWritable i : msgs) {
          s += i.get();
        }
        // Also add the vertex count, obtained in two different ways.
        s += this.getPeer().getCounter(GraphJobCounter.INPUT_VERTICES).getCounter();
        s += this.getNumVertices();
        this.setValue(new IntWritable(this.getValue().get() + s));
      } else {
        throw new UnsupportedOperationException("We have more vertices than we expected: " + this.getVertexID() + " " + this.getValue());
      }
    }

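    @Override
    public void compute(Iterable<IntWritable> msgs) throws IOException {
      if (this.getSuperstepCount() == 0) {
        // Superstep 0: vertex "1" requests the creation of the "sum" vertex.
        createSumVertex();
      } else if (this.getSuperstepCount() == 1) {
        // Superstep 1: every other vertex sends its value to "sum" and removes itself.
        sendAllValuesToSumAndRemove();
      } else if (this.getSuperstepCount() == 2) {
        // Superstep 2: only "sum" should be left; it adds up the received values.
        calculateSum(msgs);
      } else if (this.getSuperstepCount() == 3) {
        this.voteToHalt();
      }
    }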
  }

As we can see, the compute method calls three other methods. The createSumVertex method creates a new vertex with the text sum as its ID and 0 as its value. Next, in sendAllValuesToSumAndRemove, each vertex that runs this method sends its value to the sum vertex and then deletes itself by calling this.remove();. Finally, calculateSum adds up the values of all vertices in the sum vertex. The interesting part of this last method is that it also adds the number of input vertices, this.getPeer().getCounter(GraphJobCounter.INPUT_VERTICES).getCounter(), and the number of vertices that exist in the running superstep, this.getNumVertices().
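
To make the sequence of events easier to follow, here is a minimal single-process simulation of the three supersteps in plain Java. It is not the Hama implementation. The `DynamicGraphSimulation` class, its maps, and the way pending additions, removals and messages are applied at the end of each superstep are assumptions made for illustration, based on the rule that additions and deletions take effect after a superstep.

```java
import java.util.*;

// Illustrative simulation only; none of these names come from Hama.
class DynamicGraphSimulation {

  /** Runs the three supersteps and returns the final value of "sum". */
  static int run(List<Integer> input) {
    Map<String, Integer> vertices = new LinkedHashMap<>();
    for (int n : input) {
      vertices.put(String.valueOf(n), n); // ID and value are the same number
    }
    List<Integer> inbox = new ArrayList<>(); // messages addressed to "sum"

    for (int superstep = 0; superstep < 3; superstep++) {
      Map<String, Integer> toAdd = new LinkedHashMap<>();
      Set<String> toRemove = new HashSet<>();
      List<Integer> outbox = new ArrayList<>();

      for (Map.Entry<String, Integer> v : vertices.entrySet()) {
        String id = v.getKey();
        if (superstep == 0 && id.equals("1")) {
          toAdd.put("sum", 0);        // corresponds to createSumVertex
        } else if (superstep == 1 && !id.equals("sum")) {
          outbox.add(v.getValue());   // corresponds to sendAllValuesToSumAndRemove
          toRemove.add(id);
        } else if (superstep == 2) {
          int s = 0;                  // corresponds to calculateSum
          for (int m : inbox) {
            s += m;
          }
          // Assumption: both vertex counts equal the current number of vertices.
          s += vertices.size() + vertices.size();
          v.setValue(v.getValue() + s);
        }
      }
      // Structural changes and messages take effect after the superstep.
      vertices.keySet().removeAll(toRemove);
      vertices.putAll(toAdd);
      inbox = outbox;
    }
    return vertices.get("sum");
  }

  public static void main(String[] args) {
    System.out.println("sum " + run(Arrays.asList(1, 2, 3, 4))); // prints "sum 12"
  }
}
```

The simulation keeps the requested changes in separate collections and applies them only once all vertices have been processed, which is why the sum vertex first takes part in superstep 1 and is the only vertex left in superstep 2.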

Implementation Details

The GraphJobRunner class contains the major steps of the graph computation. It starts with an initial setup, where the vertices are loaded into memory. Then comes the main computation loop, where the supersteps occur. Finally there is the cleanup, where the graph and the results are written to HDFS.
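
The three phases can be pictured with the following skeleton. It is a simplified sketch in plain Java and not the actual GraphJobRunner code. The class `RunnerPhasesSketch`, the `peerFor` helper and the hash-based placement rule are assumptions that only mirror the description above and the earlier remark that vertices are distributed to peers through a partitioner.

```java
import java.util.*;

// Simplified sketch of setup / superstep loop / cleanup; not GraphJobRunner itself.
class RunnerPhasesSketch {

  /** Assumed placement rule: hash of the vertex ID modulo the number of peers. */
  static int peerFor(String vertexId, int numPeers) {
    return Math.floorMod(vertexId.hashCode(), numPeers);
  }

  /** Setup: load the vertices into memory, grouped by the peer that owns them. */
  static List<Map<String, Integer>> setup(List<Integer> input, int numPeers) {
    List<Map<String, Integer>> peers = new ArrayList<>();
    for (int p = 0; p < numPeers; p++) {
      peers.add(new TreeMap<>());
    }
    for (int n : input) {
      String id = String.valueOf(n);
      peers.get(peerFor(id, numPeers)).put(id, n);
    }
    return peers;
  }

  /** Main loop: one pass over every vertex per superstep. */
  static void superstepLoop(List<Map<String, Integer>> peers, int supersteps) {
    for (int s = 0; s < supersteps; s++) {
      for (Map<String, Integer> peer : peers) {
        peer.replaceAll((id, value) -> value + 1); // stand-in for compute()
      }
      // A real BSP runtime would synchronize all peers here (barrier).
    }
  }

  /** Cleanup: collect the results; a real job would write them to HDFS. */
  static Map<String, Integer> cleanup(List<Map<String, Integer>> peers) {
    Map<String, Integer> out = new TreeMap<>();
    for (Map<String, Integer> peer : peers) {
      out.putAll(peer);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map<String, Integer>> peers = setup(Arrays.asList(1, 2, 3, 4), 2);
    superstepLoop(peers, 3);
    System.out.println(cleanup(peers)); // prints {1=4, 2=5, 3=6, 4=7}
  }
}
```

A vertex added at run time would go through the same `peerFor` rule as the vertices loaded during setup, so that messages addressed to its ID reach the peer that holds it.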

...