Status

Current state: Partially Accepted. Superseded by SEP-13.

Discussion threadhttp://mail-archives.apache.org/mod_mbox/samza-dev/201704.mbox/%3CCAFvExu0UptjFJnHtbbVsHteM3gfsZKHMPq%2BVrLjWsPmmjPunzw%40mail.gmail.com%3E

JIRA: SAMZA-1130

Released: 

Problem

With introduction of fluent API (SAMZA-1073) and standalone Samza (SAMZA-1063), we have the following new scenarios:

  1. A Samza application may include multiple stages in a user-defined DAG, each corresponding to a separate Samza job (SAMZA-1041)
  2. Samza jobs may be user programs that are executed in standalone mode, which introduces yet another way of starting the JVM process for a Samza job (e.g. we already have ThreadJobFactory, ProcessJobFactory, and YarnJobFactory to start a Samza job).

Motivation

We want to have a simplified solution to run the new Samza application in various execution environments. The solution should allow user to run a Samza application written in either fluent API or task-level API, and in local standalone environment, or a cluster-executor (e.g. YARN). In addition, the solution should also support easy local tests by providing an abstract layer to offer file/Java collection based input/output streams and Key-Value stores without the need to change user code.

Proposed Changes

In the proposed change, we introduce a new programming API layer, ApplicationRunner, between the user program and the physical execution engine to achieve the following goals:

  • Hides away the actual Job deployment to various different physical environment from the user
  • Provide a swappable RuntimeEnvironment within ApplicationRunner to allow changing the implementation of input/output and stores without change the user code

Overview of ApplicationRunner

An overview of ApplicationRunner is shown below:

 

 ExecutionPlanner - the planner will generate the plan for execution, including all the job configs, intermediate streams to create and a JSON representation of the JobGraph from the user-defined StreamGraph. A JobGraph includes JobNodes and StreamEdges, in which each JobNode represents a physical Samza job to run and each StreamEdge represents an intermediate stream between jobs.

 

  • StreamManager: StreamManager is responsible for creating intermediate streams before deploying the actual job(s).

  • JobRunner - the runner to deploy the jobs in the JobGraph. It will launch a physical job for each JobNode in the JobGraph.

Based on different execution environment, the JobRunner behavior is different:

    • LocalJobRunner will launch a StreamProcessor corresponding to each JobNode within the local JVM. In this mode the LocalJobRunner will run on every host that the application is deployed to. It is also used to launch jobs in test environment as well.

    • RemoteJobRunner will submit a Samza job for remote execution for each corresponding JobNode. It is for clusters like Yarn.

ApplicationRunners in Different Execution Environments

Based on the different JobRunners and runtime environment used to execute JobGraph, we categorize the ApplicationRunner accordingly:

  • LocalApplicationRunner: deploy and launch jobs in local JVM process. This is the public programming API used in user-defined application main() method.

  • RemoteApplicationRunner: submit the Samza job(s) to a remote cluster executor (like YARN). Not a public programming API. Only used and implemented by service providers to deploy Samza jobs to a remote cluster.

  • TestApplicationRunner: uses the same LocalJobRunner to launch jobs locally, with a test runtime environment providing file/collection based input / outputs. Non-production public programming API.


LocalApplicationRunner

When the LocalApplicationRunner executes the JobGraph, each JobNode is executed by a LocalJobRunner. LocalJobRunner will start and stop a StreamProcessor by invoking the StreamProcessor.start() and StreamProcessor.stop() functions. LocalApplicationRunner.run() is a blocking call, so it can be used to run in the main thread (see Table 1 below).

 

/**
* Example code to implement window-based counter with a re-partition stage
*/
public class PageViewCounterExample implements StreamApplication {

