Data-Storage Based Pig Front-End

This is a sample code fragment where PigContext.java has been adapted to use the Data Storage API defined above.

-
-    //  configuration for connecting to hadoop
-    transient private JobConf conf = null;        
     
@@ -79,16 +81,21 @@
+    // configuration information for file system(s)
+    transient private DataStorageProperties fileSystemConf;
+    
     //main file system that jobs and shell commands access
-    transient private FileSystem dfs;                         
+    transient private DataStorage dfs;     

@@ -195,21 +199,32 @@
                                }
                        }
                
-                    
-                   lfs = FileSystem.getNamed("local", conf);
-              
-                   mLogger.info("Connecting to hadoop file system at: " + conf.get("fs.default.name"));
-                   dfs = FileSystem.get(conf);
+                   HadoopDataStorageConfiguration conf = 
                           new HadoopDataStorageConfiguration();             
+                   fileSystemConf = conf;
+                       
+                   lfs = new HadoopFileSystem(new URI(...), conf);
+                   
+                   mLogger.info("Connecting to hadoop file system at: " + 
                                 fileSystemConf.getValue("fs.default.name"));
+                   
+                   dfs = new HadoopFileSystem (new URI(...), conf);
+                       
+                   mLogger.info("Connecting to map-reduce job tracker at: " + 
                                  conf.getValue("mapred.job.tracker"));
+
-                   mLogger.info("Connecting to map-reduce job tracker at: " + 
                                  conf.get("mapred.job.tracker"));
-                   jobTracker = (JobSubmissionProtocol) 
                                  RPC.getProxy(JobSubmissionProtocol.class,
-                                              JobSubmissionProtocol.versionID, 
                                               JobTracker.getAddress(conf), 
                                               conf);
-                   jobClient = new JobClient(conf);
+                   HadoopExecutionEngineConfiguration execConf = 
                             new HadoopExecutionEngineConfiguration();
+                   backEndConf = execConf;
+                   jobClient = new HadoopExecutionEngine(execConf);
                
           }else{
-               conf = new JobConf();
-               lfs = FileSystem.getNamed("local", conf);
-               dfs = lfs;  // for local execution, the "dfs" is the local file system
+               HadoopDataStorageConfiguration conf = new HadoopDataStorageConfiguration();             
+               fileSystemConf = conf;
+               
+               lfs = new HadoopFileSystem(new URI(...), 
                                           new HadoopDataStorageConfiguration(conf));
+               
+               dfs = lfs;  // for local execution, the "dfs" is the local file system
           }
        }catch (IOException e){

These are sample code fragments from PigServer.java. Operations that previously used the Hadoop file system directly have now been adapted to use the Data Storage API defined above.

@@ -485,37 +565,99 @@
      * @return
      * @throws IOException
      */
-    public long fileSize(String filename) throws IOException {
-        FileSystem dfs = pigContext.getDfs();
-        Path p = new Path(filename);
-        long len = dfs.getLength(p);
-        long replication = dfs.getDefaultReplication(); 
-        return len * replication;
+    public long fileSize(String name) throws IOException {
+       try {
+               DataStorage dfs = pigContext.getDfs();
+               DataStorageElementDescriptor elem = dfs.asElement(name);
+               DataStorageProperties elemStats = elem.getStatistics();
+               
+               long len = new Long(elemStats.
                                    getValue("length.bytes").toString());
+               long replication = new Long(elemStats.
                                       getValue("replication").toString());
+               
+               return len * replication;
+       }
+       catch (DataStorageException e)
+       {
+               return 0;
+       }
     }

-    public boolean deleteFile(String filename) throws IOException {
-        return pigContext.getDfs().delete(new Path(filename));
+    public boolean deleteFile(String name) throws IOException {
+       try {
+               DataStorage ds = pigContext.getDfs();
+               DataStorageElementDescriptor elem = ds.asElement(name);
+               
+               return elem.delete();
+           }
+       catch (DataStorageException e)
+       {
+               throw new IOException(e);
+       }
     }

PigAbstractionFrontEnd (last edited 2009-09-20 23:38:07 by localhost)