MRQL: A MapReduce Query Language

MRQL is an SQL-like query language for large-scale data analysis on a computer cluster. The MRQL system can execute MRQL queries in three modes: in MapReduce mode on Apache Hadoop, in BSP (Bulk Synchronous Parallel) mode on Apache Hama, and in Spark mode using Apache Spark. The MRQL query language is powerful enough to express most common data analysis tasks over many forms of raw data, such as XML and JSON documents, binary files, and line-oriented text documents with comma-separated values. In contrast to SQL, MRQL supports a richer data model (including nested collections), arbitrary query nesting, and user-defined types and functions.

To evaluate MRQL queries in MapReduce mode, run the script bin/mrql in the MRQL installation directory. To evaluate MRQL queries in BSP mode, run the script bin/mrql.bsp. To evaluate MRQL queries in Spark mode, run the script bin/mrql.spark. Before running any of these scripts, you need to change the first lines of conf/mrql-env.sh to point to your installation directories. The directory tests/queries contains various MRQL example queries over small data sets.

Evaluating MRQL Queries in MapReduce Mode

Before deploying your MRQL queries on a Hadoop cluster, you can run these queries in memory on a small amount of data using the command:

mrql

which evaluates MRQL top-level commands and queries from the input until you type quit. To run MRQL in Hadoop's standalone mode (single node on local files), use:

mrql -local

To run MRQL in Hadoop's fully distributed mode (cluster mode), use:

mrql -dist

To evaluate a file that contains MRQL queries in any of these three modes, put the file name in the command. For example:

mrql -local queries/q1.mrql

Evaluating MRQL Queries in BSP Mode

Before deploying your MRQL queries on a Hama cluster, you can run them in memory on a small amount of data using the command:

mrql.bsp

To run MRQL in Hama's standalone mode (single node on local files), use:

mrql.bsp -local

To run MRQL in Hama's fully distributed mode (cluster mode), use:

mrql.bsp -dist -nodes n

where n is the number of parallel tasks (it should not exceed the number of available cores in the cluster). To evaluate a file that contains MRQL queries in any of these three modes, put the file name in the command. For example:

mrql.bsp -local queries/q1.mrql

Evaluating MRQL Queries in Spark Mode

To run MRQL in Spark mode on a single node on local files, use:

mrql.spark -local -nodes n

where n is the number of available cores in the node. To run MRQL in fully distributed mode on a standalone Spark cluster, use:

mrql.spark -dist -nodes n

where n is the number of parallel tasks (it can be more than the number of cores in the cluster). To evaluate a file that contains MRQL queries in any of these modes, put the file name in the command.

The MRQL Command Parameters

You can control some aspects of query execution by providing the following parameters to the mrql, mrql.bsp, and mrql.spark commands:

The extra parameters that do not start with a - are stored in the MRQL variable args as a list of type list(string), which is directly accessible from MRQL queries. It can be used to pass the path names of input data sources to the query.
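For example, a query can read the path of its input data from args. The following is a sketch only: the file, the field names, and the assumption that the input path ends up as the first element of args are all illustrative.

// read a CSV file whose path is passed as an extra command-line parameter
E = source(line,args[0],',',type(<name:string,dno:int>));
select e.name from e in E where e.dno = 1;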

The MRQL Top-Level Commands

To evaluate an MRQL expression (query) e, simply write the expression followed by semicolon (you may break the expression into multiple lines):

e;

MRQL will typecheck, optimize, and evaluate the query, and it will print the query result. In addition, you may define a variable v to be a synonym for the expression e (it does not evaluate e) using:

v = e;

or you may evaluate e and store its value into the variable v:

store v := e;

The value of e can be dumped to an HDFS binary file with path name fname (a string) using:

store fname from e;

The file can then be read using the MRQL expression source(binary,fname).
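For example, assuming an Employees data source has already been defined, a query result can be stored in HDFS and read back as follows (the path name 'tmp/names' is illustrative):

store 'tmp/names' from select e.name from e in Employees;
select n from n in source(binary,'tmp/names');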

The value of e can be dumped to a text file with path name fname (a string) using:

dump fname from e;

If e is a bag of tuples, the tuples will be written in CSV form.

Functions can be declared using the syntax:

function f ( v1: t1, ..., vn: tn ) : t { e };

where v1,...,vn are the parameter names, t1,...,tn are their types, t is the return type, and e is the function body (an MRQL expression). For example:

function fact ( n: int ): int { if n <= 0 then 1 else n*fact(n-1) };

Macros are inlined functions that can be declared using the syntax:

macro f ( v1, ..., vn ) { e };

where v1,...,vn are the parameter names and e is the macro body (an MRQL expression). Macros cannot be recursive. A macro call is replaced with the macro body at compile-time, after substituting the macro parameters with the call arguments.
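For example, a minimal macro that squares its argument (a sketch; M is assumed to be a sparse matrix of type {(double,int,int)}, as in the matrix examples later in this document):

macro square ( x ) { x*x };

select square(v) from (v,i,j) in M;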

To include an MRQL source file f.mrql, use:

include 'f.mrql';

The MRQL Data Model

MRQL supports the following types:

where t, t1,...,tn are types. MRQL supports the usual arithmetic and comparison operations for numbers. An integer constant is of type int and a real number constant is of type float. Numbers can be up-coerced using the syntax e as t; for example, 1 as float. Arithmetic expressions are overloaded to work on multiple numerical types, such as 10+3.4E2. A bool can only be true or false. Conditionals are written if e1 then e2 else e3, and boolean values can be combined with the and, or, and not operators. Strings are concatenated with +. Tuples are constructed using ( e1, ..., en ) and records are constructed using < A1: e1, ..., An: en >, where e1, ..., en are expressions. To get the ith element of a tuple x (starting from 0), use x#i. To get the A component of a record x, use x.A.

Lists are constructed using [ e1, ..., en ] while bags are constructed using { e1, ..., en }, where e1, ..., en are expressions. The difference between a list and a bag is that a list supports order-based operations, such as indexing e1[e2] and subsequence e1[e2:e3]. The range n..m, where n and m are MRQL expressions that evaluate to long integers, creates the list [n,n+1,...,m-1,m] of m-n+1 elements. Lists and bags can be queried using the select-query syntax and can be combined using union, intersect, and except. Any value x of type bag( (k,v) ) (i.e., a bag of pairs), where k and v are arbitrary types, is also a map, which binds keys to values. In addition to bag operations, a map also supports key indexing x[key], for a key value key of type k, which returns the value of type v associated with key, if it exists (a run-time error otherwise). For example, {('a',1),('b',2)}['a'] returns 1. For a string key 'a' (a name), one may also write x.a for x['a'].
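For example, the following expressions illustrate these constructors and accessors (the values are illustrative only):

// tuple projection (returns 'a')
(1,'a',3.5)#1;
// record projection (returns 34)
< name: 'Joe', age: 34 >.age;
// list indexing (returns 20)
[10,20,30,40][1];
// map indexing over a bag of pairs (returns 2)
{ ('a',1), ('b',2) }['b'];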

A user-defined type T is defined at the top level:

type T = t;

where t is a type. It should not be recursive.
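For example, the point type used in the k-means example later in this document can be declared once and then used in function signatures (the norm function here is only illustrative):

type point = < X: double, Y: double, Z: double >;

function norm ( p: point ): double { sqrt(pow(p.X,2)+pow(p.Y,2)+pow(p.Z,2)) };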

A data type T is also defined at the top level:

data T = C1: t1 | ... | Cn: tn;

where C1, ..., Cn are globally defined data constructors and t1, ..., tn are types. A data type can be recursive. It provides n ways of constructing the data type. When checking for type equivalence, MRQL uses structural equality for user-defined types and name equality for data types.

For example, a linear integer list can be defined as follows:

data IList = Cons: (int,IList) | Nil: ();

Then, Cons(1,Cons(2,Nil())) constructs the list [1,2].

XML is a predefined data type, defined as:

data XML = Node: ( string, bag( (string,string) ), list(XML) )
         | CData: string;

That is, XML data can be constructed as nodes (which are tuples that contain a tagname, a bag of attribute bindings, and a list of children) or text leaves (CData). For example, <a x='1'><b>text</b></a> is constructed using

Node('a',{('x','1')},[Node('b',{},[CData('text')])])

The JSON type is defined as:

data JSON = JObject: bag( (string,JSON) )
          | JArray: list(JSON)
          | Jstring: string
          | Jlong: long
          | Jdouble: double
          | Jbool: bool
          | Jnull: ();

For example, the JSON object { 'a': 1 } is constructed using JObject({('a',Jlong(1 as long))}).

A persistent collection type !t is a collection of type t that is consumed or produced by a MapReduce job (and consequently stored in HDFS). Normally, the MRQL type system infers these types at compile-time. For example, in the following MRQL query that computes DFT (the Discrete Fourier Transform) over the sparse vector M of type !list( ( int, double ) ) and length N:

select ( k, sum(select v*cos(2*PI*k*i/N) from (i,v) in m) )
from m in M, j in 1..N
group by k: j;

the range expression 1..N computes a non-persistent list of type list(int). The MRQL optimizer will evaluate the query using a single MapReduce job over M, since this is the only persistent type, while 1..N is generated and used during the map. Without these annotations, the evaluation would have stored 1..N in HDFS and would have performed an expensive cross product between M and 1..N. Although these annotations are inferred for most MRQL expressions, they must be explicitly specified in UDFs (by annotating the UDF types) to help the MRQL compiler choose which computations are to be executed as Hadoop jobs and which to be executed in memory.

Patterns

Patterns are used in select-queries and case statements. They are compiled away from expressions before query optimization. In general, a pattern can be

where p1,...,pn are patterns. Note that a record pattern can match a record that has additional components and that the order of components is ignored. A pattern variable can appear multiple times in a pattern or across patterns in nested queries, which requires that these variables be bound to equal values. An irrefutable pattern is a pattern that does not contain constants, data constructions, lists, or repeated variables. An irrefutable pattern matches any expression of the same type.

A case statement takes the following form:

case e { p1: e1 | ... | pn: en }

where p1,...,pn are patterns and e, e1,...,en are expressions. The type of e must be the same as the types of the patterns p1,...,pn. The last pattern pn must be irrefutable. The case statement will try to match the value of e with the patterns p1,...,pn (in that order). The first pattern to succeed, pi (in the worst case, it will be pn), will bind the pattern variables to values, and will use them in ei. For example,

case e { Node(*,*,Node('a',*,cs)): cs | *: [] }

Finally, the expression

let p = e in e'

matches the irrefutable pattern p with the result of the expression e, binding the pattern variables, and returns the result of the expression e' under these bindings. For example, let x = 1 in x+1.

Accessing Data Sources

The MRQL expression that makes a directory of raw files accessible to a query is:

source(parser,path,...args)

where path is the URI of the directory that contains the source files (a string), parser is the name of the parser to parse the files, and args are various parameters specific to the parsing method. It returns a !bag(t), for some t, that is, it returns a MapReduce type. Currently, there are four supported parsers: line, xml, json, and binary, but it is easy to define and embed your own parser (explained later).

Parsing Flat Files

The line parser parses record-oriented text documents that contain basic values separated by a user-defined delimiter (a string):

source( line, path, c, type( < A1: t1, ..., An: tn > ) )

or

source( line, path, c, type( ( t1, ..., tn ) ) )

where ti is a basic type or any, and c is the delimiter. If ti is any, the data value is skipped and the record component is ignored. This expression reads the files in path one line at a time, uses the delimiter to split each line into components, and constructs a record from these components. It skips the components of type any and those after the last component An. The first form returns a persistent bag of type !bag(< A1: t1, ..., An: tn >), while the latter returns !bag( (t1,..., tn ) ); both contain only those components whose type is not any.

For example, the expression

source(line,'employee.txt',',',type(<name:string,dno:int,phone:any,address:string>))

parses a CSV file and returns a bag(<name:string,dno:int,address:string>), since the phone is skipped.

Parsing XML Documents

The MRQL expression used for parsing an XML document is:

source( xml, path, tags, xpath )

where tags is a bag of synchronization tags and xpath is the XPath expression used for fragmentation. Given a data split from the document, this operation skips all text until it finds the opening of a synchronization tag and then stores the text up to the matching closing tag into a buffer. The buffer then becomes the current context for xpath, which is evaluated in a stream-like fashion using SAX, returning XML objects constructed in the MRQL data model. If xpath is omitted, it defaults to xpath(.), where the 'dot' returns the current context, which is the entire fragment in the buffer. Note that, although the document order is important for XML data, this order is ignored across fragments but is preserved within each fragment, as expected, since data splits are processed by worker nodes in parallel.

For example, the following expression:

XMark = source(xml,'xmark.xml',{'person'});

binds the variable XMark to the result of parsing the document 'xmark.xml', which is a bag of person elements. A more complex example is:

DBLP = source( xml, 'dblp.xml', {'article','incollection','book','inproceedings'},
               xpath(.[year=2009]/title) )

which retrieves the titles of certain bibliography entries published in 2009 from DBLP. Here, we are using multiple synchronization tags since we are interested in elements of multiple tagnames.

MRQL also provides syntax to navigate through XML data. The projection operation e.A has been overloaded to work on XML data. Given an expression e of type XML or list(XML), e.A returns a list(XML) that contains the subelements of e with tagname A (much like e/A in XPath). Similarly, the syntax e.*, e.@A, and e.@* correspond to the XPaths e/*, e/@A, and e/@*, respectively.
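For example, using the XMark source defined above, the following query returns each person's name together with the number of that person's interests (a sketch; it relies only on the navigation paths already used in this document):

select ( text(p.name), count(p.profile.interest) )
from p in XMark;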

Parsing JSON Documents

The expression to parse JSON files is:

source( json, path, names )

where names is a bag of synchronization attribute names in JSON objects. It returns a bag of JSON objects that contain name/value pairs whose name is from names. JSON object values can be accessed using projection, e.g., if x = JObject({('a',Jlong(1 as long))}) then x.a or x['a'] will return Jlong(1).
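For example, the following is a sketch that assumes a hypothetical file 'employees.json' fragmented on the attribute employee, and further assumes that each returned object has a name attribute:

Emps = source(json,'employees.json',{'employee'});
select e.name from e in Emps;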

The Query Syntax

The Select-Query

The select-query syntax in MRQL takes the form:

select [ distinct ] e
from p1 in e1, ..., pn in en
[ where ec ]
[ group by p': e' [ having eh ] ]
[ order by e0 [ limit el ] ]

where the syntax enclosed in [ ] is optional and ... means a sequence of similar bindings. Expressions e, e1,...,en, ec, e', eh, e0, and el are arbitrary MRQL expressions, which may contain other nested select-queries. An MRQL query works on collections of values, which are treated as bags by the query, and returns a new collection of values. If it is an order-by query, the result is a list; otherwise, it is a bag. The from part of an MRQL query contains query bindings of the form p in e, where p is a pattern and e is an MRQL expression that returns a collection. The pattern p matches each element in the collection e, binding its pattern variables to the corresponding values in the element. In other words, this query binding specifies an iteration over the collection e, one element at a time, causing the pattern p to be matched with the current collection element. If the element value does not match the pattern, it is ignored. The shorthand p = e is equivalent to p in {e}, that is, the pattern p must be matched to the single element e. For example, the following query:

select (n,cn)
from < name: n, children: cs > in Employees,
     < name: cn > in cs

iterates over Employees, and for each employee record, it matches the record with the pattern <name: n, children: cs>, which binds the variables n and cs to the record components name and children, respectively, and ignores the others. Without patterns, this query is equivalent to:

select (e.name,c.name)
  from e in Employees,
       c in e.children

This is a dependent join because the domain of the second query variable c depends on e.

Group-By

The group-by syntax of an MRQL query takes the form group by p': e'. It partitions the query results into groups so that the members of each group have the same e' value. The pattern p' is bound to the group-by value, which is unique for each group and is common across the group members. As a result, the group-by operation lifts all the other pattern variables defined in the from-part of the query from some type T to a bag of T, indicating that each such variable must contain multiple values, one for each group member.

For example, the query

select ( d, c, sum(s) )
from <dno:dn,salary:s> in Employees
group by (d,c): ( dn, s>=100000 )
having avg(s) >= 80000

groups Employees by dno and by whether their salary is at least 100K. The variables d and c in the query header are directly accessible since they are group-by variables. The variable s, on the other hand, is lifted to a bag of integers, which contains the salaries of all employees in the group. The having condition filters out the groups whose average salary is less than 80K.

As another example, the following query on XMark data:

select ( cat, os, count(p) )
from p in XMark,
     i in p.profile.interest
group by ( cat, os ): ( i.@category,
                        count(p.watches.@open_auctions) )

groups all persons according to their interests and the number of open auctions they watch. For each such group, it returns the number of persons in the group. The XMark data source returns the person elements, so that p is one person, and i is one of p's interests. The variables cat and os in the query header are directly accessible since they are group-by variables. The variable p, on the other hand, is lifted to a bag of XML elements. Thus, count(p) counts all persons whose interests include cat and watch os open auctions.

If the group-by values are variables, the group-by syntax can take the form group by p, for a pattern p, which is equivalent to group by p: p. For example, the sparse matrix X of type {(double,int,int)} is converted to a vector by summing up the columns of each row:

select (sum(x),i)
from (x,i,j) in X
group by i

which is equivalent to:

select (sum(x),ii)
from (x,i,j) in X
group by ii: i

Complex group-by queries may use the special variable partition, which is a bag of records that contains the entire group (the non-group-by variables). For example, the matrix multiplication of two sparse matrices X and Y of type {(double,int,int)} is:

select ( sum(select p.x*p.y from p in partition), i, j )
from (x,i,k) in X, (y,k,j) in Y
group by (i,j)

Here, the partition has type { < x:double, k:int, y:double > } and contains the group associated with i and j. This query can also be written as:

select ( sum(z), i, j )
from (x,i,k) in X, (y,k,j) in Y, z = x*y
group by (i,j)

Order-By

The order by e0 syntax orders the result of a query (after the optional group-by) by the e0 values. It is assumed that there is a default total order defined for all data types (including tuples and bags). The special parametric type Inv(T), which has a single data constructor inv(v) for a value v of type T, inverts the total order of T (from ascending to descending). For example,

order by ( inv(count(p.watches.@open_auctions)), p.name )

orders people by major order count(p.watches.@open_auctions) (descending) and minor order p.name (ascending).
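As another example, assuming Employees records with name and salary components (as in the group-by example above), the following sketch returns the 10 highest-paid employees, ordered by descending salary:

select e.name
from e in Employees
order by inv(e.salary)
limit 10;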

Examples

A more complex query, which is similar to the query Q10 of the XMark benchmark, is

select ( cat, count(p), select text(x.name) from x in p )
from p in XMark,
     i in p.profile.interest,
     c in XMark
where c.@id = i.@category
group by cat: text(c.name)

which uses an XML source that retrieves both persons and categories:

XMark = source(xml,'xmark.xml',{'person','category'},xpath(.));

It groups persons by their interests, and for each group, it returns the category name, the number of people whose interests include this category, and the set of names of these people. The text function returns the textual content of element(s).

As yet another example over the DBLP bibliography:

DBLP = source( xml, 'dblp.xml', {'article','incollection','book','inproceedings'}, xpath(.) )

the following query

select ( select text(a.title) from a in DBLP where a.@key = x,
         count(a) )
from a in DBLP,
     c in a.cite
where text(c) <> '...'
group by x: text(c)
order by inv(count(a))

inverts the citation graph in DBLP by grouping the items by their citations and by ordering these groups by the number of citations they received. The condition text(c) <> '...' removes bogus citations. Note that the DBLP source is also used in the query header to retrieve the citation title. The query is translated into two MapReduce operations.

Quantification

Existential and universal quantifications use bindings similar to those in select-queries:

some p1 in e1, ..., pn in en: e
all p1 in e1, ..., pn in en: e

where e is a boolean predicate. They both return a boolean. The existential quantification returns true if at least one binding combination makes e true. The universal quantification returns true if every binding combination makes e true. For example,

select p.name
from p in Employee
where all c in p.children: c.age > 18

returns the name of an employee if all her children are older than 18.
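The existential form is symmetric; for example, the following variation returns the name of an employee if at least one of her children is younger than 5:

select p.name
from p in Employee
where some c in p.children: c.age < 5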

Nested Queries

SQL-like MapReduce query languages (such as Hive) use outer-joins combined with group-bys to simulate nested queries. This is a very bad idea because these queries are hard to understand and inefficient, since they force the optimizer to use a suboptimal evaluation method. MRQL has banned outer-joins completely, as all modern query languages have (e.g., OQL and XQuery). SQL queries with outer-joins/group-bys can be expressed more elegantly using nested queries. MRQL translates deeply nested queries, of any form and at any nesting level, into efficient generalized joins that nest the data during the join in such a way that the data nesting reflects the query nesting, thus making an explicit group-by unnecessary. This is often far better than using a flat outer-join followed by a group-by. Nested queries, along with dependent joins, make MRQL MapReduce-complete, because any MapReduce computation can be coded directly and declaratively in MRQL. More specifically, any MapReduce computation over a dataset S with an arbitrary map function m and an arbitrary reduce function r can be expressed in MRQL as:

select w
  from z in (select r(k,y)
               from x in S,
                    (k,y) in m(x)
             group by k),
       w in z

For example, it is very hard to express scientific computations directly in SQL, such as matrix 'smoothing', where each element M[i,j] of a sparse matrix M becomes the average of the 3*3 neighboring elements M[i-1,j-1], ..., M[i+1,j+1]. In MRQL, this is simply:

select (avg(v),i,j)
from s in (select {(v,i-1,j-1),(v,i-1,j),(v,i-1,j+1),(v,i,j-1),
                   (v,i,j),(v,i,j+1),(v,i+1,j-1),(v,i+1,j),(v,i+1,j+1)}
             from (v,i,j) in M),
     (v,i,j) in s
group by (i,j)

which is translated into a single MapReduce job.

Repetition

There are three repetition forms that have similar syntax but different types and semantics. The repetition syntax is:

repeat v = e step body [ limit n ]

where v is the repetition variable. The type of the expression e is bag(T), for some type T, and the type of the optional limit n is int. The type of v is also bag(T).

The first repetition form is called a closure. The body of a closure has type bag(T), which is the same as the types of v and e. A closure first binds v to the value of e and then evaluates the body repeatedly, assigning its value to v. It stops when either the number of repetitions reaches n or the number of elements in the new value of v is less than or equal to the number of elements in the old value. That is, a closure evaluates as follows (in pseudo Java code):

i = 0;
new = 0;
old = 0;
v = e;
do {
   v = body(v);
   i++;
   old = new;
   new = size(v);
} while (old < new && i < n);

For example, the following query computes the connectivity graph of G, which has an edge i-j if there is a path in G from i to j:

repeat s = G
  step select distinct c
         from s in (select { (i,k), (k,j), (i,j) }
                      from (i,k) in s, (k,j) in s),
              c in s

Every repetition step requires two MapReduce jobs: one for the self-join over s, and one to remove duplicates.

For the second repetition form, the type of body must be bag( (T,bool) ). This repetition form first binds v to the value of e and then evaluates the body repeatedly, assigning to v a new value equal to select x from (x,*) in body. It stops when either the number of repetitions reaches n or all (*,b) in body: not b holds, that is, when all the booleans returned by body are false. That is, this repetition form evaluates as follows (in pseudo Java code):

i = 0;
v = e;
do {
   s = body(v);
   i++;
   v = select x from (x,*) in s;
} while ((some (*,b) in s: b) && i < n);

For example, a simplified PageRank algorithm is as follows:

// preprocessing: for each node, group its outgoing links into a bag
graph = select (key,select x.to from x in n)
          from n in source(line,'queries/links.txt',',',type(<id:string,to:string>))
         group by key: n.id;

store graph_size := count(graph);

// damping factor
factor = 0.85;

repeat nodes = select < id: key, rank: 1.0/graph_size as double, adjacent: al >
                 from (key,al) in graph
    step select (< id: m.id, rank: n.rank, adjacent: m.adjacent >,
                 abs((n.rank-m.rank)/m.rank) > 0.1)
           from n in (select < id: key, rank: (1-factor)/graph_size+factor*sum(select x.rank from x in c) >
                        from c in ( select < id: a, rank: n.rank/count(n.adjacent) >
                                      from n in nodes, a in n.adjacent )
                      group by key: c.id),
                m in nodes
          where n.id = m.id
    limit 10;

The binding to graph builds the web graph by grouping together all the outgoing links of each node. The repetition starts by initializing the rank of all nodes with the same value. The repeat body propagates the rank of each node to its neighbors. The entire process terminates in 10 steps or when the difference between the new and the old rank of every node goes below a threshold. The MRQL translation of this expression requires one MapReduce job for the initial step and only one MapReduce job for the repeat body. The repetition itself does not require any additional MapReduce job to check the condition; it is simply done using a Hadoop Java counter to count the true values across all task trackers. For the Hungarian web graph (available at Erdős WebGraph), which has 500K nodes and 14M links, PageRank required 8 iterations (a total of 9 MapReduce jobs).

The third repetition form has the syntax:

repeat (v1, ..., vn) = e step body limit m

Here, the body returns a tuple of size n that provides new values for the variables v1, ..., vn. The repetition stops after m steps. For example, the following macro, factorize, performs a Gaussian non-negative matrix factorization over the matrix V into the matrices H and W:

macro transpose ( X ) {
  select (x,j,i)
    from (x,i,j) in X
};
macro multiply ( X, Y ) {
  select (sum(z),i,j)
    from (x,i,k) in X, (y,k,j) in Y, z = x*y
   group by (i,j)
};
macro Cmult ( X, Y ) {
  select ( x*y, i, j )
    from (x,i,j) in X, (y,i,j) in Y
};
macro Cdiv ( X, Y ) {
  select ( x/y, i, j )
    from (x,i,j) in X, (y,i,j) in Y
};
macro factorize ( V, Hinit, Winit ) {
  repeat (H,W) = (Hinit,Winit)
    step ( Cmult(H,Cdiv(multiply(transpose(W),V),multiply(transpose(W),multiply(W,H)))),
           Cmult(W,Cdiv(multiply(V,transpose(H)),multiply(W,multiply(H,transpose(H))))) )
   limit 10
};

where Hinit and Winit are the initial values of H and W.

Templates

The parser of an input format can also be used for syntax-directed construction/deconstruction of data. A template takes the form:

[parser| ... |]

where parser is an MRQL parser (xml, json, or a user-defined parser) and ... is text in the language recognized by the parser. It returns the data constructed by the parser. For example, [xml|<a>1</a>|] is equal to Node('a',{},[CData('1')]). You can escape from the template mode into the MRQL mode using {{e}}, where e is an MRQL expression. For example, [xml| <a>{{CData('1')}}</a> |] evaluates CData('1') and embeds its value inside the XML element. Template and MRQL modes can be freely nested. Templates can also be used in patterns to deconstruct a value. For example,

case e { [xml|<a>{{x}}</a>|]: x | *: e  }

Data Generation

To create a parallel program independent of data size, MRQL supports the range syntax n...m, where n and m are MRQL expressions that evaluate to long integers. It creates a virtual bag {n,n+1,...,m-1,m} of m-n+1 elements, which has type !bag(long) and is generated across multiple map tasks. The number of parallel tasks is determined by the MRQL parameter -split_size: it is equal to (m-n+1)/split_size. Note that MRQL does not store this bag in memory or on disk. Instead, it generates one small file for each mapper that contains two numbers: an offset and a size, which are used by a special InputFileFormat to generate the range numbers on demand. A more general form of data generation is gen(n,m,s), which is equal to n...m but with split_size = s. For example, the MRQL query:

select distinct randomEdge(0,100000,0,100000)
from i in 1...1000000;

where randomEdge generates a random edge in a graph using the R-MAT algorithm. The query generates graph edges across multiple tasks and removes the duplicates.

User-Defined Functions (UDFs)

Functions can be declared using the following MRQL top-level syntax:

function f ( v1: t1, ..., vn: tn ) : t { e };

where v1,...,vn are the parameter names, t1,...,tn are their types, t is the return type, and e is the function body (an MRQL expression). The body of a UDF may contain MapReduce computations; this is determined by the UDF type annotations. Anonymous functions are defined using the following syntax:

\( v1: t1, ..., vn: tn ): t . e

where e is the function body; an example of an anonymous function appears in the user-defined aggregation new_centroid later in this document.

Passing persistent collections to a UDF requires the special persistent annotation !t. For example, the UDF:

function g ( s: !bag(<name:string,dno:int,address:string>) ): !bag(<A:string,B:int>) {
   select <A:x.name,B:x.dno> from x in s
};

will execute the UDF body using one MapReduce job, since the parameter s is a persistent bag. Therefore, the query

select (e,d)
from e in g(select x from x in E where x.name='A'),
     d in D
where e.B=d.dno;

where E and D are persistent bags, will execute a map job for the inner select, then will call g (which is one MapReduce job), and then will perform the join. If we had declared s in g with a non-persistent type, then the query would still be type-correct (since !bag is a subtype of bag), but the argument to g would have been materialized in memory and the body of g would have been executed in memory.

User-Defined Aggregations

There are some predefined aggregates: count, which counts the elements of any bag, sum, avg, max, and min, which aggregate bags of int/long/float/double numbers, and some and all, which aggregate bags of booleans. One can define a new aggregation that can operate on a bag of T, for any type T, using the following MRQL command:

aggregation name ( plus, zero [ , unit ] ) : T;

where T is the element type, zero is a value of type S, plus is a binary function of type (S,S)->S, and unit is an optional unary function of type T->S. Then, name is a function from bag(T) to S. The semantics is: name({x1,...,xn})=plus(unit(x1),plus(...,plus(unit(xn),zero))). If unit is omitted, it is the identity function, which requires S=T. Note that plus must be an associative and commutative function that satisfies plus(zero,x)=plus(x,zero)=x. That is, plus/zero must form a commutative monoid. MRQL will not make any attempt to prove the monoid properties. If a given aggregation does not satisfy these properties, it will produce unpredictable results. For example:

aggregation aggr(f,0):int;

where f is the function:

function f (x:int,y:int):int { x+y };

Then aggr({1,2,3}) will return 6. As another example, the following query implements the k-means clustering algorithm (Lloyd's algorithm) by deriving k new centroids from the old ones:

type point = < X: double, Y: double, Z: double >;

function distance ( x: point, y: point ): double {
   sqrt(pow(x.X-y.X,2)+pow(x.Y-y.Y,2)+pow(x.Z-y.Z,2))
};

function centroid ( p: (point,long), default: point ): point {
   if p#1 = 0
      then default
      else < X: p#0.X/p#1, Y: p#0.Y/p#1, Z: p#0.Z/p#1 >
};

repeat centroids = select < X: random(1000)/100.0 as double,
                            Y: random(1000)/100.0 as double,
                            Z: random(1000)/100.0 as double >
                     from x in 1..k
  step select let nc = centroid( new_centroid(select (p,1 as long) from p in s), closest )
              in ( nc, distance(closest,nc) > 0.1 )
         from s in Points
        group by closest: (select c from c in centroids order by distance(c,s))[0]
  limit 10;

where Points is the input data set (3D points) of type !bag(<X:double,Y:double,Z:double>), centroids is the current set of centroids (k cluster centers), and distance calculates the distance between two points. The query in the group-by assigns the closest centroid to a point s. This query clusters the data points by their closest centroid, and, for each cluster, a new centroid is calculated from the mean values of its points. The new centroid is calculated by averaging over the X, Y, and Z axes using the user-defined aggregation:

aggregation new_centroid (
      \(p:(point,long),q:(point,long)):(point,long)
                .( < X: p#0.X+q#0.X, Y: p#0.Y+q#0.Y, Z: p#0.Z+q#0.Z >,
                   p#1+q#1),
      ( < X: 0.0 as double, Y: 0.0 as double, Z: 0.0 as double >, 0 as long )
    ) : (point,long);

Without this aggregation, we would have to apply the avg aggregation 3 times (once for every dimension), which would require multiple passes over the data set. Using this aggregation, each repeat step is done with one MapReduce job over Points, since the k centroids calculated at every step are stored in memory (they are not a persistent bag).

Tracing Query Execution

Any expression e in a query can be traced during query execution using trace(e). This will print the value of the expression every time it is evaluated. Nested trace calls will appear as nested traces.
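For example, the following sketch (over the Employees source used in earlier examples) prints every dno value as it is processed:

select trace(e.dno)
from e in Employees;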

System Functions

The MRQL System functions

Importing External Java Functions

The external methods that can be imported in MRQL must be public static and must operate on MRData (objects from the MRQL data model, as defined in the MapReduceData.java file). To import all the Java methods of a class, use the MRQL command:

import 'class-path';

To selectively import a list of methods from a class, use:

import m1, ..., mn from 'class-path';

For example, you can define your functions in a file MyFunctions.java in the mrql directory:

package myPackage;
import hadoop.mrql.*;
public class MyFunctions {
    public static MR_string substring ( MR_string s, MR_int i ) {
        return new MR_string(s.get().substring(i.get()));
    }
}

You can compile it using:

javac -cp mrql.jar MyFunctions.java -d classes
make

which creates a new mrql.jar file that includes MyFunctions. Then, you may import the method substring using:

import 'myPackage.MyFunctions';
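After the import, the method can be called from queries like any built-in function; for example (a sketch, assuming an Employees source with a string name component):

select substring(e.name, 2)
from e in Employees;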

Note that imported functions must be pure, that is, they should not cause side effects and they should always return the same value for the same input values. If a query calls an imported function that causes side effects, it may produce unpredictable results. The only system function that is not pure is random(n), which generates a random number between 0 and n-1. It is handled specially by MRQL.

Embedded MRQL

You may evaluate MRQL commands (ending with a semicolon) inside a Java program using:

MRQL.evaluate("MRQL commands");

You may evaluate an MRQL query using:

MRQL.query("MRQL query");

which returns MRData (objects from the MRQL data model). Here is a complete example:

package org.apache.mrql;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
public class MyClass extends Main {
    public int run ( String args[] ) throws Exception {
        Config.parse_args(args,conf,false);
        ClassImporter.load_classes();
        MRQL.evaluate("C = source(line,'customer.tbl','|',type( ));");
        for ( MRData x: (Bag)MRQL.query("select x.CUSTKEY from x in C") )
            System.out.println(x);
        MRQL.clean();
        return 0;
    }
    public static void main ( String[] args ) throws Exception {
        conf = new Configuration();
        GenericOptionsParser gop = new GenericOptionsParser(conf,args);
        conf = gop.getConfiguration();
        args = gop.getRemainingArgs();
        ToolRunner.run(conf,new MyClass(),args);
    }
}

User-Defined Parsers

Parsers are used by the source expression for parsing input data and by templates. A user-defined parser must implement the following Java interface:

interface Parser {
    public void initialize ( Trees args );
    public Tree type ();
    public void open ( String file );
    public void open ( FSDataInputStream fsin, long start, long end );
    public String slice ();
    public Bag parse ( String s );
}

and must be defined using the top-level MRQL statement:

parser name = 'class-path';

where name is the parser name and class-path is the Java path that implements the Parser interface. Method initialize initializes the parser using the extra arguments passed to the MRQL source function (these arguments are passed unevaluated as abstract-syntax trees). Method type returns the MRQL type of the data returned by this data format. The two open methods open the input stream; the first one opens a local file while the second opens an HDFS data split (which may start at any point in the file). The most important methods are slice and parse. Method slice breaks the input into fragments (strings) to be processed by parse (the parser). The parser returns a Bag of values in the MRQL data model. The fragmentation process must be able to work on data splits, which may start at arbitrary points in a file. To accommodate templates, the parser must recognize the syntax {{n}}, where n is an integer constant, and simply return a MR_variable(n) data value to be processed by the MRQL compiler to embed MRQL abstract syntax trees.