 @Override public void init(StreamGraph graph, Config config) {
   MessageStream<PageViewEvent> pageViewEvents = graph.createInputStream(“myinput”);
   MessageStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutputStream(“myoutput”);

   pageViewEvents.
       partitionBy(m -> m.getMessage().memberId).
       window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1).
           setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
           setAccumulationMode(AccumulationMode.DISCARDING)).
       map(MyStreamOutput::new).
       sendTo(pageViewPerMemberCounters);

 }

 public static void main(String[] args) throws Exception {
   CommandLine cmdLine = new CommandLine();
   Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
   ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
   localRunner.run(new PageViewCounterExample());
 }
}

Table 1: Repartition counter example using new fluent API

RemoteApplicationRunner

When the RemoteApplicationRunner executes the JobGraph, each JobNode is executed as a remote Job (for now we only support single-node JobGraph, so only one remote job will be created). The Jobs will be submitted through the RemoteJobRunner. RemoteJobRunner is the runner for a single remote Job. RemoteJobRunner is cluster agnostic, meaning it should be able to submit jobs to different types of clusters based on configs (right now we only support Yarn). Samza provides a ApplicationRunnerMain class to use RemoteApplicationRunner, as shown below:

public static void main(String[] args) throws Exception {
  ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerCommandLine();
  OptionSet options = cmdLine.parser().parse(args);
  Config orgConfig = cmdLine.loadConfig(options);
  Config config = Util.rewriteConfig(orgConfig);
  ApplicationRunnerOperation op = cmdLine.getOperation(options);
  AppConfig appConfig = new AppConfig(config);

  if (appConfig.getAppClass() != null) {
    RemoteApplicationRunner runner = new RemoteApplicationRunner(config);
    StreamApplication app =
        (StreamApplication) Class.forName(appConfig.getAppClass()).newInstance();
    switch (op) {
      case RUN:
        runner.start(app);
        break;
      case KILL:
        runner.stop(app);
        break;
      case STATUS:
        System.out.println(runner.status(app));
        break;
      default:
        throw new IllegalArgumentException("Unrecognized operation: " + op);
    }
  } else {
    JobRunner$.MODULE$.main(args);
  }
}

Table 2: Samza ApplicationRunnerMain to use RemoteApplicationRunner starting user-application

TestApplicationRunner

TestApplicationRunner executes JobGraph using LocalJobRunner as well, except that it uses a TestRuntimeEnvironment internally to enable input/output streams using files and Java collections. Note that TestApplicationRunner is blocking similar to LocalApplicationRunner. For bounded input streams (data collections or files), the system consumers will emit end-of-stream message in the end, and the runner will stop afterwards, which allows user to validate processing results. User test code example is the following:

public class TestPageViewCountExample {
 private PageViewCounterExample userApp = new PageViewCounterExample();

 Queue<Entry<String, PageViewCounterExample.PageViewEvent>> inputMessages = new LinkedBlockingQueue<>();
 Queue<Entry<String, PageViewCounterExample.PageViewCount>> outgoingMessages = new LinkedBlockingQueue<>();

 TestApplicationRunner testRunner;

 @Before
 public void setup() {
   // preparation for config, input, output, and local store
   Config config = mock(Config.class);
   testRunner = new TestApplicationRunner(new AppConfig(config));
   testRunner.addStream("pageViewEventStream", inputMessages);
   testRunner.addStream("pageViewEventPerMemberStream", outgoingMessages);

   inputMessages.offer(new Entry<>("my-page-id", new PageViewCounterExample.PageViewEvent("my-page-id", "my-member-id", 1234556L)));
 }

 @After
 public void shutdown() {
 }

 @Test
 public void test() throws InterruptedException {
   // run the user app with bounded input stream
   this.testRunner.run(userApp);
 
   // validate the test results
   Entry<String, PageViewCounterExample.PageViewCount> countEvent = outgoingMessages.poll();
   String wndKey = getWindowKey("my-member-id", 1234556L);
   assertTrue(countEvent.getKey().equals(wndKey));
   assertTrue(countEvent.getValue().count == 1);
 }

 private String getWindowKey(String s, long timestamp) {
   return String.format("%s-%d", s, (timestamp / 10000) * 10000);
 }
}

