Guide for MRQL Committers

Everything you always wanted to know about the MRQL implementation but were afraid to ask.

Constructing and Manipulating Terms

MRQL uses the Gen package to construct and transform algebraic expressions and physical plans. Gen is a Java preprocessor that extends the Java language with special syntax to make the task of handling Abstract Syntax Trees (ASTs) easier. More information can be found at the Gen Documentation. Source files ending with .gen can be be translated to plain Java files by the Java program Gen.Main, available at the lib/mrql-gen-*.jar. You should never edit generated Java files; instead, you should edit the original .gen files and then convert them to Java.

Code Organization

The Java source code is located at src/main/java. It contains 5 subdirectores: gen, core, MapReduce, BSP, and spark, for the Gen preprocessor, the core MRQL utilities used by all excution engines, the MapReduce execution engine on Apache Hadoop, the BSP execution engine on Apache Hama, and the spark execution engine on Apache Spark.

The Run-Time Data

All data used by the MRQL execution engine are encoded as instances of the MRData class. MRData are WritableComparable data and are similar to the AVRO data format. The MRContainer class is a container for MRData that implements data serialization and deserialization. The following are the MRData subclasses:

a bag is a sequence of MRData. There are three kinds of Bag implementations, which can be converted to each other at run-time, when necessary:
  1. stored in a vector (materialized), used for bags that can fit in memory
  2. stream-based, able to be traversed only once and implemented using Java iterators
  3. spilled to a local file so that it can be accessed multiple times
implemented as a fixed-sized vector

a tagged value, where the tag is the union descriminator. It is used for encoding MRQL data constructions, such as XML and JSON

a wrapper that negates the compare/compareTo results, used for descending order-by
a container for data that resides in HDFS
a container for a template variable, which should only appear in a template
a container for byte
a container for short
a container for int
a container for long
a container for float
a container for double
a container for String
a container for char
a container for bool
used as an event for BSP synchronization
used as an event for BSP synchronization

The Compiler

The MRQL scanner is described in mrql.lex, which provides the class MRQLLex. The MRQL syntax (grammar) is described in mrql.cgen, which provides the class MRQLParser. This is a .gen file that needs to be converted to a .cup file using Gen, and then to be processed by the CUP parser generator, which is a bottom-up shift-reduce parser generator. If one wants to add syntax extensions, such as an XQuery interface to MRQL, the scanner and the parser files must be extended first. More information at JFlex - Online Documentation and CUP User's Manual.

The MRQL Algebraic Forms

MRQL queries are first translated into algebraic forms. The algebra is described at a EDBT'12 paper. The initial algebra contains only 4 operations for Bag processing:

  1. cmap(f,S): Given two arbitrary types a and b, this operation (also known as concat-map or flatten-map in functional programming languages) applies the function f from a to { b } to each element of the input bag S of type { a } and collects the results to form a new bag of type { b }.

  2. groupBy(S): Given two types k and a, for an input bag of pairs S of type { ( k, a ) }, this operation groups the elements of S by the key of type k to form a list of type { ( k, { a } ) }. For example, groupBy({(1,10),(2,20),(1,30)}) is equal to {(1,{10,30}),(2,{20})}.

  3. orderBy(S): same as groupBy(S) but orders the results by the key.

  4. join(kx,ky,r,X,Y): join the bag X of type {a} with the bag Y of type {b} based on the key functions kx and ky from a to k and from b to k, respectively. Then apply the reducer function r to each join pair ({a},{b}), which contains all elements of X and Y with the same key k.

This initial algebra is refined to intermediate algebraic forms that are closer to MapReduce plans. The most important operators are:

  1. mapReduce(m,r,S) uses a map function m and a reduce function r over a bag S. It is equivalent to cmap(r,groupBy(cmap(m,S))) and is implemented using a MapReduce job.

  2. mapReduce2(mx,my,r,X,Y) is a join that maps the elements of X using the left map function mx, maps the elements of Y using the right map function my, groups both results by the join key, and reduces the groups using the reduce function r. It is implemented as a MapReduce job with two mappers.

Query Translation and Optimization

The results of all the query translation and optimization stages are printed using the parameter -trace during query evaluation.

Type Inference

Type inference is done in the class TypeInference. The method type_inference takes an algebraic term or a physical plan as input (of type Tree), it checks whether this term is type-correct, and returns the type of the term (types are also represented as Tree). It uses a symbol table to store declarations, such as the type of a variable or the signature of a function. Unlike modern functional languages such as Haskell, the MRQL type system is not fully type-polymorphic. Instead, there are a fixed number of system functions that are type-polymorphic, such as cmap and groupBy, but user functions can only be monomorphic. The type inference engine uses type unification combined with subtyping to check whether a function call matches the function signature. A call argument whose type is a subtype of the expected parameter type is implicitly coerced to the type. All queries and all the generated algebraic forms and physical plans at all stages are checked for type-correctness. If one wants to make the current type system fully polymorphic, type variables must be introduced and the type unification method must be rewritten completely.

