Differences between revisions 1 and 2
Revision 1 as of 2007-11-20 23:18:16
Size: 3593
Comment:
Revision 2 as of 2009-09-20 23:38:08
Size: 3593
Editor: localhost
Comment: converted to 1.6 markup
No differences found!

Data-Storage Hadoop Back-End

These are code fragments in which new classes implement the Data Storage API on top of the Hadoop file system.

package org.apache.pig.hadoop.fs;

import org.apache.pig.datastorage.*;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;
import java.io.IOException;

/**
 * Data Storage implementation backed by the Hadoop file system.
 * Wraps a Hadoop {@link FileSystem} obtained for the given URI and
 * keeps the configuration used to create it.
 */
public class HadoopFileSystem implements DataStorage {

        // Underlying Hadoop file system handle.
        protected FileSystem hfs;  
        // Configuration this storage was created with; also carries the Hadoop conf.
        protected HadoopDataStorageConfiguration conf;
        
        /**
         * Connects to the Hadoop file system identified by {@code uri}.
         *
         * @param uri           location of the file system (e.g. an hdfs:// URI)
         * @param configuration wrapper supplying the Hadoop configuration
         * @throws IOException if the file system cannot be obtained
         */
        public HadoopFileSystem(URI uri, HadoopDataStorageConfiguration configuration) throws IOException {
                conf = configuration;
                hfs = FileSystem.get(uri, configuration.getHadoopConf());
        }
        
        // Initialization hook; body elided in this wiki fragment.
        public void init() {
         ...
        }

}

package org.apache.pig.hadoop.fs;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pig.datastorage.*;

/**
 * Common base for Hadoop-backed element descriptors: holds the Hadoop
 * {@link Path} and the {@link FileSystem} it lives in, and implements the
 * behavior shared by files and directories.
 */
public abstract class HadoopPath implements DataStorageElementDescriptor {

        // Location of this element within the file system.
        protected Path path;
        // File system the path belongs to; set by concrete subclasses.
        protected FileSystem fs;

        /**
         * Orders descriptors by their underlying Hadoop path.
         *
         * BUG FIX: the original cast {@code obj} to {@code HadoopDirectory},
         * which throws ClassCastException whenever the other element is a
         * HadoopFile. Cast to the common base type instead so any two
         * HadoopPath descriptors compare cleanly.
         */
        public int compareTo(Object obj) {
                return path.compareTo(((HadoopPath) obj).path);
        }

        /**
         * Returns true if the path currently exists in the file system.
         * I/O failures are deliberately treated as "does not exist"
         * (best-effort probe, no exception leaks to the caller).
         */
        public boolean exists() {
                try {
                        return fs.exists(path);
                }
                catch (IOException e) {
                        return false;
                }
        }
      
      ...
}

package org.apache.pig.hadoop.fs;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;

import org.apache.pig.datastorage.*;


/**
 * Element descriptor for a plain file stored in the Hadoop file system.
 * Inherits path/file-system state and shared behavior from HadoopPath.
 */
public class HadoopFile extends HadoopPath
                        implements DataStorageElementDescriptor {

        /** Describes the file named {@code child} under the path {@code parent}. */
        public HadoopFile(FileSystem fs, String parent, String child) {
                this.fs = fs;
                this.path = new Path(parent, child);
        }

        /** Describes {@code child} inside an already-known directory descriptor. */
        public HadoopFile(FileSystem fs, HadoopDirectory parent, String child) {
                // Delegate: a directory descriptor is identified by its path string.
                this(fs, parent.toString(), child);
        }

        /** Describes the file at the given complete path string. */
        public HadoopFile(FileSystem fs, String pathString) {
                this.fs = fs;
                this.path = new Path(pathString);
        }

        /** File-specific configuration; not implemented yet (TODO). */
        public DataStorageProperties getConfiguration() {
                return null;
        }

        /** File-specific statistics; not implemented yet (TODO). */
        public DataStorageProperties getStatistics() {
                return null;
        }

      ...
}

package org.apache.pig.hadoop.fs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;

import org.apache.pig.datastorage.*;

/**
 * Container descriptor for a directory in the Hadoop file system.
 * Iterating a HadoopDirectory yields one element descriptor per
 * immediate child: HadoopFile for files, HadoopDirectory otherwise.
 */
public class HadoopDirectory extends HadoopPath
                             implements DataStorageContainerDescriptor {
        
        /** Describes the directory named {@code child} under {@code parent}. */
        public HadoopDirectory(FileSystem fs, String parent, String child) {
                this.fs = fs;
                path = new Path(parent, child);
        }
        
        /** Describes {@code child} inside an already-known directory descriptor. */
        public HadoopDirectory(FileSystem fs, HadoopDirectory parent, String child) {
                this.fs = fs;
                path = new Path(parent.toString(), child);
        }
        
        /** Describes the directory at the given complete path string. */
        public HadoopDirectory(FileSystem fs, String pathString) {
                this.fs = fs;
                path = new Path(pathString);
        }

        /**
         * Enumerates the immediate children of this directory, best-effort:
         * a failed listing yields an empty iterator, and a child whose kind
         * cannot be determined is reported as a directory.
         *
         * BUG FIXES vs. the original fragment:
         * - FileSystem.isFile declares IOException; the original called it
         *   unhandled inside a method that does not throw (would not compile).
         * - The original used Vector without importing java.util.Vector;
         *   replaced with ArrayList (no synchronization needed here).
         * - listPaths may return null for a missing path; guard against it.
         */
        public Iterator<DataStorageElementDescriptor> iterator() {
                Path[] allPaths;

                try {
                        allPaths = fs.listPaths(path);
                }
                catch (IOException e) {
                        allPaths = null;
                }
                if (allPaths == null) {
                        allPaths = new Path[ 0 ];
                }

                List<DataStorageElementDescriptor> descriptors =
                        new ArrayList<DataStorageElementDescriptor>(allPaths.length);

                for (int i = 0; i < allPaths.length; ++i) {
                        boolean isFile;
                        try {
                                isFile = fs.isFile(allPaths[ i ]);
                        }
                        catch (IOException e) {
                                // Kind unknown: fall through to the directory case.
                                isFile = false;
                        }
                        if (isFile) {
                                descriptors.add(new HadoopFile(fs, allPaths[ i ].toString()));
                        }
                        else {
                                descriptors.add(new HadoopDirectory(fs, allPaths[ i ].toString()));
                        }
                }

                return descriptors.iterator();
        }
                                
      ...
}

PigAbstractionHadoopBackEnd (last edited 2009-09-20 23:38:08 by localhost)