Hadoop MapReduce Next Generation - Writing YARN Applications

Purpose

This document describes, at a high level, the way to implement new applications for YARN.

Concepts and Flow

The general concept is that an 'Application Submission Client' submits an 'Application' to the YARN ResourceManager. The client communicates with the ResourceManager using the 'ClientRMProtocol'. It first acquires a new 'ApplicationId', if needed, via ClientRMProtocol#getNewApplication, and then submits the 'Application' to be run via ClientRMProtocol#submitApplication. As part of the ClientRMProtocol#submitApplication call, the client needs to provide sufficient information to the ResourceManager to 'launch' the application's first container, i.e. the ApplicationMaster. This information includes the local files/jars that need to be available for your application to run, the actual command that needs to be executed (with the necessary command line arguments), any Unix environment settings (optional), etc. Effectively, you need to describe the Unix process(es) that need to be launched for your ApplicationMaster.


The YARN ResourceManager will then launch the ApplicationMaster (as specified) on an allocated container. The ApplicationMaster is then expected to communicate with the ResourceManager using the 'AMRMProtocol'. First, the ApplicationMaster needs to register itself with the ResourceManager. To complete the task assigned to it, the ApplicationMaster can then request and receive containers via AMRMProtocol#allocate. After a container is allocated to it, the ApplicationMaster communicates with the NodeManager using ContainerManager#startContainer to launch the container for its task. As part of launching this container, the ApplicationMaster has to specify the ContainerLaunchContext. Similar to the ApplicationSubmissionContext, it holds the launch information such as the command line specification, environment, etc. Once the task is completed, the ApplicationMaster has to notify the ResourceManager of its completion via AMRMProtocol#finishApplicationMaster.


Meanwhile, the client can monitor the application's status by querying the ResourceManager or by directly querying the ApplicationMaster if it supports such a service. If needed, it can also kill the application via ClientRMProtocol#forceKillApplication.
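
The ordering described above can be made concrete with a toy, in-memory model. ToyResourceManager and its methods are hypothetical names used for illustration only. They echo the protocol calls, but they are not YARN classes, and a real ResourceManager does far more (scheduling, launching the ApplicationMaster, tracking nodes). The toy only rejects calls made out of order:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the ResourceManager. Method names echo
// the protocol calls described above, but none of this is YARN code: it only
// enforces the order in which a client and an ApplicationMaster make calls.
final class ToyResourceManager {
    private enum Phase { IDLE, ID_ISSUED, SUBMITTED, AM_REGISTERED, AM_FINISHED }

    private Phase phase = Phase.IDLE;
    private int nextId = 1;
    private int issuedId = -1;
    private final List<String> calls = new ArrayList<>();

    // Rejects a call made in the wrong phase and records accepted calls.
    private void expect(Phase required, String call) {
        if (phase != required) {
            throw new IllegalStateException(
                call + " called in phase " + phase + ", expected " + required);
        }
        calls.add(call);
    }

    // Client: cf. ClientRMProtocol#getNewApplication
    int getNewApplication() {
        expect(Phase.IDLE, "getNewApplication");
        issuedId = nextId++;
        phase = Phase.ID_ISSUED;
        return issuedId;
    }

    // Client: cf. ClientRMProtocol#submitApplication. A real RM would now
    // launch the ApplicationMaster's container.
    void submitApplication(int appId) {
        if (appId != issuedId) {
            throw new IllegalArgumentException("Unknown application id " + appId);
        }
        expect(Phase.ID_ISSUED, "submitApplication");
        phase = Phase.SUBMITTED;
    }

    // ApplicationMaster: must register before asking for containers
    void registerApplicationMaster() {
        expect(Phase.SUBMITTED, "registerApplicationMaster");
        phase = Phase.AM_REGISTERED;
    }

    // ApplicationMaster: cf. AMRMProtocol#allocate. This toy grants every ask
    // at once; a real scheduler may hand out containers over many calls.
    int allocate(int containersAsked) {
        expect(Phase.AM_REGISTERED, "allocate");
        return containersAsked;
    }

    // ApplicationMaster: cf. AMRMProtocol#finishApplicationMaster
    void finishApplicationMaster() {
        expect(Phase.AM_REGISTERED, "finishApplicationMaster");
        phase = Phase.AM_FINISHED;
    }

    List<String> calls() {
        return new ArrayList<>(calls);
    }
}
```

In a real deployment, the first two calls come from the client process and the rest come from the ApplicationMaster process. The toy collapses both into one object purely to show the sequence.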

Interfaces

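The interfaces you will most likely be concerned with are:

- ClientRMProtocol (Client <--> ResourceManager): used by a client to obtain a new ApplicationId, submit an application (i.e. launch its ApplicationMaster), query its status, or kill it.
- AMRMProtocol (ApplicationMaster <--> ResourceManager): used by the ApplicationMaster to register itself, request and release containers via allocate, and report its completion.
- ContainerManager (ApplicationMaster <--> NodeManager): used by the ApplicationMaster to start containers on a NodeManager and query their status.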

Writing a Simple YARN Application

Writing a simple Client

    ClientRMProtocol applicationsManager;
    YarnConfiguration yarnConf = new YarnConfiguration(conf);
    InetSocketAddress rmAddress =
        NetUtils.createSocketAddr(yarnConf.get(
            YarnConfiguration.RM_ADDRESS,
            YarnConfiguration.DEFAULT_RM_ADDRESS));
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    Configuration appsManagerServerConf = new Configuration(conf);
    appsManagerServerConf.setClass(
        YarnConfiguration.YARN_SECURITY_INFO,
        ClientRMSecurityInfo.class, SecurityInfo.class);
    // Create the RPC factory used to obtain protocol proxies
    YarnRPC rpc = YarnRPC.create(conf);
    applicationsManager = ((ClientRMProtocol) rpc.getProxy(
        ClientRMProtocol.class, rmAddress, appsManagerServerConf));
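
The snippet above turns a "host:port" string from the configuration into an InetSocketAddress with Hadoop's NetUtils.createSocketAddr. The following is a rough, stdlib-only illustration of that step. HostPort.parse is a hypothetical helper, not NetUtils' actual implementation, which may accept more input forms than a bare host:port pair.

```java
import java.net.InetSocketAddress;

// Hypothetical helper, not a Hadoop API: splits "host:port" on the last colon
// and builds an unresolved socket address (no DNS lookup is performed).
final class HostPort {
    static InetSocketAddress parse(String target) {
        int colon = target.lastIndexOf(':');
        if (colon <= 0 || colon == target.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + target);
        }
        String host = target.substring(0, colon);
        // NumberFormatException (an IllegalArgumentException) if not numeric
        int port = Integer.parseInt(target.substring(colon + 1));
        // Throws IllegalArgumentException if the port is out of range
        return InetSocketAddress.createUnresolved(host, port);
    }
}
```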

    // Ask the ResourceManager for a new ApplicationId
    GetNewApplicationRequest request =
        Records.newRecord(GetNewApplicationRequest.class);
    GetNewApplicationResponse response =
        applicationsManager.getNewApplication(request);
    ApplicationId appId = response.getApplicationId();
    LOG.info("Got new ApplicationId=" + appId);

    // Create a new ApplicationSubmissionContext
    ApplicationSubmissionContext appContext =
        Records.newRecord(ApplicationSubmissionContext.class);
    // set the ApplicationId
    appContext.setApplicationId(appId);
    // set the application name
    appContext.setApplicationName(appName);

    // Create a new container launch context for the AM's container
    ContainerLaunchContext amContainer =
        Records.newRecord(ContainerLaunchContext.class);

    // Define the local resources required
    Map<String, LocalResource> localResources =
        new HashMap<String, LocalResource>();
    // Let's assume the jar we need for our ApplicationMaster is available in
    // HDFS at a path known to us, and we want to make it available to
    // the ApplicationMaster in the launched container
    FileSystem fs = FileSystem.get(conf);
    Path jarPath; // <- known path to jar file
    FileStatus jarStatus = fs.getFileStatus(jarPath);
    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
    // Set the type of resource - file or archive
    // archives are untarred at the destination by the framework
    amJarRsrc.setType(LocalResourceType.FILE);
    // Set visibility of the resource
    // Setting to most private option i.e. this file will only
    // be visible to this instance of the running application
    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
    // Set the location of resource to be copied over into the
    // working directory
    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
    // Set timestamp and length of file so that the framework
    // can do basic sanity checks for the local resource
    // after it has been copied over to ensure it is the same
    // resource the client intended to use with the application
    amJarRsrc.setTimestamp(jarStatus.getModificationTime());
    amJarRsrc.setSize(jarStatus.getLen());
    // The framework will create a symlink called AppMaster.jar in the
    // working directory that will be linked back to the actual file.
    // If the ApplicationMaster needs to reference the jar file, it
    // should use the symlink filename.
    localResources.put("AppMaster.jar", amJarRsrc);
    // Set the local resources into the launch context
    amContainer.setLocalResources(localResources);

    // Set up the environment needed for the launch context
    Map<String, String> env = new HashMap<String, String>();
    // For example, we could set up the classpath needed.
    // Assuming our classes or jars are available as local resources in the
    // working directory from which the command will be run, we need to append
    // "./*" to the classpath.
    // By default, all the Hadoop-specific classpaths will already be available
    // in $CLASSPATH, so we should be careful not to overwrite it.
    String classPathEnv = "$CLASSPATH:./*:";
    env.put("CLASSPATH", classPathEnv);
    amContainer.setEnvironment(env);

    // Construct the command to be executed on the launched container
    String command =
        "${JAVA_HOME}" + "/bin/java" +
        " MyAppMaster" +
        " arg1 arg2 arg3" +
        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";

    List<String> commands = new ArrayList<String>();
    commands.add(command);
    // add additional commands if needed

    // Set the command array into the container spec
    amContainer.setCommands(commands);

    // Define the resource requirements for the container
    // For now, YARN only supports memory so we set the memory
    // requirements.
    // If the process takes more than its allocated memory, it will
    // be killed by the framework.
    // The memory requested should not exceed the max capability
    // of the cluster, and all asks should be a multiple of the min capability.
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(amMemory);
    amContainer.setResource(capability);

    // Set the container launch context into the ApplicationSubmissionContext
    appContext.setAMContainerSpec(amContainer);
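
The client records the jar's modification time and length so that the framework can check it is using the resource the client intended. The idea behind that check can be sketched with plain java.nio. ResourceFingerprint is a hypothetical name, and this is not the framework's own verification code. It captures a file's size and mtime and later reports whether either has changed:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of YARN: records a file's length and
// modification time (as the client does with setSize/setTimestamp) and later
// checks whether the file still matches those recorded values.
final class ResourceFingerprint {
    final long size;
    final long modificationTimeMillis;

    ResourceFingerprint(long size, long modificationTimeMillis) {
        this.size = size;
        this.modificationTimeMillis = modificationTimeMillis;
    }

    // Capture the current size and modification time of a local file.
    static ResourceFingerprint of(Path file) {
        try {
            return new ResourceFingerprint(
                Files.size(file), Files.getLastModifiedTime(file).toMillis());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // True only if both the size and the modification time are unchanged.
    boolean matches(Path file) {
        ResourceFingerprint current = of(file);
        return current.size == size
            && current.modificationTimeMillis == modificationTimeMillis;
    }
}
```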

    // Create the request to send to the ApplicationsManager
    SubmitApplicationRequest appRequest =
        Records.newRecord(SubmitApplicationRequest.class);
    appRequest.setApplicationSubmissionContext(appContext);

    // Submit the application to the ApplicationsManager
    // Ignore the response as either a valid response object is returned on
    // success or an exception thrown to denote the failure
    applicationsManager.submitApplication(appRequest);

    GetApplicationReportRequest reportRequest =
        Records.newRecord(GetApplicationReportRequest.class);
    reportRequest.setApplicationId(appId);
    GetApplicationReportResponse reportResponse =
        applicationsManager.getApplicationReport(reportRequest);
    ApplicationReport report = reportResponse.getApplicationReport();
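
A client typically repeats the report request above until the application reaches a final state. The loop below sketches that under stated assumptions. The fetchState supplier stands in for the getApplicationReport call, and AppState is a hypothetical mirror of the application states, not the YARN type. The sketch treats FINISHED, FAILED and KILLED as final.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical mirror of the states an application report might carry; the
// real value comes from the ApplicationReport returned above.
enum AppState { NEW, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED }

final class ReportPoller {
    private static final Set<AppState> TERMINAL =
        EnumSet.of(AppState.FINISHED, AppState.FAILED, AppState.KILLED);

    // Calls fetchState at most maxPolls times, sleeping intervalMillis between
    // calls, and returns the last state seen (terminal or not).
    static AppState waitForCompletion(
            Supplier<AppState> fetchState, long intervalMillis, int maxPolls) {
        AppState state = fetchState.get();
        for (int polls = 1; polls < maxPolls && !TERMINAL.contains(state); polls++) {
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt and report what we know so far.
                Thread.currentThread().interrupt();
                return state;
            }
            state = fetchState.get();
        }
        return state;
    }
}
```

Capping the number of polls means a client that gives up can fall back to the kill request shown next.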

    KillApplicationRequest killRequest =
        Records.newRecord(KillApplicationRequest.class);
    killRequest.setApplicationId(appId);
    applicationsManager.forceKillApplication(killRequest);

Writing an ApplicationMaster

    Map<String, String> envs = System.getenv();
    if (!envs.containsKey(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)) {
      // app attempt id should always be set in the env by the framework
      throw new IllegalArgumentException(
          "ApplicationAttemptId not set in the environment");
    }
    ApplicationAttemptId appAttemptID =
        ConverterUtils.toApplicationAttemptId(
            envs.get(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV));
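
ConverterUtils.toApplicationAttemptId turns the environment string into a record. To show roughly what such a conversion involves, here is a stdlib sketch. It assumes an attempt id string of the form appattempt_<clusterTimestamp>_<appId>_<attemptId>. That format is an assumption of this sketch, and real code should keep using ConverterUtils. AttemptIdParts and the env key in the test are hypothetical.

```java
import java.util.Map;

// Hypothetical value type. It ASSUMES the attempt id string looks like
// appattempt_<clusterTimestamp>_<appId>_<attemptId>; real code should use
// ConverterUtils.toApplicationAttemptId instead.
final class AttemptIdParts {
    final long clusterTimestamp;
    final int appId;
    final int attemptId;

    AttemptIdParts(long clusterTimestamp, int appId, int attemptId) {
        this.clusterTimestamp = clusterTimestamp;
        this.appId = appId;
        this.attemptId = attemptId;
    }

    // Splits on '_' and parses the numeric fields (leading zeros are fine).
    static AttemptIdParts parse(String value) {
        String[] parts = value.split("_");
        if (parts.length != 4 || !parts[0].equals("appattempt")) {
            throw new IllegalArgumentException("Unexpected attempt id: " + value);
        }
        return new AttemptIdParts(Long.parseLong(parts[1]),
            Integer.parseInt(parts[2]), Integer.parseInt(parts[3]));
    }

    // Mirrors the check above: fail fast if the variable was not set.
    static AttemptIdParts fromEnv(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null) {
            throw new IllegalArgumentException(key + " not set in the environment");
        }
        return parse(value);
    }
}
```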

    // Connect to the Scheduler of the ResourceManager.
    YarnConfiguration yarnConf = new YarnConfiguration(conf);
    InetSocketAddress rmAddress =
        NetUtils.createSocketAddr(yarnConf.get(
            YarnConfiguration.RM_SCHEDULER_ADDRESS,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    // Create the RPC factory used to obtain protocol proxies
    YarnRPC rpc = YarnRPC.create(conf);
    AMRMProtocol resourceManager =
        (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf);

    // Register the AM with the RM
    // Set the required info into the registration request:
    // - ApplicationAttemptId
    // - host on which the app master is running
    // - rpc port on which the app master accepts requests from the client
    // - tracking url for the client to track app master progress
    RegisterApplicationMasterRequest appMasterRequest =
        Records.newRecord(RegisterApplicationMasterRequest.class);
    appMasterRequest.setApplicationAttemptId(appAttemptID);
    appMasterRequest.setHost(appMasterHostname);
    appMasterRequest.setRpcPort(appMasterRpcPort);
    appMasterRequest.setTrackingUrl(appMasterTrackingUrl);

    // The registration response is useful as it provides information about the
    // cluster.
    // Similar to the GetNewApplicationResponse in the client, it provides
    // information about the min/max resource capabilities of the cluster that
    // the ApplicationMaster needs when requesting containers.
    RegisterApplicationMasterResponse response =
        resourceManager.registerApplicationMaster(appMasterRequest);
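
The registration response reports the cluster's minimum and maximum resource capabilities. The client code states the rule for using them: asks should be a multiple of the minimum and should not exceed the maximum. A hypothetical helper that applies that rule to a container's memory ask (sizes in MB) might look like this. This document does not say whether the scheduler rounds or rejects an out-of-range ask on its own, so the sketch rounds up and rejects explicitly.

```java
// Hypothetical helper, not a YARN API. It applies the rule stated in the
// client code: an ask should be a multiple of the cluster's minimum
// capability and must not exceed its maximum capability.
final class MemoryAsk {
    // Rounds requestedMb up to the next multiple of minMb; rejects asks that
    // would end up above maxMb.
    static int normalize(int requestedMb, int minMb, int maxMb) {
        if (minMb <= 0 || maxMb < minMb) {
            throw new IllegalArgumentException(
                "Invalid cluster limits: min=" + minMb + ", max=" + maxMb);
        }
        if (requestedMb <= 0) {
            throw new IllegalArgumentException("Memory ask must be positive: " + requestedMb);
        }
        // Long arithmetic so that rounding a large ask cannot overflow.
        long rounded = ((long) requestedMb + minMb - 1) / minMb * minMb;
        if (rounded > maxMb) {
            throw new IllegalArgumentException("Ask of " + requestedMb
                + " MB exceeds the cluster maximum of " + maxMb + " MB");
        }
        return (int) rounded;
    }
}
```

For example, with a 1024 MB minimum, a 1500 MB ask becomes 2048 MB.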

    // Resource Request
    ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);

    // Set up the host requirements:
    // whether a particular rack/host is needed.
    // This is useful for applications that are sensitive
    // to data locality; "*" means any host
    rsrcRequest.setHostName("*");

    // set the priority for the request
    Priority pri = Records.newRecord(Priority.class);
    pri.setPriority(requestPriority);
    rsrcRequest.setPriority(pri);

    // Set up resource type requirements
    // For now, only memory is supported so we set memory requirements
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(containerMemory);
    rsrcRequest.setCapability(capability);

    // set the number of containers needed
    // matching the specifications
    rsrcRequest.setNumContainers(numContainers);

    List<ResourceRequest> requestedContainers = new ArrayList<ResourceRequest>();
    requestedContainers.add(rsrcRequest);
    List<ContainerId> releasedContainers = new ArrayList<ContainerId>();
    AllocateRequest req = Records.newRecord(AllocateRequest.class);

    // The response id set in the request will be sent back in
    // the response so that the ApplicationMaster can
    // match it to its original ask and act appropriately.
    req.setResponseId(rmRequestID);

    // Set ApplicationAttemptId
    req.setApplicationAttemptId(appAttemptID);

    // Add the list of containers being asked for
    req.addAllAsks(requestedContainers);

    // If the ApplicationMaster has no need for certain
    // containers due to over-allocation or for any other
    // reason, it can release them back to the ResourceManager
    req.addAllReleases(releasedContainers);

    // Assuming the ApplicationMaster can track its progress
    req.setProgress(currentProgress);

    AllocateResponse allocateResponse = resourceManager.allocate(req);

    // Get AMResponse from AllocateResponse
    AMResponse amResp = allocateResponse.getAMResponse();

    // Retrieve the list of allocated containers from the response
    // and, on each allocated container, let's assume we are launching
    // the same job.
    List<Container> allocatedContainers = amResp.getAllocatedContainers();
    for (Container allocatedContainer : allocatedContainers) {
      LOG.info("Launching shell command on a new container"
          + ", containerId=" + allocatedContainer.getId()
          + ", containerNode=" + allocatedContainer.getNodeId().getHost()
          + ":" + allocatedContainer.getNodeId().getPort()
          + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
          + ", containerState=" + allocatedContainer.getState()
          + ", containerResourceMemory="
          + allocatedContainer.getResource().getMemory());

      // Launch and start the container on a separate thread to keep the main
      // thread unblocked, as all containers may not be allocated in one go.
      LaunchContainerRunnable runnableLaunchContainer =
          new LaunchContainerRunnable(allocatedContainer);
      Thread launchThread = new Thread(runnableLaunchContainer);
      launchThreads.add(launchThread);
      launchThread.start();
    }

    // Check what the current available resources in the cluster are
    Resource availableResources = amResp.getAvailableResources();
    // Based on this information, an ApplicationMaster can make appropriate
    // decisions

    // Check the completed containers
    // Let's assume we are keeping a count of total completed containers,
    // containers that failed and ones that completed successfully.
    List<ContainerStatus> completedContainers =
        amResp.getCompletedContainersStatuses();
    for (ContainerStatus containerStatus : completedContainers) {
      LOG.info("Got container status for containerID="
          + containerStatus.getContainerId()
          + ", state=" + containerStatus.getState()
          + ", exitStatus=" + containerStatus.getExitStatus()
          + ", diagnostics=" + containerStatus.getDiagnostics());

      int exitStatus = containerStatus.getExitStatus();
      if (0 != exitStatus) {
        // container failed
        // -100 is a special case where the container
        // was aborted/pre-empted for some reason
        if (-100 != exitStatus) {
          // application job on container returned a non-zero exit code
          // counts as completed
          numCompletedContainers.incrementAndGet();
          numFailedContainers.incrementAndGet();
        }
        else {
          // something else bad happened
          // app job did not complete for some reason
          // we should re-try as the container was lost for some reason
          // decrementing the requested count so that we ask for an
          // additional one in the next allocate call.
          numRequestedContainers.decrementAndGet();
          // we do not need to release the container as that has already
          // been done by the ResourceManager/NodeManager.
        }
      }
      else {
        // nothing to do
        // container completed successfully
        numCompletedContainers.incrementAndGet();
        numSuccessfulContainers.incrementAndGet();
      }
    }
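
The counters in the loop above feed back into the next allocate call. A lost container (exit status -100) lowers the requested count, so one more container is asked for. The bookkeeping can be isolated into a small class. ContainerTally and its method names are hypothetical. The -100 value and the counting rules are taken from the snippet above, and the class assumes it is driven from a single heartbeat thread:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical bookkeeping class, not part of YARN. It mirrors the counters in
// the loop above and turns them into "how many containers to ask for next".
final class ContainerTally {
    // Exit status the snippet above treats as "aborted/pre-empted"
    static final int ABORTED_EXIT_STATUS = -100;

    final AtomicInteger requested = new AtomicInteger();
    final AtomicInteger completed = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();
    final AtomicInteger succeeded = new AtomicInteger();
    private final int target;

    ContainerTally(int target) {
        this.target = target;
    }

    // Number of new containers to ask for in the next allocate call.
    // Assumes a single heartbeat thread calls this method.
    int asksForNextHeartbeat() {
        int missing = target - requested.get();
        if (missing <= 0) {
            return 0;
        }
        requested.addAndGet(missing);
        return missing;
    }

    // Applies the same rules as the loop above to one completed container.
    void onCompleted(int exitStatus) {
        if (exitStatus == 0) {
            completed.incrementAndGet();
            succeeded.incrementAndGet();
        } else if (exitStatus == ABORTED_EXIT_STATUS) {
            // Lost container: ask for a replacement on the next heartbeat.
            requested.decrementAndGet();
        } else {
            // The job ran but exited non-zero: counts as completed and failed.
            completed.incrementAndGet();
            failed.incrementAndGet();
        }
    }

    boolean allCompleted() {
        return completed.get() >= target;
    }
}
```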

    // Assuming an allocated Container obtained from AMResponse
    Container container;
    // Connect to ContainerManager on the allocated container
    String cmIpPortStr = container.getNodeId().getHost() + ":"
        + container.getNodeId().getPort();
    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
    ContainerManager cm =
        (ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf);

    // Now we set up a ContainerLaunchContext
    ContainerLaunchContext ctx =
        Records.newRecord(ContainerLaunchContext.class);

    ctx.setContainerId(container.getId());
    ctx.setResource(container.getResource());

    try {
      ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
    } catch (IOException e) {
      LOG.info(
          "Getting current user failed when trying to launch the container: "
          + e.getMessage());
    }

    // Set the environment
    Map<String, String> unixEnv = new HashMap<String, String>();
    // Set up the required env.
    // Please note that the launched container does not inherit
    // the environment of the ApplicationMaster, so all the
    // necessary environment settings will need to be set up again
    // for this allocated container.
    ctx.setEnvironment(unixEnv);

    // Set the local resources
    Map<String, LocalResource> localResources =
        new HashMap<String, LocalResource>();
    // Again, the local resources from the ApplicationMaster are not copied over
    // by default to the allocated container. Thus, it is the responsibility
    // of the ApplicationMaster to set up all the necessary local resources
    // needed by the job that will be executed on the allocated container.

    // Assume that we are executing a shell script on the allocated container,
    // and that the shell script's location in the filesystem, its modification
    // time and its length are known to us.
    Path shellScriptPath;
    LocalResource shellRsrc = Records.newRecord(LocalResource.class);
    shellRsrc.setType(LocalResourceType.FILE);
    shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
    shellRsrc.setResource(
        ConverterUtils.getYarnUrlFromURI(shellScriptPath.toUri()));
    shellRsrc.setTimestamp(shellScriptPathTimestamp);
    shellRsrc.setSize(shellScriptPathLen);
    localResources.put("MyExecShell.sh", shellRsrc);

    ctx.setLocalResources(localResources);

    // Set the necessary command to execute on the allocated container
    String command = "/bin/sh ./MyExecShell.sh"
        + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
        + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";

    List<String> commands = new ArrayList<String>();
    commands.add(command);
    ctx.setCommands(commands);

    // Send the start request to the ContainerManager
    StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
    startReq.setContainerLaunchContext(ctx);
    cm.startContainer(startReq);
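
Both launch snippets build a single shell command string and redirect stdout and stderr into the container's log directory. A small, hypothetical helper that does this for an argument list might look like the following. In the sketch, logDirVar stands for the value of ApplicationConstants.LOG_DIR_EXPANSION_VAR and is passed in, so no claim is made about its literal text. The single-quote escaping for arguments containing whitespace or quotes is an addition of this sketch, not something the snippets above do.

```java
import java.util.List;
import java.util.StringJoiner;
import java.util.regex.Pattern;

// Hypothetical helper, not a YARN API.
final class LaunchCommand {
    private static final Pattern NEEDS_QUOTING = Pattern.compile("[\\s'\"]");

    // Joins argv into one shell command line and appends stdout/stderr
    // redirects into the directory denoted by logDirVar.
    static String withLogRedirects(List<String> argv, String logDirVar) {
        StringJoiner cmd = new StringJoiner(" ");
        for (String arg : argv) {
            cmd.add(quoteIfNeeded(arg));
        }
        cmd.add("1>" + logDirVar + "/stdout");
        cmd.add("2>" + logDirVar + "/stderr");
        return cmd.toString();
    }

    // Single-quotes arguments containing whitespace or quotes so the shell
    // treats each one as a single word. Plain arguments (such as
    // ${JAVA_HOME}/bin/java) are left untouched so the shell can expand them.
    static String quoteIfNeeded(String arg) {
        if (!NEEDS_QUOTING.matcher(arg).find()) {
            return arg;
        }
        return "'" + arg.replace("'", "'\\''") + "'";
    }
}
```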

    GetContainerStatusRequest statusReq =
        Records.newRecord(GetContainerStatusRequest.class);
    statusReq.setContainerId(container.getId());
    GetContainerStatusResponse statusResp = cm.getContainerStatus(statusReq);
    LOG.info("Container Status"
        + ", id=" + container.getId()
        + ", status=" + statusResp.getStatus());

FAQ

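How can I distribute my application's jars to all of the nodes in the YARN cluster that need them?

Add the jars to the launch context as a LocalResource. YARN copies local resources into the working directory of the container on the node where it runs. A resource of type ARCHIVE is also unpacked there, so you can put the unpacked directory on your classpath. For example, when creating your application request: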

    // Describe the package (e.g. an archive of jars) as a local resource
    LocalResource packageResource = Records.newRecord(LocalResource.class);
    File packageFile = new File(packagePath);
    // YARN's URL record type (org.apache.hadoop.yarn.api.records.URL)
    URL packageUrl = ConverterUtils.getYarnUrlFromPath(
        FileContext.getFileContext().makeQualified(new Path(packagePath)));

    packageResource.setResource(packageUrl);
    packageResource.setSize(packageFile.length());
    packageResource.setTimestamp(packageFile.lastModified());
    packageResource.setType(LocalResourceType.ARCHIVE);
    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);

    // resource, containerCtx, appCtx and request are assumed to have been
    // created with Records.newRecord(...) as in the client example above
    resource.setMemory(memory);
    containerCtx.setResource(resource);
    containerCtx.setCommands(ImmutableList.of(
        "java -cp './package/*' some.class.to.Run "
        + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
        + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
    // The archive is made available under the link name "package" in the
    // container's working directory, hence the './package/*' classpath above
    containerCtx.setLocalResources(
        Collections.singletonMap("package", packageResource));
    appCtx.setApplicationId(appId);
    appCtx.setUser(user.getShortUserName());
    appCtx.setAMContainerSpec(containerCtx);
    request.setApplicationSubmissionContext(appCtx);
    applicationsManager.submitApplication(request);
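
The snippet above registers the package as a LocalResource of type ARCHIVE, which the framework unpacks on the node. Producing that archive is up to you. As one hedged example, assuming your YARN version accepts zip archives (check which archive formats it unpacks), a hypothetical JarPackager could bundle a directory of jars with java.util.zip:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hypothetical helper: bundles the regular files directly inside jarDir
// (e.g. the application's jars) into one flat zip archive at target and
// returns the entry names in the order they were written.
final class JarPackager {
    static List<String> zipDirectory(Path jarDir, Path target) {
        try {
            List<Path> files;
            try (Stream<Path> listing = Files.list(jarDir)) {
                // Sort so the archive layout is deterministic.
                files = listing.filter(Files::isRegularFile).sorted()
                    .collect(Collectors.toList());
            }
            try (ZipOutputStream zip = new ZipOutputStream(Files.newOutputStream(target))) {
                for (Path file : files) {
                    zip.putNextEntry(new ZipEntry(file.getFileName().toString()));
                    Files.copy(file, zip);
                    zip.closeEntry();
                }
            }
            return files.stream()
                .map(p -> p.getFileName().toString())
                .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The resulting file would play the role of packagePath in the snippet above. Because the archive is flat, the './package/*' classpath would pick up the jars once the archive is unpacked under the link name "package".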

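How do I get the ApplicationMaster's ApplicationAttemptId?

The framework passes it to the ApplicationMaster through the environment, under the key ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV. Convert the value into an ApplicationAttemptId with ConverterUtils.toApplicationAttemptId, as in the first ApplicationMaster snippet above.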

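My container is being killed by the NodeManager

This is most likely because the container used more memory than it requested. As the client code above notes, a process that takes more than its allocated memory is killed by the framework. Request more memory for the container, keeping the ask within the cluster's maximum capability and a multiple of its minimum, or reduce the process's memory footprint.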

How can my ApplicationMaster kill a container? Releasing it via AMRMProtocol#allocate does not seem to work.

Useful Links

WritingYarnApps (last edited 2011-11-03 23:34:40 by HiteshShah)