Query Translation and Normalization

After the MRQL queries are parsed by the parser and converted to ASTs, the select-from-where terms are simplified by removing the group-by and order-by parts (method remove_groupby in the class Normalization. This is done by applying the groupBy function to the query and then by binding the query variables that are not used for group-by to the original variable values, one fore each group. That is, non-group-by variables of type T are lifted to bags of type {T}, where the T values are taken from each group. At the same time, variables are renamed with fresh new variable names, to avoid name conflicts. Then, the queries are normalized to simpler forms by the method normalize in the class Normalization. The result is terms of the form select(header,from(,,,),where(predicate)) where header and predicate are algebraic terms and ... contain query bindings of the form bind(v,e), which binds a variable v to a term e.

Plan Generation

The next step is to build the query graph and to generate a good algebraic plan with joins, described in class QueryPlan. It uses a greedy graph reduction algorithm to construct a good algebraic term that uses generalized joins that combine flat joins with group-bys. More details can be found here. The resulting terms still contain select-from-where terms but now all inputs are combined with joins and groupBys.


This stage, described in class Simplification, converts the algebraic terms produced by QueryPlan to initial algebra terms (without select-from-where terms) and simplifies the terms using algebraic rewrite rules (method simplify). There are rewrite rules for fusing two cascading cmaps two one, fusing a groupBy with a join, etc.

Algebraic Optimization

The class AlgebraicOptimization converts the initial algebra to the intermediate algebra using rewrite rules.

Plan Optimization

The final plans that correspond to the MapReduce physical operations are generated by the method makePlan in the class PlanGeneration. When applicable, this method synthesizes the in-mapper combiner and in-reducer aggregations from the reduce function.

Compiling Plans to Java ByteCode

The functional arguments of the physical operators are compiled to Java ByteCode by the method compile in the Compiler class. This is done by generating Java code as a string from expressions and use the Java compiler to translate the string to ByteCode which is put in a jar file. The functional arguments then become references to the compiled code, which is distributed to nodes and loaded at run-time (see Plan.distribute_compiled_arguments and Plan.functional_argument).

The Run-Time Execution Engine

Physical plans can be evaluated in three modes: in memory, in MapReduce mode using Hadoop, and in BSP mode using Hama.

Evaluating Expressions in Memory

When in MapReduce or BSP mode, the functional arguments of the physical operators must be evaluated in memory. When in memory mode, not only the arguments but the physical operations too must be evaluated in memory. The code for evaluating expressions and plans is given in MapReduceAlgebra class. Most methods in this class are higher-order and stream-based. Since Java doesn't support anonymous functions, the functional arguments to operations are instances of the class Function:

   1 abstract public class Function {
   2     abstract public MRData eval ( final MRData arg );
   3 }

To construct an anonymous function, one has to provide an implementation of the method eval. For example, the

   1 new Function(){
   2   public MRData eval ( final MRData x ) {
   3       return new MR_int(((MR_int)x).value()+1);
   4  }

returns a function that increments the argument. Most functions over Bags are stream-based (lazy), by returning a Bag implemented as a Bag iterator. For example,

   1     public static Bag map ( final Function f, final Bag s ) {
   2         final Iterator<MRData> si = s.iterator();
   3         return new Bag(new BagIterator() {
   4                 public boolean hasNext () { return si.hasNext(); }
   5                 public MRData next () { return f.eval(; }
   6             });
   7     }

applies the function f to every element of the Bag s returning a new Bag. That way, intermediate Bags between Bag operations are often not materialized in memory. Some operations though, such as groupBy, must materialize these Bags into vectors in order to be sorted for group-by.

The Interpreter

The evaluation of functional arguments in all three evaluation modes is done by the method evalE of the Interpreter class. When in memory evaluation mode, physical plans are evaluated in memory by the method evalM of the Interpreter class. For the other evaluation modes, there are special interpreters, implemented by the method Evaluator.evalD.

Importing Java Methods

System functions, such as arithmetic operations, comparisons, and coercions, are specified in the SystemFunctions class. These functions, as well as user-defined functions, are imported into MRQL using the methods importClass and importMethod, defined in the class ClassImporter, and are instances of the class MethodInfo.

Data Sources and Document Parsing

The domain of the MRQL physical plan operators is a set of DataSources, called a DataSet. That is, all physical operators get DataSets as input and generate a DataSet as output. A DataSource is an input data source, such as a text file, a key/value map, a data base, an intermediate file. There are three predefined data sources (which are subclasses of DataSource):

for a new text documents, such as CVS, XML, and JSON. A parsed data source instance must implement a new parser that implements the Parser interface:
   1 interface Parser {
   2     public void initialize ( Trees args );
   3     public Tree type ();
   4     public void open ( String file );
   5     public void open ( FSDataInputStream fsin, long start, long end );
   6     public String slice ();
   7     public Bag parse ( String s );
   8 }

These parsers are also used by templates. The method initialize initializes the parser using the extra arguments passed to the MRQL source function (these arguments are passed unevaluated as abstract-syntax trees). The method type returns the MRQL type of the data returned by this data format. The two open methods open the input stream; the first one opens a local file while the second opens an HDFS data split (which may start at any point in the file). The most important methods are slice and parse. The method slice breaks the input into fragments (strings) to be processed by parse (the parser). The parser returns a Bag of values in the MRQL data model. The fragmentation process must be able to work on data splits, which may start at arbitrary points in a file. To accommodate templates, the parser must recognize the syntax {{n}} , where n is an integer constant, and simply return a MR_variable(n) data value to be processed by the MRQL compiler to embed MRQL abstract syntax trees. There are three predefined parsers:

  1. LineParser: A parser for line-oriented, character delimited text files, such as CVS.

  2. XMLParser: an XML parser. It uses XMLSplitter to slice an XML document into strings and XPathParser to parse these strings into XML and to apply an XPath query on the results. The XPath query processor is fully pipelined, using SAX handlers to form pipelines. The XML data type is predefined.
  3. JsonParser: a JSON parser, which is similar to the XML parser. It uses JsonSplitter to slice the input to strings and a the parser JSON.cup and scanner JSON.lex to parse these strings into JSON data, The JSON data type is predefined.

used for storing intermediate results and data dumps into HDFS sequence files.

used for processing the MRQL range syntax min..max, which represents the bag {min,min+1,...,max}. A data generator creates a number of HDFS files, where each file contains an (offset,size) pair that is used for generating the range of long integers [offset,offset+size]. The default size is range_split_size, which can be changed.

Various InputFormats Used

The input formats used by MRQL are instances of the class MRQLFileInputFormat, which is a subclass of Hadoop's FileInputFormat. Subclasses of this input format must implement a RecordReader. There are three input formats, one for each data source:

for a text file, such as CVS, XML, and JSON. It finds the parser associated with the file in a path and parses the file.

for a sequence file (a SequenceFileInputFormat).


for a data generator for the GeneratorDataSource.

These input formats are implemented differently for MapReduce, BSP, and Spark evaluation.

MapReduce Evaluation

Physical plans are evaluated in MapReduce mode by the MapReduceEvaluator.eval method.

The MapReduce Plans

Physical operations are implemented in MapReduce mode in classes that are instances of the MapReducePlan class:


a MapReduce job that may use an in-mapper combiner to partially reduce groups during mapping, or an aggregator to totally aggregate results to a value


a MapReduce job that uses a map stage only (no reduction); it may optionally aggregate results to a value


a MapReduce job with two mappers that implements the MapReduce2 physical operator (a reduce-side join); it may use an aggregator to totally aggregate results to a value

the fragment-replicate join (map-side join) physical operator

the CrossProduct physical operation (similar to block-nested loop)


a MapReduce job that captures a join with an embedded group-by. Similar to matrix multiplication. Still experimental.

BSP Evaluation

The MRQL BSP evaluation framework is described in the paper DataCloud'12 paper.

Generating BSP Plans from MapReduce Plans

The BSPTranslator class translates MapReduce physical plans to BSP plans to be executed on Hama. The only BSP physical operator is

bsp( superstep, initstate, S )

maps a dataset S of type V into a new dataset of type V, by repeating a superstep, which is a function from ({M},V,K) to ({(I,M)},V,K,boolean). The superstep is evaluated by every peer participating in the BSP computation and maps the peer's local snapshot V to a new local snapshot V, receives messages {M} and sends messages {(I,M)} to peers I, The superstep logic is controlled by a DFA: the DFA state is of type K, a superstep makes a transition from a state K to a new state K, and the initial state is initstate.

Each MapReduce operator is translated to one BSP plan by the method BSPTranslator.mr2bsp. For example, a MapReduce job can be evaluated using a BSP job that has two supersteps: one for the map and one for the reduce task. Nested BSP plans are flatten to a single BSP plan (by chaining the supersteps together) by the method BSPTranslator.bspSimplify. That way, most queries require just one BSP operation.

Evaluating BSP Plans on Hama

The BSPEvaluator class is the interpreter that executes BSP plans. The BSPPlan class contains the code for evaluating a BSP plan using Apache Hama. It uses the FileInputFormat's provided by Hama, instead of those from Hadoop. Currently, messages between BSP peers are buffered into bags (which can be spilled to local files). This buffering may not be necessary with the new Hama implementations that dumps the messages to local files.

Spark Evaluation

The SparkEvaluator class is the interpreter that evaluates the MapReduce plans in Spark mode. The evaluation is straightforward because for each MRQL MapReduce plan there is an equivalent Spark operation. The only complication is that Spark uses its own Java functions to capture anonymous functions, so MRQL Function values had to be mapped to Spark functions.

Committers (last edited 2013-10-01 19:09:08 by LeonidasFegaras)