Overview
This article describes how to bring your own graph into Hama's graph module and successfully run algorithms on it.
Google Web dataset (local mode, pseudo-distributed cluster)
For this example, the Google web graph from 2002 is used (http://snap.stanford.edu/data/web-Google.html). It contains 875,713 nodes and 5,105,039 edges.
The file itself looks like this:
# Directed graph (each unordered pair of nodes is saved once): web-Google.txt
# Webgraph from the Google programming contest, 2002
# Nodes: 875713 Edges: 5105039
# FromNodeId	ToNodeId
0	11342
0	824020
0	867923
0	891835
11342	0
11342	27469
11342	38716
To read this kind of file you need to provide a vertex reader like this:
public static class PagerankTextReader extends
    VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {

  String lastVertexId = null;
  List<String> adjacents = new LinkedList<String>();

  @Override
  public boolean parseVertex(LongWritable key, Text value,
      Vertex<Text, NullWritable, DoubleWritable> vertex) {
    String line = value.toString();
    String[] lineSplit = line.split("\t");
    if (!line.startsWith("#")) {
      if (lastVertexId == null) {
        lastVertexId = lineSplit[0];
      }
      if (lastVertexId.equals(lineSplit[0])) {
        // still on the same source vertex: collect another outlink
        adjacents.add(lineSplit[1]);
      } else {
        // a new source vertex begins: emit the previous vertex with all
        // collected outlinks and start collecting for the new one
        vertex.setVertexID(new Text(lastVertexId));
        for (String adjacent : adjacents) {
          vertex.addEdge(new Edge<Text, NullWritable>(new Text(adjacent), null));
        }
        adjacents.clear();
        lastVertexId = lineSplit[0];
        adjacents.add(lineSplit[1]);
        return true;
      }
    }
    // note: the adjacency list of the very last vertex in the file is never
    // emitted by this reader; the job relies on hama.graph.repair (see the
    // setup below) to complete the graph
    return false;
  }
}
HamaConfiguration conf = new HamaConfiguration(new Configuration());
// if we are in local mode, prevent the file from being split
conf.set("bsp.local.tasks.maximum", "1");

GraphJob pageJob = new GraphJob(conf, PageRank.class);
pageJob.setJobName("Pagerank");
pageJob.setVertexClass(PageRankVertex.class);
pageJob.setInputPath(new Path("/tmp/google-in/"));
pageJob.setOutputPath(new Path("/tmp/google-out/"));

// do a maximum of 60 iterations
pageJob.setMaxIteration(60);
pageJob.set("hama.pagerank.alpha", "0.85");
// we need to include a vertex in its adjacency list,
// otherwise the pagerank result has a constant loss
pageJob.set("hama.graph.self.ref", "true");
// run until the given convergence error is achieved
pageJob.set("hama.graph.max.convergence.error", "0.001");
// hama takes care that the graph is complete
pageJob.set("hama.graph.repair", "true");
pageJob.setAggregatorClass(AverageAggregator.class);

// run with a single task so the input doesn't get split
pageJob.setNumBspTask(1);

pageJob.setVertexIDClass(Text.class);
pageJob.setVertexValueClass(DoubleWritable.class);
pageJob.setEdgeValueClass(NullWritable.class);

pageJob.setInputKeyClass(LongWritable.class);
pageJob.setInputValueClass(Text.class);
pageJob.setInputFormat(TextInputFormat.class);
pageJob.setVertexInputReaderClass(PagerankTextReader.class);
pageJob.setPartitioner(HashPartitioner.class);
pageJob.setOutputFormat(TextOutputFormat.class);
pageJob.setOutputKeyClass(Text.class);
pageJob.setOutputValueClass(DoubleWritable.class);

long startTime = System.currentTimeMillis();
if (pageJob.waitForCompletion(true)) {
  System.out.println("Job Finished in "
      + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
You should see the algorithm converge quite fast, after about nine supersteps. Sadly, the text file is not deterministically splittable (all adjacency lines of a vertex must land in the same split), which is why the job runs as a single task and therefore takes longer than a parallel execution would.
If you read the results back from the text output, you will see the following top-ranked sites:
885605 = 0.00149900065779375
846221 = 0.0010280702392776039
557124 = 8.654234880507804E-4
537039 = 6.634317501245855E-4
163075 = 6.529762251084758E-4
597621 = 6.503367245789417E-4
41909 = 5.845160681337011E-4
551829 = 5.702205338951212E-4
504140 = 5.507901000809657E-4
765334 = 5.432108978490109E-4
486980 = 5.394792436341423E-4
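If you want to produce such a ranking yourself, a minimal sketch like the following reads the part files back, sorts by rank and prints the head. The output directory and the tab-separated "id<TAB>rank" line layout follow from the job configuration above; the sketch assumes the output was copied to the local filesystem and fits into memory.

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Sketch: read the TextOutputFormat part files from the local output
// directory, sort by pagerank descending and print the first ten entries.
public class TopRankedPages {
  public static void main(String[] args) throws Exception {
    List<String[]> ranks = new ArrayList<String[]>();
    for (File part : new File("/tmp/google-out/").listFiles()) {
      if (!part.getName().startsWith("part-")) {
        continue; // skip _SUCCESS and other non-output files
      }
      BufferedReader reader = new BufferedReader(new FileReader(part));
      String line;
      while ((line = reader.readLine()) != null) {
        ranks.add(line.split("\t")); // [0] = vertex id, [1] = pagerank
      }
      reader.close();
    }
    Collections.sort(ranks, new Comparator<String[]>() {
      @Override
      public int compare(String[] a, String[] b) {
        // descending by rank
        return Double.compare(Double.parseDouble(b[1]),
            Double.parseDouble(a[1]));
      }
    });
    for (int i = 0; i < 10 && i < ranks.size(); i++) {
      System.out.println(ranks.get(i)[0] + " = " + ranks.get(i)[1]);
    }
  }
}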
Wikipedia dataset (for smallish clusters)
For this example, the Wikipedia link dataset is used (http://haselgrove.id.au/wikipedia.htm; download: http://users.on.net/~henry/pagerank/links-simple-sorted.zip).
The dataset contains 5,716,808 pages and 130,160,392 links and is about 1 GB unzipped. A smallish cluster is enough to crunch this dataset with Hama: based on the HDFS block size, 16-32 task slots are required (at the default 64 MB block size, the ~1 GB file splits into roughly 16 blocks, and each block becomes one task).
The file is formatted like this:
from1: to11 to12 to13 ...
from2: to21 to22 to23 ...
To read this kind of file you need to provide a vertex reader like this:
public static class WikipediaLinkDatasetReader extends
    VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {

  @Override
  public boolean parseVertex(LongWritable key, Text value,
      Vertex<Text, NullWritable, DoubleWritable> vertex) {
    // each line looks like "from: to1 to2 to3 ..."
    String[] vertexAdjacents = value.toString().split(":");
    vertex.setVertexID(new Text(vertexAdjacents[0].trim()));
    String[] split = vertexAdjacents[1].split(" ");
    for (int i = 0; i < split.length; i++) {
      if (!split[i].isEmpty()) {
        vertex.addEdge(new Edge<Text, NullWritable>(
            new Text(split[i].trim()), null));
      }
    }
    return true;
  }
}
Now we can set up the job with the following code (assuming that your input is in /tmp/wiki-in/):
HamaConfiguration conf = new HamaConfiguration(new Configuration());
GraphJob pageJob = new GraphJob(conf, PageRank.class);
pageJob.setJobName("Pagerank");
pageJob.setVertexClass(PageRankVertex.class);
pageJob.setInputPath(new Path("/tmp/wiki-in/"));
pageJob.setOutputPath(new Path("/tmp/wiki-out/"));

// do a maximum of 60 iterations
pageJob.setMaxIteration(60);
pageJob.set("hama.pagerank.alpha", "0.85");
// we need to include a vertex in its adjacency list,
// otherwise the pagerank result has a constant loss
pageJob.set("hama.graph.self.ref", "true");
// run until the given convergence error is achieved
pageJob.set("hama.graph.max.convergence.error", "0.001");
// hama takes care that the graph is complete
pageJob.set("hama.graph.repair", "true");
pageJob.setAggregatorClass(AverageAggregator.class);

pageJob.setVertexIDClass(Text.class);
pageJob.setVertexValueClass(DoubleWritable.class);
pageJob.setEdgeValueClass(NullWritable.class);

pageJob.setInputKeyClass(LongWritable.class);
pageJob.setInputValueClass(Text.class);
pageJob.setInputFormat(TextInputFormat.class);
pageJob.setVertexInputReaderClass(WikipediaLinkDatasetReader.class);
pageJob.setPartitioner(HashPartitioner.class);
pageJob.setOutputFormat(TextOutputFormat.class);
pageJob.setOutputKeyClass(Text.class);
pageJob.setOutputValueClass(DoubleWritable.class);

long startTime = System.currentTimeMillis();
if (pageJob.waitForCompletion(true)) {
  System.out.println("Job Finished in "
      + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
Now you can export a jar with the job submission as its main class and submit it via
hama/bin/hama jar YOUR_JAR.jar
to your Hama cluster.
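If you build the jar on the command line, a sketch like the following works; classes/ stands for your compiled output directory, and PageRankRunner is a hypothetical class whose main method contains the job setup shown above:

# build a jar with a hypothetical PageRankRunner as its Main-Class
jar cfe pagerank.jar PageRankRunner -C classes .
# submit it to the cluster
hama/bin/hama jar pagerank.jar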
If you sort the result in descending order by PageRank, you will see the following top 10 sites:
1   0.00222  United_States
2   0.00141  2007
3   0.00136  2008
4   0.00126  Geographic_coordinate_system
5   0.00101  United_Kingdom
6   0.00087  2006
7   0.00074  France
8   0.00073  Wikimedia_Commons
9   0.00066  Wiktionary
10  0.00065  Canada
Note that you can decode the indices you may see with the page titles file that comes with the dataset.
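If you want to do that lookup programmatically, a minimal sketch follows. It assumes the titles file (titles-sorted.txt in the dataset download) holds one title per line and that page i corresponds to line i (1-based); check the dataset description if your copy is indexed differently.

import java.io.BufferedReader;
import java.io.FileReader;

// Sketch: resolve a page index from the pagerank output to its title by
// scanning the titles file line by line. The file name and the 1-based
// indexing are assumptions about the dataset layout.
public class TitleResolver {
  public static String resolveTitle(String titlesFile, int pageIndex)
      throws Exception {
    BufferedReader reader = new BufferedReader(new FileReader(titlesFile));
    try {
      String title = null;
      for (int line = 0; line < pageIndex; line++) {
        title = reader.readLine();
      }
      return title;
    } finally {
      reader.close();
    }
  }

  public static void main(String[] args) throws Exception {
    // usage: TitleResolver titles-sorted.txt 12345
    System.out.println(resolveTitle(args[0], Integer.parseInt(args[1])));
  }
}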
Troubleshooting
If your job does not execute, your cluster may not have enough resources (task slots).
Symptoms may look like this in the BSPMaster log:
2012-05-27 20:00:51,228 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2012-05-27 20:00:51,288 INFO org.apache.hama.bsp.JobInProgress: num BSPTasks: 16
2012-05-27 20:00:51,305 INFO org.apache.hama.bsp.JobInProgress: Job is initialized.
2012-05-27 20:00:51,313 ERROR org.apache.hama.bsp.SimpleTaskScheduler: Scheduling of job Pagerank could not be done successfully. Killing it!
2012-05-27 20:01:08,334 INFO org.apache.hama.bsp.JobInProgress: num BSPTasks: 16
2012-05-27 20:01:08,339 INFO org.apache.hama.bsp.JobInProgress: Job is initialized.
2012-05-27 20:01:08,340 ERROR org.apache.hama.bsp.SimpleTaskScheduler: Scheduling of job Pagerank could not be done successfully. Killing it!
This job was run on an 8-slot cluster, but it required 16 tasks because of the 64 MB HDFS block size. You can either re-upload the file with a larger block size, so that the number of blocks matches the available slots, or increase the number of slots in your Hama cluster.
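Both fixes are sketched below. Note that dfs.block.size is the Hadoop 1.x-era property name, and bsp.tasks.maximum is assumed here to be the per-groom slot setting; verify both against the documentation of your Hadoop and Hama versions.

# re-upload with a 128 MB block size, so a ~1 GB file yields about 8 blocks/tasks
hadoop fs -D dfs.block.size=134217728 -put links-simple-sorted.txt /tmp/wiki-in/

<!-- hama-site.xml: raise the number of task slots per groom server -->
<property>
  <name>bsp.tasks.maximum</name>
  <value>4</value>
</property>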