Table 3: User test code using TestApplicationRunner

 

Interaction between RuntimeEnvironment and ApplicationRunners:

LocalApplicationRunner:

The figure below shows the interaction between LocalApplicationRunner and the LocalRuntimeEnvironment. Note that the methods in LocalRuntimeEnvionment are high-lighted with RED font. Interaction between TestApplicationRunner and TestRuntimeEnvironment is exactly the same as the local ones, except that TestApplicationRunner / TestRuntimeEnvironment supports additional methods to add file/Java collection based streams and stores.

RemoteApplicationRunner:

The figure below shows the interaction between RemoteApplicationRunner and the RemoteRuntimeEnvironment. Note that the methods in RemoteRuntimeEnvionment are high-lighted with RED font.

Public Interfaces

LocalApplicationRunner

/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface LocalApplicationRunner {
	/**
	 * Method to be invoked to deploy and run the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void run(StreamApplication app);
}


TestApplicationRunner

/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
 * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
 */
@InterfaceStability.Unstable
public interface TestApplicationRunner {
	/**
	 * Method to be invoked to deploy and run the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void run(StreamApplication app);
	/**
	 * Method to be invoked to deploy and run the actual Samza application
	 * @param streamId  the logic identifier for a stream
     * @param messages  the queue of messages in the stream
	 */
	<K, V> void addStream(String streamId, Queue<Entry<K, V>> messages);

	/**
	 * Method to be invoked to deploy and run the actual Samza application
	 * @param storeId  the logic identifier for a KV-store
     * @param storeEntries  the map contains all KV-store entries
	 */
	<K, V> void addStore(String storeId, Map<K, V> storeEntries);

}

Non-public API and implementation classes

RemoteApplicationRunner

/**
 * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.application.StreamApplication}
 *
 */
@InterfaceStability.Unstable
public interface RemoteApplicationRunner {
	/**
	 * Method to be invoked to deploy and run the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void start(StreamApplication app);

	/**
	 * Method to be invoked to stop the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void stop(StreamApplication app);

	/**
	 * Method to be invoked to get the status of the actual Samza application
	 * @param app  the user-defined {@link StreamApplication} object
	 */
	void status(StreamApplication app);

}


RuntimeEnvironment

/**
 * Interface to be implemented by any environment that supports physical input/output streams and local stores
 *
 */
public interface RuntimeEnvironment {

  /**
   * Method to get the physical {@link org.apache.samza.system.StreamSpec} to describe the stream with ID {@code streamId}
   */
  StreamSpec getStreamSpec(String streamId);

  /**
   * Method to get the physical {@link org.apache.samza.system.SystemFactory} with the name {@code system}
   */
  SystemFactory getSystemFactory(String system);

  /**
   * Method to get the physical {@link org.apache.samza.storage.StorageEngineFactory} for {@link org.apache.samza.storage.kv.KeyValueStore} 
   * with the name {@code storeId}
   */
  StorageEngineFactory getStorageEngineFactory(String storeId);

}

 

TestRuntimeEnvironment (implementation class)

 

public class TestRuntimeEnvironment implements RuntimeEnvironment {
  public StreamSpec getStreamSpec(String streamId) {...}
  public SystemFactory getSystemFactory(String system) {...}
  public StorageEngineFactory getStorageEngineFactory(String storeId) {...}
 
  /**
   * Additional methods to add Java collection based input/output streams
   */
  <K,V> public void addStream(String streamId, Queue<Entry<K, V>> messages) {...}
 
  /**
   * Additional methods to add Java collection based KV-store
   */
  <K,V> public void addStore(String storeId, Map<K, V> storeEntries) {...}
 
}

Implementation and Test Plan

LocalApplicationRunner:

The following pseudocode shows the run method of LocalApplicationRunner:

LocalApplicationRunner.run() {
  
  StreamGraph streamGraph = createStreamGraph();
  ExecutionPlan plan = executionPlanner.plan(streamGraph);

 If (plan has intermediate streams) {  
     streamManager.createStreams();
 }
  List<LocalJobRunner> jobs;
  plan.getJobConfigs().forEach() { jobConfig ->
     LocalJobRunner jobRunner = new LocalJobRunner();
     jobRunner.start(jobConfig);
     jobs.add(jobRunner);
  }


  jobs.forEach(job -> {
     wait for job complete;
  })
}


How this works in standalone deployment

  • Configure app.class = <user-app-name>

  • Deploy the application to standalone hosts.

  • Run run-local-app.sh on each node to start/stop the local application.

RemoteApplicationRunner:

The following pseudocode shows the start of RemoteApplicationRunner:

RemoteApplicationRunner.start() {
  StreamGraph streamGraph = createStreamGraph();
  ExecutionPlan plan = executionPlanner.plan(streamGraph);

  if (plan has intermediate streams) {  
     streamManager.createStreams();
  }

  plan.getJobConfigs().forEach() { jobConfig ->
     RemoteJobRunner jobRunner = new RemoteJobRunner();
     jobRunner.start(jobConfig);
  }
}

RemoteApplicationRunner.stop() is similar by replacing JobRunner.run() with JobRunner.stop()

How this works in YARN deployment

  • Configure app.class = <user-app-name>

  • Deploy the application to single deployment gateway (RM in YARN).

  • Run run-app.sh on the deployment gateway to start the whole application. Run stop-app.sh on the same node to stop it.

    • RemoteApplicationRunner will automatically configure task.execute = run-local-app.sh for each job to be deployed
  • When the cluster executor actually launches the Samza container processes, it runs run-local-app.sh on the target host, exactly like in the standalone deployment.

Note: that the last step to start the JVM process in YARN is exactly the same as the last step in the standalone environment. Hence, it provides a unified local runtime process between YARN and standalone environments. That makes it much easier to test and debug the actual runtime process in the local host for cluster deployment as well.


Compatibility, Deprecation, and Migration Plan

Compatibility

  1. In programming API: 
    1. the ApplicationRunner is a new interface, no compatibility issue
    2. JobRunner is going to be deprecated and replaced by LocalJobRunner and RemoteJobRunner. However, JobRunner is a Samza internal class. Hence, no interface compatibility issue.
    3. ThreadJobFactory and ProcessJobFactory will be deprecated. The LocalJobRunner is going to use StreamProcessor embedded library to replace those two classes.
  2. In configuration: the configuration for ApplicationRunner is at a new scope: app.*. Hence, no change to the existing configuration
  3. In launch script:
    1. New applications will use run-app.sh and run-local-app.sh for remote and local execution
    2. Old task-level application will continue to use run-job.sh and run-container.sh to submit and launch the local containers, where run-job.sh will be a wrapper for a single node application
  4. In metrics: no change in the metric names and reporter interfaces

Deprecation

Existing JobRunner class is going to be deprecated, together with ThreadJobFactory/ProcessJobFactory, given that LocalJobRunner will use StreamProcessor (i.e. standalone Samza lib) to replace ThreadJobFactory/ProcessJobFactory.

Migration Plan

ApplicationRunner is currently designed for users writing new application. Existing Samza jobs using task-level API will need to do source code change to migrate to the new APIs. Hence, there is no auto-migration plan.

Rejected Alternatives

  1. Using the same interface method run() in both RemoteApplicationRunner and LocalApplicationRunner: this causes confusion since run() in LocalApplicationRunner is blocking while in RemoteApplicationRunner is non-blocking.
  2. Expose StreamProcessor to the user who wants to run the application locally: this will potentially causing the following issues:
    1. Divergence of programming model for Samza application running in YARN vs running locally
    2. Divergence of application launching model for Samza application running locally written in fluent API vs task-level API
    3. Exposure of multiple stages of jobs to the user who only cares about the end-to-end Samza application

 

  • No labels