Pig Abstraction Layer

Introduction and Rationale

Many of the activities that Pig carries out during the compilation and execution stages of Pig Latin queries are currently deeply tied to the Hadoop file system and the Hadoop Map-Reduce paradigm.

For instance, file management tasks, job submission and job tracking in the Pig client explicitly assume the availability of a Hadoop cluster to which the client connects.

It is possible, however, to envision an architecture where the front-end part of the system (i.e. Pig client) may have a more abstract notion of the back-end portion. In this context, a Hadoop cluster could be regarded as a particular instance amongst a family of different back-ends, all of which provide similar functionalities that can be accessed via the same API.

The main motivations behind this proposal can be summarized as follows:

- A well-defined set of APIs that a back-end must support in order to run Pig Latin queries would make it easier to port Pig to different platforms. Hence, this could foster wider adoption of Pig.

- Changes in the various back-ends can be encapsulated within the implementations of the generic APIs. Hence, the front-end will require fewer modifications, resulting in a more stable code-base.

A proper API design should be general enough to easily support the back-ends that Pig currently targets: Hadoop, Galago (see the section below), and the local back-end (i.e. the local file system and the local execution type).

Galago is a research project started by Trevor Strohman at the University of Massachusetts, Amherst. Galago is a search-engine with its own execution back-end.

Galago is able to execute Pig Latin queries by translating them into its own representation language (TupleFlow jobs).

API Specification

The basic functionalities that a back-end may need to export to the Pig client could be categorized into two main abstractions:

- Data Storage: provides functionalities that pertain to storing and retrieving data. It encapsulates the typical operations supported by file systems, like creating a data object and opening it for reading or writing.

- Query Execution Engine: provides functionalities to parse a Pig Latin program and submit a compiled Pig job to a back-end. This API should enable the front-end to track the current status of a job, its progress, diagnostic information and possibly to terminate it.

The sections below provide some initial suggestions for possible APIs for the Data Storage and Query Execution abstractions.

Back-End Configuration

This interface abstracts functionalities for management of configuration information for both the Data Storage and Query Execution portions of a back-end.

package org.apache.pig.backend;

import java.io.Serializable;
import java.util.Map;
import java.net.URI;

/** Abstraction for a generic property object that can be
 * used to specify configuration information, stats...
 * Information is represented in the form of (key, value)
 * pairs.
 */
public interface PigBackEndProperties extends Serializable, 
                                              Iterable<String> {
        /**
         * Introduces a new (key, value) pair or updates one already 
         * associated to key.
         * 
         * @param key - the key to insert/update
         * @param value - the value for the given key
         * @return - the old value associated with key, if any, null otherwise
         */
        public Object setValue(String key, Object value);
                
        /**
         * Updates configuration information from a given resource. 
         * 
         * @param resource - the resource from which property values are read
         * @return the set of keys that have been updated, together with 
         *         their previous values. If resource contains/updates the 
         *         same key multiple times, only the initial value of the 
         *         key is returned.
         */
        public Map<String, Object> addFromResource(URI resource);
                
        /**
         * Creates or Updates (key,value) pairs with information 
         * from other
         * 
         * @param other - source of properties
         * @return - keys that have been updated, if any, and the        
         *           corresponding old values
         */
        public Map<String, Object> merge(PigBackEndProperties other);
                
        /**
         * Removes (key, value) pair if present
         * @param key - key to remove
         * @return - value of key, if key was present, null otherwise
         */
        public Object delete(String key);
                
        /**
         * Returns value of a key
         * @param key
         * @return value of key if present, null otherwise.
         */
        public Object getValue(String key);
        
        /**
         * @return number of (key, value) pairs stored
         */
        public long getCount();
}
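
As an illustration, the sketch below shows one way such a properties object might be implemented: a hypothetical HashMap-backed stand-in that follows the contracts documented above (setValue, merge, delete, getValue, getCount, key iteration). The class name and all implementation choices are assumptions, not part of the proposal.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of a back-end properties object following the
// contracts of PigBackEndProperties above; the class name and the
// HashMap-based storage are assumptions, not part of the proposal.
public class SimpleBackEndProperties implements Iterable<String> {

    private final Map<String, Object> properties = new HashMap<String, Object>();

    // Introduces a new (key, value) pair or updates an existing one;
    // returns the old value, if any, null otherwise.
    public Object setValue(String key, Object value) {
        return properties.put(key, value);
    }

    // Creates or updates (key, value) pairs with information from other;
    // returns the keys that were updated, with their old values.
    public Map<String, Object> merge(SimpleBackEndProperties other) {
        Map<String, Object> updated = new HashMap<String, Object>();
        for (String key : other) {
            if (properties.containsKey(key)) {
                updated.put(key, properties.get(key));
            }
            properties.put(key, other.getValue(key));
        }
        return updated;
    }

    // Removes a (key, value) pair; returns the old value, if present.
    public Object delete(String key) {
        return properties.remove(key);
    }

    public Object getValue(String key) {
        return properties.get(key);
    }

    public long getCount() {
        return properties.size();
    }

    // Iteration is over the keys, matching Iterable<String>.
    public Iterator<String> iterator() {
        return properties.keySet().iterator();
    }
}
```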

Data Storage

This is a possible API for a generic interface that abstracts over the actual mechanisms used to store/persist collections of objects.

package org.apache.pig.datastorage;

import org.apache.pig.backend.PigBackEndProperties;

import java.io.Serializable;
import java.util.Map;
import java.net.URI;

/**
 * Abstraction for a generic property object that can be
 * used to specify configuration information, stats...
 */
public interface DataStorageProperties extends PigBackEndProperties {
   ... 
}

package org.apache.pig.datastorage;

public interface DataStorage {
        
        /**
         * Placeholder for possible initialization activities.
         */
        public void init();

        /**
         * Clean-up and releasing of resources.
         */
        public void close();
        
        /**
         * Provides configuration information about the storage itself.
         * For instance global data-replication policies if any, default
         * values, ... Some of such values could be overridden at a finer 
         * granularity (e.g. on a specific object in the Data Storage)
         * 
         * @return - configuration information
         */
        public DataStorageProperties getConfiguration();
        
        /**
         * Provides a way to change configuration parameters
         * at the Data Storage level. For instance, change the 
         * data replication policy.
         * 
         * @param newConfiguration - the new configuration settings
         * @throws DataStorageConfigurationException when configuration 
         *         conflicts are detected
         * 
         */
        public void updateConfiguration(DataStorageProperties 
                                      newConfiguration) 
             throws DataStorageConfigurationException;
        
        /**
         * Provides statistics on the Storage: capacity values, how much 
         * storage is in use...
         * @return statistics on the Data Storage
         */
        public DataStorageProperties getStatistics();
                
        /**
         * Creates an entity handle for an object (no containment
         * relation)
         *
         * @param name of the object
         * @return an object descriptor
         * @throws DataStorageException if name does not conform to naming 
         *         convention enforced by the Data Storage.
         */
        public DataStorageElementDescriptor asElement(String name) 
             throws DataStorageException;
        
        /**
         * Creates an entity handle for a container.
         * 
         * @param name of the container
         * @return a container descriptor
         * @throws DataStorageException if name does not conform to naming 
         *         convention enforced by the Data Storage.
         */
        public DataStorageContainerDescriptor asContainer(String name) 
             throws DataStorageException;

}

Data Storage Descriptors

Descriptors are a representation of entities in the Data Storage and are used to access and carry out operations on such entities. There are Element Descriptors and Container Descriptors. The latter are descriptors for entities that contain Data Storage Element Descriptors.

package org.apache.pig.datastorage;


public interface DataStorageElementDescriptor extends Comparable {
        /**
         * Opens a stream to which the entity's content can be written.
         * 
         * @param configuration information at the object level
         * @return stream where to write
         * @throws DataStorageException
         */
        public DataStorageOutputStream create(
                     DataStorageProperties configuration) 
             throws DataStorageException;

        /**
         * Copies this entity to another one, possibly residing in a 
         * different Data Storage.
         * 
         * @param dstName descriptor of the entity to create
         * @param dstConfiguration configuration for the new entity
         * @param removeSrc whether the source entity should be removed 
         *        after copying
         * @throws DataStorageException if, for instance, the configuration 
         *         information for the new entity is not compatible with 
         *         the configuration information at the Data Storage level, 
         *         or the user does not have privileges to read from the
         *         source entity or write to the destination storage...
         */
        public void copy(DataStorageElementDescriptor dstName,
                       DataStorageProperties dstConfiguration,
                       boolean removeSrc) 
             throws DataStorageException;
        
        /**
         * Opens the entity for reading.
         * 
         * @return a stream to read the entity from
         * @throws DataStorageException e.g. if the entity does not exist...
         */
        public DataStorageInputStream open() throws DataStorageException;

        /**
         * Open an element in the Data Storage with support for random access 
         * (seek operations).
         * 
         * @return a seekable input stream
         * @throws DataStorageException
         */
        public DataStorageSeekableInputStream sopen() 
             throws DataStorageException;
        
        /**
         * Checks whether the entity exists or not.
         * 
         * @return true if the entity exists, false otherwise.
         */
        public boolean exists();
        
        /**
         * Changes the name of an entity in the Data Storage
         * 
         * @param newName descriptor carrying the new name of the entity 
         * @throws DataStorageException 
         */
        public void rename(DataStorageElementDescriptor newName) 
             throws DataStorageException;

        /**
         * Remove entity from the Data Storage.
         * 
         * @throws DataStorageException
         */
        public void delete() throws DataStorageException;

        /**
         * Retrieve configuration information for entity
         * @return configuration
         */
        public DataStorageProperties getConfiguration();

        /**
         * Update configuration information for this entity
         *
         * @param newConfig configuration
         * @throws DataStorageException
         */
        public void updateConfiguration(DataStorageProperties newConfig) 
             throws DataStorageException;
        
        /**
         * Lists entity statistics.
         * @return statistics on the entity
         */
        public DataStorageProperties getStatistics();
}

package org.apache.pig.datastorage;

import org.apache.pig.datastorage.DataStorageElementDescriptor;

public interface DataStorageContainerDescriptor 
                 extends DataStorageElementDescriptor, 
                 Iterable<DataStorageElementDescriptor> {
}

Data-Storage Use-Cases

This section collects code fragments that make use of the proposed APIs.
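
As a stand-in, the fragment below sketches a toy in-memory Data Storage that mirrors the create/open/copy/exists contracts of the descriptor interfaces above. All names and implementation details are hypothetical; in particular, copy with removeSrc set behaves like a move, as described in the copy javadoc.

```java
import java.io.*;
import java.util.HashMap;
import java.util.Map;

// Toy, in-memory sketch of the proposed Data Storage abstraction
// (hypothetical names): elements are byte buffers keyed by name, and
// copy() reads from one element and writes to another, possibly
// removing the source afterwards.
public class InMemoryStorage {

    private final Map<String, byte[]> elements = new HashMap<String, byte[]>();

    // Mirrors DataStorage.asElement(name): returns a handle only,
    // no entity is created yet.
    public ElementDescriptor asElement(String name) {
        return new ElementDescriptor(name);
    }

    public class ElementDescriptor {
        private final String name;

        ElementDescriptor(String name) { this.name = name; }

        // Mirrors create(): opens a stream to write the entity's content;
        // the entity becomes visible when the stream is closed.
        public OutputStream create() {
            return new ByteArrayOutputStream() {
                @Override public void close() {
                    elements.put(name, toByteArray());
                }
            };
        }

        // Mirrors open(): opens the entity for reading.
        public InputStream open() throws FileNotFoundException {
            byte[] data = elements.get(name);
            if (data == null) {
                throw new FileNotFoundException(name);
            }
            return new ByteArrayInputStream(data);
        }

        public boolean exists() {
            return elements.containsKey(name);
        }

        // Mirrors copy(dstName, dstConfiguration, removeSrc), with the
        // configuration argument omitted for brevity.
        public void copy(ElementDescriptor dst, boolean removeSrc)
                throws IOException {
            try (InputStream in = open(); OutputStream out = dst.create()) {
                in.transferTo(out);
            }
            if (removeSrc) {
                elements.remove(name);
            }
        }
    }
}
```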

Execution Engine

package org.apache.pig.executionengine;

import java.util.Collection;

/**
 * This is the main interface that various execution engines
 * need to support and it is also the main interface that Pig
 * will need to use to submit jobs for execution, retrieve information
 * about their progress and possibly terminate them.
 *
 */

public interface ExecutionEngine {

        /**
         * Placeholder for possible initialization activities.
         */
        public void init();

        /**
         * Clean-up and releasing of resources.
         */
        public void close();

        
        /**
         * Provides configuration information about the execution engine itself.
         * 
         * @return - information about the configuration used to connect to the execution engine
         */
        public ExecutionEngineProperties getConfiguration();
        
        /**
         * Provides a way to dynamically change configuration parameters
         * at the Execution Engine level.
         * 
         * @param newConfiguration - the new configuration settings
         * @throws ExecutionEngineException when configuration conflicts 
         *         are detected
         * 
         */
        public void updateConfiguration(ExecutionEngineProperties newConfiguration) 
                      throws ExecutionEngineException;
        
        /**
         * Provides statistics on the Execution Engine: number of nodes,
         * node failure rates, average load, average job wait time...
         * @return ExecutionEngineProperties
         */
        public ExecutionEngineProperties getStatistics();

        /**
         * Compiles a logical plan into a physical plan, given a set of 
         * configuration properties that apply at the plan level: for 
         * instance, the desired degree of parallelism for this plan, 
         * which could differ from the default set at the execution 
         * engine level.
         * 
         * @param plan - the logical plan to compile
         * @param properties - plan-level configuration properties
         * @return the compiled physical plan
         */
        public ExecutionEnginePhysicalPlan compile(ExecutionEngineLogicalPlan plan,
                                                     ExecutionEngineProperties properties);
        
        /**
         * This may be useful to support admin functionalities.
         * 
         * @return a collection of jobs "known" to the execution engine,
         * say jobs currently queued up or running (this can be determined 
         * by obtaining the properties of the job)
         * 
         * @throws ExecutionEngineException e.g. if the user does not have 
         * privileges to obtain this information...
         */
        public Collection<ExecutionEnginePhysicalPlan> allPhysicalPlans() throws
            ExecutionEngineException;
}

Execution Engine Physical Plan

Interface to manage a Physical Plan.

package org.apache.pig.executionengine;

public interface ExecutionEnginePhysicalPlan {

        /**
         * Execute the physical plan.
         * This is non-blocking. See getStatistics to pull information
         * about the job.
         * 
         * @throws ExecutionEngineException
         */
        public void execute() throws ExecutionEngineException;

        /**
         * A job may have properties, like a priority, degree of parallelism...
         * Some of these properties may be inherited from the ExecutionEngine
         * configuration, others may have been set specifically for this job.
         * For instance, a job scheduler may assign low priority to
         * jobs automatically started for maintenance purposes.
         * 
         * @return set of properties
         */
        public ExecutionEngineProperties getConfiguration();
        
        /**
         * Some properties of the job may be changed, say the priority...
         * 
         * @param configuration
         * @throws ExecutionEngineException some changes may not be allowed: 
         * for instance, some job-level properties cannot override 
         * Execution-Engine-level properties, or some properties can only 
         * be changed in certain states of the job, say, once the job is 
         * started, the parallelism level may not be changed...
         */
        public void updateConfiguration(ExecutionEngineProperties configuration)
                throws ExecutionEngineException;
        
        /**
         * Hook to provide asynchronous notifications.
         * 
         */
        public void notify(ExecutionEngineNotificationEvent event);
        
        /**
         * Kills current job.
         * 
         * @throws ExecutionEngineException
         */
        public void kill() throws ExecutionEngineException;
        
        /**
         * Returns information about the state of the job (not submitted, 
         * e.g. the execute method has not been called yet; not running, 
         * e.g. execute has been issued but the job is waiting; running...; 
         * completed; aborted...) as well as progress information.
         * 
         * @return statistics on the job
         */
        public ExecutionEngineProperties getStatistics();
}
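
To illustrate the intended life-cycle, the sketch below models a toy physical plan whose execute is non-blocking, whose state can be polled (standing in for getStatistics), and which can be killed while running. All names are hypothetical, and the thread-based execution is only an assumption made for the sketch.

```java
import java.util.concurrent.atomic.AtomicReference;

// Toy sketch of the ExecutionEnginePhysicalPlan life-cycle described
// above (hypothetical names): execute() returns immediately, the caller
// polls the job state, and kill() aborts a running job.
public class ToyPhysicalPlan {

    public enum State { NOT_SUBMITTED, RUNNING, COMPLETED, ABORTED }

    private final AtomicReference<State> state =
        new AtomicReference<State>(State.NOT_SUBMITTED);
    private final Runnable job;
    private Thread worker;

    public ToyPhysicalPlan(Runnable job) {
        this.job = job;
    }

    // Non-blocking, like execute() in the interface above: the job runs
    // on a worker thread while the caller polls getStatistics().
    public void execute() {
        state.set(State.RUNNING);
        worker = new Thread(() -> {
            job.run();
            // Only transition to COMPLETED if the job was not killed.
            state.compareAndSet(State.RUNNING, State.COMPLETED);
        });
        worker.start();
    }

    // Stands in for getStatistics(): here the state alone is returned
    // instead of a full properties object.
    public State getStatistics() {
        return state.get();
    }

    // Mirrors kill(): aborts the job if it is currently running.
    public void kill() {
        if (state.compareAndSet(State.RUNNING, State.ABORTED)) {
            worker.interrupt();
        }
    }
}
```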

PigAbstractionLayer (last edited 2010-04-27 08:05:52 by newacct)