Status

Current state: ACCEPTED

Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/202001.mbox/browser

JIRA: SAMZA-2373

Released: Samza 1.5

Problem

Samza operates in a multi-tenant environment with cluster managers like Yarn and Mesos, where a single host can run multiple Samza containers. Because cluster managers like Yarn are often configured with soft limits and Samza has no notion of dynamic workload balancing, a host can end up underperforming, and it becomes desirable to move one or more containers from that host to other hosts. Today this is not possible without affecting other jobs on the hot host or manually restarting the affected job. In other use cases, such as resetting the checkpoints of a single container or supporting canary or rolling bounces, the ability to restart a single container or a subset of containers without restarting the whole job is highly desirable.

This doc addresses the problem of restarting or moving a single container of a job without affecting other containers of the same job or other jobs running on the same host.

Motivation

Alleviating Hot Host Problems: Yarn, as the resource manager for Samza, is configured to operate on soft limits (a job sets the vcores for a container or takes the default, and this is the minimum guarantee of resources provided by Yarn), and most customers go with the defaults and fail to right-size their jobs. In addition, there is no notion of a dynamic cluster balancer in Samza at LinkedIn today, although Yarn acts as a cluster balancer to some extent if configured with hard limits. As a result, a host often ends up in a situation where the containers on it are underperforming (also referred to as a hot host) because it runs CPU-heavy containers while other hosts are underutilized. It is then desirable to move such a container to a different host. Today the following solutions exist to achieve this:

  1. Rewrite the locality mapping in the coordinator stream and restart the job
  2. Take the hot host out of rotation, which kills the containers of all the jobs running on that host. Then trigger a restart of the job whose container is supposed to be moved, so that Yarn tries to allocate some other host for it. Once the container starts on another host, put the hot host back in rotation so that the other containers that were killed when the host was taken out can be restarted on it. This is not easy!

If the ability to move a container exists, at the simplest someone can manually move containers to different hosts or write simple scripts to automate that.

Canary / Rolling Bounces: When there is a bug in the Samza framework code that affects the Samza container deployment, a Samza engineer needs to manually restart the container process on the given machine with the given binary version. This involves multiple steps (e.g. manually identifying and logging in to the container host before using kill -9 to stop the process), which is inconvenient. With the restart ability, the same system can be used to build support for canary or rolling bounces for YARN-based Samza deployments: the restart ability can be easily extended to deploy a single container or a subset of containers using a different version of the application code.

Resetting Checkpoints: The Startpoint API has made resetting checkpoints easy, but it still needs a developer to restart the job once the startpoints are set. With the ability to restart a single container or a few containers, the full job restart can potentially be avoided.

Draining a host: Moving all running containers from a host sequentially to other hosts or in parallel.

Fix a Job in Degraded State: Users often find it desirable to restart just a single container for various reasons, such as underperforming containers or only a few containers running into exceptions because some partitions have corrupt messages. Today Samza kills the job if a container has run into exceptions more than a fixed configured number of times. In these scenarios, it is desirable for users to keep the job running in a degraded state, fix only the one or few affected containers of the job, and issue a restart for them.

Dynamic Workload Balancer: The ability to move and restart containers is the fundamental building block for developing a load balancer (like Cruise Control) for Samza. At its simplest this load balancer can be a script trying to balance the cluster; later it can be built into a more sophisticated system.

Heterogeneous container: Each Samza job today has homogeneous containers (with regard to memory & vcore configurations). One of the desired use cases for Samza in the future is the ability to restart a container with a different size (memory & CPU), which can be used by an auto-sizing engine designed for Samza.

Selectively Spin StandBy Containers: Samza has a feature of Hot StandBy Containers for reducing stateful restoration times. Enabling this feature for a job involves at least doubling the containers (in the simplest case, every container has 1 standby replica enabled). Customers are reluctant to enable it since doubling the containers increases the cost to serve. To improve adoption of this feature we can build the ability to spin up StandBy Containers for a single container or a subset of containers while the job is running. These StandBy Containers can then be used for failover to reduce downtime.

Requirements

Goals

  • Build the ability to move a container (non-AM) of a running job from its existing host to a preferred host (if specified) or to any host
    • If an existing stateful container has a standby container, move the active container to the standby container
    • If an existing stateful container has no standby container, spin up a standby container and then move the active container to the standby once the standby has bootstrapped, to reduce downtime [Stretch]
  • Build the ability to restart containers with the same host preferences
  • Build the ability to selectively spin up standby for one or a subset of containers [Stretch]
  • Expose these APIs via a tool or dashboard for operability

Non-Goals

  • This system does not intend to solve the problem of Dynamic Workload Balancing, i.e. Cruise Control. It may act as a building block for one later.
  • Solving the canary problem for the YARN-based deployment model is out of scope for this solution; however, the system built should be easily extensible to support canary
  • This system will not have built-in intelligence to find a better host match for a container; it will make simplistic decisions based on the params passed by the user.

SLA / SCALE / LIMITS (Assumptions)

  • At any time the AM for a single job serves only one request per container; parallel requests across containers are still supported. If a control request is underway, any other requests issued on the same container are queued. The same assumption holds for in-flight requests on standby and active containers, i.e. if a container placement request is in progress for an active container or its standby replica, all subsequent placement actions on either are queued
  • Actions are de-queued, sorted by the timestamps populated by the client, and executed in that order
  • The system should be capable of scaling to be used across different jobs at the same time 
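
The queuing assumptions above can be sketched roughly as follows. This is a minimal illustration and not Samza's implementation: `PlacementQueueSketch`, `Action`, `enqueue`, and `next` are hypothetical names, and the sharing of one queue between an active container and its standby replica is not modeled.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical sketch: one timestamp-ordered queue of placement actions per container. */
class PlacementQueueSketch {

  /** Hypothetical holder for a queued placement action. */
  static final class Action {
    final String processorId;
    final String destinationHost;
    final long timestamp; // populated by the client

    Action(String processorId, String destinationHost, long timestamp) {
      this.processorId = processorId;
      this.destinationHost = destinationHost;
      this.timestamp = timestamp;
    }
  }

  // processorId -> actions waiting for that container, oldest timestamp first
  private final Map<String, PriorityQueue<Action>> queues = new HashMap<>();

  /** Queues an action behind any other actions already waiting on the same container. */
  void enqueue(Action action) {
    queues.computeIfAbsent(action.processorId,
        id -> new PriorityQueue<Action>(Comparator.comparingLong((Action a) -> a.timestamp)))
        .add(action);
  }

  /** Returns the oldest queued action for the container, or null if nothing is queued. */
  Action next(String processorId) {
    PriorityQueue<Action> queue = queues.get(processorId);
    return queue == null ? null : queue.poll();
  }
}
```

Different containers have independent queues, so actions on container 0 and container 1 can be dispatched in parallel while actions on the same container are served one at a time in timestamp order.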

General Requirements

At a high level the system can be divided into two logical components:

  1. Container Placement Handler (CPH): to serve as a dispatcher of specific control requests from a control plane. Fetches the container placement control action from control-plane, queues it up and dispatches requests on containers of a job (non-am) with policy (details below) 
  2. Container Placement Service: API layer built around Cluster Manager to move containers. Reacts to control actions from CPH and interacts with underlying Cluster manager to execute them 
  • Requirements for Container Placement Handler

  1. Should be able to handle multiple move requests for different containers at the same time for a single job
  2. Should provide a good monitoring mechanism
  3. Should be general enough to be coupled with any Control Plane
  4. Should allow queuing requests for multiple containers across multiple jobs at the same time 
  5. Should be easily extensible to build canary support for yarn deployments 
  • Requirements for Container Placement Service

  1. Under no circumstances should Control Actions result in an inconsistent / non-working state of the job (i.e., #running-containers should equal number of configured containers).
  2. Should not be tightly coupled with YARN, should be extensible to be used by any cluster manager implementation

Failure Scenarios & Assumptions

  1. Each control action will be associated with a deployment id of an app it is intended to be issued upon
  2. Across Job restarts, any requests issued for the previous deployment incarnation will not be respected. This includes AM restarts (today if the AM restarts that implies a job restart)

Glossary


Term

Description

Cluster Manager

Resource manager used by samza eg. Yarn, Mesos, etc

Job Coordinator (JC) { Also referred to as ApplicationMaster (AM) for Yarn }

Each Samza application has a JC that manages the application’s workload, asks RM for containers and handles notifications when one of its containers fails.

Node Manager (NM)

A single node in a Yarn Cluster

Resource Manager (RM)

Central service that provides services like scheduling, heartbeats, and liveness monitoring to all applications running in the YARN cluster

Host Affinity

The ability of Samza to allocate a container to the same machine across job restarts is referred to as host-affinity. 

Container processorId

Samza assigns containers resource ids, e.g. 0, 1, 2, which remain the same across restarts of a job

Proposed Solution:

Solution 1. Write locality to Coordinator Stream (Metastore) and restart job [Rejected]

This approach proposes to directly change the container locality messages (container-to-host mapping messages) in the Metadata Store (currently Kafka) and then issue a restart

Pros

  • Simple to implement; the current tool already does this for host-affinity-enabled jobs (since they maintain a locality mapping)

Cons

  • Needs a job restart and makes only a best effort to get the preferred hosts for containers, with no guarantee of getting them
  • If a job has standby containers enabled, this method involves changing the standby mappings in addition to the active container mappings
  • The whole job faces downtime even when it has hundreds of containers and only one of them needs to be restarted. If the job is stateful, containers might not get the requested resources on restart and would then have to start bootstrapping
  • This solution does not scale for controllers that want to take multiple control actions on containers across several jobs, for example an auto-sizing controller
  • This method will not work for building a Canary / Cluster Balancer

Solution 2. Container Placement API [Accepted]

API design

On the basis of types of Control actions, the commands are the following:       

1. Container Placement Actions (Move / Restart)

API

placeContainer

Description

Active Container: Stops container process on source-host and starts it for 

  1. Stateless Job on either
    1. Destination-host (destination host can be source as well)
    2. Any host (destination-host = ANY_HOST)
  2. Stateful Job on either 
    1. Destination-host (if specified, destination host can be source as well)
    2. Standby Container (destination-host = STANDBY)
    3. Any host (destination-host = ANY_HOST)

StandBy Container: Stops container process on source-host and starts it on:

    1. Destination-host (if specified & matches StandBy Constraints)
    2. Any host (otherwise which matches StandBy Constraints)

Parameters

deploymentId: unique identifier of the deployed app for which the action is taken

processor-id: Samza resource id of container e.g 0, 1, 2 

destination-host: valid hostname / “ANY_HOST” / “STANDBY”

request-expiry-timeout: [optional]: timeout for any resource request to the cluster manager 

Status code

CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED

Returns

UUID for the client to query the status of request

Failure Scenarios

A request to place a container might fail in the following cases:

  1. When stopping the active container fails, we mark the request failed
  2. When the requested resources cannot be obtained from the cluster manager, we mark the request failed
  3. When the stopped active container fails to start on the destination host, we mark the request failed and attempt to start it on the source host; if that also fails, the container is started on ANY_HOST


Note: For supporting canary above parameter list can be easily extended to support the following parameters

Parameters

app-version: user application version [optional]

samza-version: samza framework version [optional]

jvm-args: arbitrary string to be used as jvm arguments [optional]


2. Container Status

API

containerStatus

Description

Gives the status & info of the container placement request, e.g. whether the container is running or stopped and which control commands have been issued on it

Parameters

processor-id: Samza resource id of container e.g 0, 1, 2 

deploymentId: unique identifier of the deployed app for which the action is taken

Status code

BAD_REQUEST, SUCCEEDED

Returns

Status of the Container placement action 


3. Enable & Disable StandBy [Stretch]

API

controlStandBy

Description

Starts or Stops a standBy container for the active container

Parameters

processor-id: Samza resource id of container e.g 0, 1, 2 

deploymentId: unique identifier of the deployed app for which the action is taken

Status code

CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED

Returns

UUID for the client to query the status of the request

Architecture

For implementing a scalable container placement control system, the proposed solution is divided into two parts:

Part 1. Container Placement Handler

  1. Control Plane is a channel outside the job that allows taking control actions by multiple controllers like Samza Dashboard, Startpoints controller. 
  2. ContainerPlacementHandler is a stateless handler registered to control plane that dispatches placement actions to invoke Container Placement Service APIs


This control plane can be implemented in the following ways 

Option 1: Samza Metastore API [Preferred]

Samza Metastore provides an API to write to the coordinator stream. One simple way to expose the Container Placement API is for the Container Placement Handler to have a coordinator stream consumer that polls control messages from the coordinator stream under a separate namespace (different from the one where the locality messages of containers live) and acts on them.

Pros

  • No need to build Authentication & Authorization; these are already handled by the Metadata auth service
  • No need to enable rate limiting, since requests are queued and the flow of requests can be regulated on the consumer side

Cons

  • If the AM dies there can still be queued requests in the Coordinator Stream; such requests have to be handled across AM restarts
  • The Coordinator Stream is log compacted, so control messages written to it need to be deleted to prevent it from growing to a large size, which can affect job start times


Option 2: Stub a Rest Endpoint in JC [Rejected Alternative]

It's fairly simple to embed a REST endpoint in JC or extend the existing Rest endpoint in JC that exposes configs in JC to support apis listed above 

Pros

  • Simple to extend the existing REST endpoint
  • If the AM dies all the outstanding requests are discarded (no additional handling needed)

Cons

  • Need to build authentication
  • Need to build an authorization layer around these REST endpoints
  • Loading the already heavily loaded Job Coordinator with another service might increase memory usage
  • Need to build a service for discovery or rely on the Yarn embedded servlet

Implementation Details:
  1. ContainerPlacementHandler is a stateless handler dispatching ContainerPlacementRequestMessages from Metastore to Container Placement Service & ContainerPlacementResponseMessages from Container Placement Service to metastore for external controls to query the status of an action. (PR). 
  2. The Metastore used by default in Samza today is Kafka (the coordinator stream), which is used to store configs & container mappings & is log compacted
  3. ContainerPlacementRequestMessage & ContainerPlacementResponseMessage are maintained in same namespace using NamespaceAwareMetaStore ("samza-place-container-v1")

Key-Value Format

The key for storing a ContainerPlacementRequestMessage or ContainerPlacementResponseMessage in the Metastore is UUID + "." + messageType (ContainerPlacementRequestMessage or ContainerPlacementResponseMessage). The value is the payload containing the ContainerPlacementRequestMessage or ContainerPlacementResponseMessage. Messages are written to and read from the Metastore through the MetadataStore abstraction.
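
As a rough illustration of the key format described above, the following sketch composes and parses such a key. `PlacementKeySketch` and its methods are hypothetical helpers written for this example; they are not part of Samza.

```java
import java.util.UUID;

/** Hypothetical helper illustrating the UUID + "." + messageType key layout. */
class PlacementKeySketch {

  /** Builds the metastore key for a request or response message. */
  static String toKey(UUID uuid, String messageType) {
    return uuid + "." + messageType;
  }

  /** Extracts the UUID part; a UUID string contains hyphens but no dots. */
  static UUID uuidOf(String key) {
    return UUID.fromString(key.substring(0, key.indexOf('.')));
  }

  /** Extracts the message type part that follows the first dot. */
  static String messageTypeOf(String key) {
    return key.substring(key.indexOf('.') + 1);
  }
}
```

Because the message type is part of the key, a request and a response can live in the same namespace without overwriting each other.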

ContainerPlacementRequestMessage:

Key: "UUID.subType"

Value fields:

  • uuid: Unique identifier of a request message (Required)
  • processorId: Logical processor id (0, 1, 2) of the container (Required)
  • deploymentId: Unique identifier for a deployment (Required)
  • subType: Type of message, here ContainerPlacementRequestMessage (Required)
  • destinationHost: Destination host where the container is desired to be moved (Required)
  • statusCode: Status of the request (Required)
  • timestamp: The timestamp of the request message (Required)
  • requestExpiry: Request expiry, which acts as a timeout for any resource request to the cluster resource manager (Optional)

Sample KV

Key: [1,"samza-place-container-v1","f068175b-c9b6-4f34-982b-ecb5619f21de.ContainerPlacementRequestMessage"]

Value: {"processorId":"1","deploymentId":"app-atttempt-001","subType":"ContainerPlacementRequestMessage","uuid":"f068175b-c9b6-4f34-982b-ecb5619f21de","destinationHost":"ANY_HOST","statusCode":"CREATED","timestamp":1578693870484}


ContainerPlacementResponseMessage:

Key: "UUID.subType"

Value fields:

  • uuid: Unique identifier of a response message (Required)
  • processorId: Logical processor id (0, 1, 2) of the container (Required)
  • deploymentId: Unique identifier for a deployment (Required)
  • subType: Type of message, here ContainerPlacementResponseMessage (Required)
  • destinationHost: Destination host where the container is desired to be moved (Required)
  • statusCode: Status of the response (Required)
  • responseMessage: Response message in conjunction with the status (Required)
  • timestamp: The timestamp of the response message (Required)
  • requestExpiry: Request expiry, which acts as a timeout for any resource request to the cluster resource manager (Optional)

Sample KV

Key: [1,"samza-place-container-v1","88b0d30c-d518-4307-9e8e-c8529eb30f04.ContainerPlacementResponseMessage"]

Value: {"processorId":"1","deploymentId":"app-atttempt-001","subType":"ContainerPlacementResponseMessage","responseMessage":"Request is accepted","uuid":"88b0d30c-d518-4307-9e8e-c8529eb30f04","destinationHost":"ANY_HOST","statusCode":"ACCEPTED","timestamp":1578694070875}

Challenges with Metastore

Metastore today (Kafka) is at-least-once & eventually consistent, hence ContainerPlacementService has to cache the UUIDs of accepted actions in memory so that it does not act on the same request twice when duplicates are delivered. The in-memory cache must be bounded, since an unbounded cache can result in the job running out of memory. The size of a UUID is 16 bytes. Say a job has at most 500 containers; then one request action per container results in 500 × 16 bytes = 0.008 MB of additional memory (for the in-memory lookup alone). If we cache, say, the last 20K actions (which can accommodate 40 failovers of 500 containers in the current scenario), the memory used will be at most 0.64 MB with a FIFO cache (20K × 16 bytes = 0.32 MB each for the in-memory lookup and the FIFO queue).
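
One possible shape for such a bounded FIFO cache is sketched below. It is an illustration under stated assumptions, not the actual Samza code: `BoundedUuidCacheSketch` and `markIfNew` are hypothetical names, and the capacity is whatever bound the service chooses (for example the 20K mentioned above). The 16-byte figure counts only the raw UUID bits; JVM object and collection overhead comes on top of it.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Hypothetical bounded FIFO cache used to drop duplicate placement requests. */
class BoundedUuidCacheSketch {
  private final int capacity;
  private final Set<UUID> seen = new HashSet<>();           // in-memory lookup
  private final ArrayDeque<UUID> order = new ArrayDeque<>(); // FIFO eviction order

  BoundedUuidCacheSketch(int capacity) {
    if (capacity <= 0) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    this.capacity = capacity;
  }

  /**
   * Records the UUID and returns true if it has not been seen recently (the caller should act on it).
   * Returns false for a duplicate delivery, which the caller should ignore.
   */
  boolean markIfNew(UUID uuid) {
    if (!seen.add(uuid)) {
      return false;
    }
    order.addLast(uuid);
    if (order.size() > capacity) {
      // Evict the oldest entry so memory stays bounded
      seen.remove(order.removeFirst());
    }
    return true;
  }

  int size() {
    return seen.size();
  }
}
```

A consequence of the bound is that a duplicate arriving after its UUID has been evicted would be treated as new, so the capacity should comfortably exceed the number of actions that can be in flight.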

GC policy for stale messages in metastore

  1. One way to delete stale ContainerPlacementMessages is to delete request/responses from the previous incarnation of the job in the metastore on job restarts, this is the responsibility of ContainerPlacementService 
  2. Once the request is complete, ContainerPlacementService can issue an async delete to clean up the request from the metastore
  3. Request/response message can be externally cleaned by a tool

Part 2. Container Placement Service

Container Placement Service is a set of APIs built around the AM to move/restart containers. It periodically gets a queue of placement actions per container, which it issues in parallel across different containers but sequentially on any one container. Each placement request has a "deploymentId" attached to it, because if a job restarts, all the placement actions queued for the previous deployment must be disregarded and deleted. Samza internally generates an id for each run of a job ("app.run.id") at the job planning phase; that id can be used as the "deploymentId" for placement requests.
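
The deploymentId check described above could look roughly like the following sketch. The names `DeploymentFilterSketch`, `Request`, `actionable`, and `stale` are hypothetical and only illustrate splitting queued requests into those for the current run and those left over from a previous deployment.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: separate requests for the current deployment from stale ones. */
class DeploymentFilterSketch {

  /** Hypothetical minimal view of a queued placement request. */
  static final class Request {
    final String uuid;
    final String deploymentId;

    Request(String uuid, String deploymentId) {
      this.uuid = uuid;
      this.deploymentId = deploymentId;
    }
  }

  /** Requests issued for the running deployment; only these are acted on. */
  static List<Request> actionable(List<Request> queued, String currentDeploymentId) {
    List<Request> result = new ArrayList<>();
    for (Request request : queued) {
      if (currentDeploymentId.equals(request.deploymentId)) {
        result.add(request);
      }
    }
    return result;
  }

  /** Requests from a previous deployment incarnation; these are disregarded and deleted. */
  static List<Request> stale(List<Request> queued, String currentDeploymentId) {
    List<Request> result = new ArrayList<>();
    for (Request request : queued) {
      if (!currentDeploymentId.equals(request.deploymentId)) {
        result.add(request);
      }
    }
    return result;
  }
}
```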

The solution proposes to refactor & simplify the current AM code & introduce a ContainerManager, a single entity managing container actions like start and stop for both active and standby containers. The functions of ContainerManager & the proposed refactoring around the AM code are listed below:

  • Remove the HostAwareContainerAllocator & ContainerAllocator, simplify Container Allocator as a simple lightweight entity allocating requests to available resources (PR1, PR2)
  • Introduce ContainerManager which acts as a brain for validating and issuing any actions on containers in the Job Coordinator for both active & Standby containers. (PR)
    • Transfer state & validation of container launch & expired request handling from ContainerAllocator to ContainerManager
    • Transfer state & lifecycle management of Container allocator & resource request on job start (reading locality mapping) from ClusterResourceManager.CallBack(ContainerProcessManager) to ContainerManager*
  • Encapsulate the logic and state related to container placement actions like moves and restarts for active & standby containers in ContainerManager (PR-1, TBD)
    • It is ContainerManager’s duty to validate any ContainerPlacementRequestMessages & also invalidate messages from the previous deployment incarnation
    • It is ContainerManager’s duty to write ContainerPlacementResponseMessages to Metastore for the external controller to query the status of the request
    • ContainerPlacementMetadata (ControlActionMetadata) is a metadata holder for a container action, tracking its request id, current status, requested resources, etc.


Note: *ClusterResourceManager.Callback (ContainerProcessManager) is tightly coupled with ClusterBasedJobCoordinator today. To develop this feature faster, phase 1 of the implementation will include all the proposed changes except moving the state & lifecycle management of the container allocator & resource requests on job start (reading the locality mapping) from ClusterResourceManager.Callback (ContainerProcessManager) to ContainerManager. Hence ContainerProcessManager will still be tied to ClusterBasedJobCoordinator and will intercept any container placement requests.

2.1 Container Move

2.1.1 Stateless Container Move & Stateful Container Move (without Standby)

Option 1: Reserve-Stop-Move: Request Resource first and Only make a move once resources are allocated by ClusterResource Manager (Preferred)

Pros

  • Container only moves when a preferred resource is available for the container to start
  • Offers stronger move semantics, which are more effective when used with a workload balancer

Cons

  • Complex implementation as compared to Option 2


Option 2: Stop-Reserve-Move: Stop the container and then make a resource request for preferred resources and attempt a start (Rejected Alternative)

Pros

  • Easier to implement

Cons

  • Weaker move semantics: the container is already stopped, so if the preferred resources are not available there is a higher chance of the container moving to something other than the preferred host, which is ineffective for a workload balancer

Orchestration of Stateless Container Move using Option 1: The image shows a sequence of steps involved in moving a container. Note that there is a specific number of retries configured for requesting preferred hosts to ClusterResourceManager.


  1. ContainerPlacementHandler dispatches the move request to ContainerProcessManager
  2. ContainerProcessManager dispatches the placement request to ContainerManager, which validates the action and issues a preferred resource request with ContainerAllocator (async)
  3. Now there are two possible scenarios
    1. Requested resources are allocated by ClusterResourceManager
      1. In this case, ContainerManager initiates a container shutdown and waits for the ContainerProcessManager callback on successfully stopping the container
      2. On getting a successful stop callback, ContainerManager issues a container start request to ClusterResourceManager on the allocated resources
      3. If the container start request succeeds, a success notification is sent (updating the Control Action Metadata); otherwise a new resource request to start this container on the source host is issued and a failure notification is sent to the user
    2. Resource request expires
      1. If the resource request expires because ClusterResourceManager is not able to allocate the requested resources, a failure notification is sent and the active container remains unaffected


Who writes the new locality mapping after a successful move?

On a successful start, a Samza container writes its new locality message to the metadata store (code); hence after a successful move the container writes its new locality itself.

Failure Scenarios:

  • If the preferred resources cannot be acquired, the active container is never stopped and a failure notification is sent for the ContainerPlacementRequest
  • If the ContainerManager is not able to stop the active container (3.1 #1 above fails), the request is marked failed & a failure notification is sent for the ContainerPlacementRequest
  • If ClusterResourceManager fails to start the stopped active container on the acquired destination host, the container falls back to the source host and a failure notification is sent for the ContainerPlacementRequest. If the container also fails to start on the source host, an attempt is made to start it on ANY_HOST
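
The Reserve-Stop-Move flow and its failure handling can be summarized in a simplified, synchronous sketch. In the design above these steps are asynchronous and callback driven; here the cluster interactions are passed in as plain functions so the control flow is easy to follow. `ReserveStopMoveSketch`, `Result`, and the `reserve`/`stop`/`start` parameters are hypothetical names introduced for illustration only.

```java
import java.util.function.BiPredicate;
import java.util.function.Predicate;

/** Hypothetical, synchronous sketch of the Reserve-Stop-Move control flow. */
class ReserveStopMoveSketch {
  static final String ANY_HOST = "ANY_HOST";

  enum Status { SUCCEEDED, FAILED }

  /** Outcome of a move: the request status and the host the container ends up on (null if unknown). */
  static final class Result {
    final Status status;
    final String host;

    Result(Status status, String host) {
      this.status = status;
      this.host = host;
    }
  }

  /**
   * reserve: host -> true if resources were allocated on it before expiry
   * stop:    processorId -> true if the container stopped
   * start:   (processorId, host) -> true if the container started there
   */
  static Result move(String processorId, String sourceHost, String destinationHost,
      Predicate<String> reserve, Predicate<String> stop, BiPredicate<String, String> start) {
    // Reserve first: if resources cannot be acquired, the active container is never stopped
    if (!reserve.test(destinationHost)) {
      return new Result(Status.FAILED, sourceHost);
    }
    // Stop the active container only after the reservation succeeded
    if (!stop.test(processorId)) {
      return new Result(Status.FAILED, sourceHost);
    }
    // Start on the reserved destination host
    if (start.test(processorId, destinationHost)) {
      return new Result(Status.SUCCEEDED, destinationHost);
    }
    // Fallback: the request has failed, but try to bring the container back on the source host
    if (start.test(processorId, sourceHost)) {
      return new Result(Status.FAILED, sourceHost);
    }
    // Last resort: start anywhere
    boolean started = start.test(processorId, ANY_HOST);
    return new Result(Status.FAILED, started ? ANY_HOST : null);
  }
}
```

Note that in the fallback branches the request is reported as FAILED even when the container comes back up, because it did not land on the requested destination host.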



Option 3: Stateful without Standby (Spin Up StandBy container & then move) (Phase 2) [Stretch]

If we expose an API to spin up a StandBy Container for an active container, a client can make two requests: one to spin up a StandBy Container, after which it periodically monitors the lag of the StandBy Container (using Diagnostics). Once the StandBy Container has a steady lag and is ready for the move, this case becomes Case 1 mentioned above. (More details TBD later)

 

2.1.2 Stateful Container Move with Standby Enabled

A stateful container move is divided into two cases on the basis of whether the job has a StandBy Container for the container or not

Let's take a case when the job has a stateful container C on H1 and has a StandBy container enabled C’ on H2

Option 1: Stop-Reserve-Move: Initiate a Stop on C & issue a Standby failover (Preferred - Phase 1*)

In this option, C is stopped on H1 and a failover is initiated from H1 to H2 with the current failover semantics for an active container in the Hot StandBy feature. This ensures C moves either to H2 (in the best case) or to some other host (when the Cluster Manager cannot allocate capacity on H2, i.e. H2 does not have enough capacity)

Pros

  • Easy to implement; the ability to do this already exists

Cons

  • C always moves from H1, and there is a slim chance that C won’t land on H2 as per the current semantics (stolen-host scenario: Cluster Manager fails to allocate H2 for C)


Option 2: Reserve-Stop-Move: Initiate a Standby move first then issue a move for Active Container (Preferred - Phase 2*)

  1. In this option a request is first issued to stop C’
  2. Then H2 is requested from the cluster manager to start C
  3. A stop is issued on C only if H2 can be allocated (#2 succeeds)
  4. Then C’ is requested to start on ANY_HOST except H1 and H2

Two ways to achieve this: 

  • A client can make two calls: first move C’ to ANY_HOST apart from H1 and H2 (similar to a stateless move), then move C to the host of C’ (similar to a stateless move), so the state maintenance for this lives on the client
  • Maintain state in JC to accomplish this 

Pros

  • Stronger move semantics, since the active container won’t be stopped if it cannot be moved to the standby container

Cons

  • More complex as compared to Option 1; needs to maintain more state

* Phase 1 & Phase 2 refer to the implementation phases, please see the section: Implementation Plan 


Option 3: Reserve-Stop-Move: First Request H2 from Cluster Manager and initiate the failover once you get H2 as a preferred resource [Rejected Alternative]

  • In this option C’ is stopped on H2 and a failover is initiated from H1 to H2 with the current semantics as developed for the Hot StandBy feature
  • If the move fails, only the standby is affected

Pros

  • Stronger move semantics are guaranteed; the active container won’t be stopped unless it can be started on H2

Cons

  • Since there is a standby already running on H2, there is a higher chance that the cluster manager fails to allocate H2 and move requests are ineffective (host is running at full capacity)
  • Need to design a new failover scheme, hence more development time to implement

Orchestration of StandBy Enabled Container Move using Option 1 (Phase 1)

  1. ContainerPlacementHandler dispatches the move request to ContainerProcessManager
  2. ContainerProcessManager registers a move & initiates a failover with StandByContainerManager then the following failover sequence is followed

2.2 Container Restart

This is much simpler than a move. We can either make a restart equivalent to a move, or issue a stop on a container and let ContainerProcessManager try to start that container again. Let's discuss these options in detail:

Option 1: Reserve-Stop-Start: Restart is equivalent to move on the same host [Preferred]

  • In this option, resources are requested on the same host first before issuing a stop on Container
  • Once the resources are accrued a container is stopped & requested to start 
  • So this is equivalent to move semantics above 

Pros

  • Strong restart semantics, since the container is not stopped if it cannot be restarted on the same resource

Cons

  • For a container to restart, it holds 2x resources on the same host from the time it has acquired the resources until the active container is issued a stop

Both stateless & stateful container restarts will be equivalent to Stateless Container Move on the same host. Since the container is restarted on the same host then


Option 2: Stop-Reserve-Start: Restart is equivalent to stopping container first then attempting to start it [Rejected Alternative]

  • In this option, a container is issued a stop first
  • Once the container stops, resources are requested for starting on the same host (last seen host)
  • Once the resources are accrued, the container is issued a start on the same host
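For contrast, the Option 2 ordering can be sketched in the same style. Everything here is hypothetical (the class, the `ANY_HOST` marker, and the `hostHasCapacity` predicate standing in for the cluster manager). The fallback to any host when the last seen host has no capacity is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration of the Stop-Reserve-Start ordering; not part of Samza.
class StopReserveStartSketch {

  // Assumed marker meaning "let the cluster manager pick a host".
  static final String ANY_HOST = "ANY_HOST";

  static List<String> restart(String processorId, String lastSeenHost,
      Predicate<String> hostHasCapacity) {
    List<String> actions = new ArrayList<>();
    // Step 1: the container is stopped unconditionally.
    actions.add("STOP " + processorId);
    // Step 2: resources are requested on the last seen host.
    actions.add("REQUEST " + lastSeenHost);
    // Step 3: the container is already down, so it has to start somewhere.
    String target = hostHasCapacity.test(lastSeenHost) ? lastSeenHost : ANY_HOST;
    actions.add("START " + processorId + " on " + target);
    return actions;
  }

  public static void main(String[] args) {
    System.out.println(restart("4", "host-a", h -> true));
    System.out.println(restart("4", "host-a", h -> false));
  }
}
```

Because the stop comes first, a failed reservation cannot be undone, and the container may end up on a different host.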

Pros

  • The container at any point in time is only holding the resources it needs to start on a host

Cons

  • Weaker Restart semantics since the container may well be restarted on a host other than the source host (the container is already stopped when the Cluster Manager cannot return resources on the source host)

Usage Example:

 place-container --deployment-id 1581635852024-5117e303 --app-name snjain-test-cp --app.id = 1 --processor-id 4 --request-expiry 10 --destination-host abc.prod.com
@CommandLine.Command(name = "place-container", description = "Request to move/restart container at destination-host")
public class ContainerPlacementTool {
    ...
    _appName = // read from commandline
    _appId = // read from commandline
    _deploymentId = // read from commandline
    _processorId = // read from commandline
    _destinationHost = // read from commandline
    _requestExpiry = // read from commandline
    
    MetadataStore metadataStore = buildMetadataStore(_appName, _appId);
    try {
      ContainerPlacementMetadataStore containerPlacementMetadataStore =
          new ContainerPlacementMetadataStore(metadataStore);
      containerPlacementMetadataStore.start();
      Duration requestExpiry = _requestExpiry != null ? Duration.ofSeconds(_requestExpiry) : null;
      // Pass the converted Duration (requestExpiry), not the raw command-line value
      UUID uuid = containerPlacementMetadataStore.writeContainerPlacementRequestMessage(_deploymentId, _processorId,
          _destinationHost, requestExpiry, System.currentTimeMillis());
      System.out.println("Request received, query the status using: " + uuid);
    } finally {
      metadataStore.close();
   }
}


Public Interfaces


ContainerPlacementMessage.java
/**
* Encapsulates the request or response payload information between the ContainerPlacementHandler service and external
* controllers issuing placement actions
*/
@InterfaceStability.Evolving
public abstract class ContainerPlacementMessage {

public enum StatusCode {
 /**
  * Indicates that the container placement action is created
  */
 CREATED,

 /**
  * Indicates that the container placement action was rejected because request was deemed invalid
  */
 BAD_REQUEST,

 /**
  * Indicates that the container placement action is accepted and waiting to be processed
  */
 ACCEPTED,

 /**
  * Indicates that the container placement action is in progress
  */
 IN_PROGRESS,

 /**
  * Indicates that the container placement action has succeeded
  */
 SUCCEEDED,

 /**
  * Indicates that the container placement action has failed
  */
 FAILED;
}

/**
* UUID attached to a message, which helps identify duplicate request messages written to the metastore so that
* actions are not retaken even if the metastore is eventually consistent
*/
protected final UUID uuid;
/**
* Unique identifier for a deployment so messages can be invalidated across job restarts,
* e.g. a YARN-based cluster manager should set this to the app attempt id
*/
protected final String deploymentId;
// Logical container Id 0, 1, 2
protected final String processorId;
// Destination host where container is desired to be moved
protected final String destinationHost;
// Optional request expiry which acts as a timeout for any resource request to cluster resource manager
protected final Duration requestExpiry;
// Status of the current request
protected final StatusCode statusCode;
// Timestamp of the request or response message
protected final long timestamp;

protected ContainerPlacementMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
   Duration requestExpiry, StatusCode statusCode, long timestamp) {…}

}
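A caller polling for the outcome of a request needs to know when to stop. The sketch below mirrors the StatusCode values in a stand-alone enum and adds a hypothetical `isTerminal` helper. Treating BAD_REQUEST, SUCCEEDED and FAILED as final states is an assumption drawn from their descriptions, not something the interface itself declares.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical helper; the enum only mirrors the StatusCode values listed above.
class StatusCodeSketch {

  enum StatusCode { CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED }

  // Assumption: no further responses are expected once one of these is seen.
  private static final Set<StatusCode> TERMINAL =
      EnumSet.of(StatusCode.BAD_REQUEST, StatusCode.SUCCEEDED, StatusCode.FAILED);

  static boolean isTerminal(StatusCode code) {
    return TERMINAL.contains(code);
  }

  public static void main(String[] args) {
    for (StatusCode code : StatusCode.values()) {
      System.out.println(code + " terminal=" + isTerminal(code));
    }
  }
}
```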
ContainerPlacementRequestMessage
/**
* Encapsulates the request sent from the external controller to the JobCoordinator to take a container placement action
*/
public class ContainerPlacementRequestMessage extends ContainerPlacementMessage {

public ContainerPlacementRequestMessage(UUID uuid, String deploymentId, String processorId, String destinationHost, Duration requestExpiry, long timestamp) {...}

public ContainerPlacementRequestMessage(UUID uuid, String deploymentId, String processorId, String destinationHost, long timestamp) {...}
}
ContainerPlacementResponseMessage
/**
* Encapsulates the response sent from the JobCoordinator for a container placement action
*/
public class ContainerPlacementResponseMessage extends ContainerPlacementMessage {
 // Returned status of the request
 private String responseMessage;

 public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
     Duration requestExpiry, StatusCode statusCode, String responseMessage, long timestamp) {...}

 public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
     StatusCode statusCode, String responseMessage, long timestamp) {...}
}

Other Interfaces


ContainerPlacementMetadataStore
/**
 * Entity managing read writes to the metastore for {@link org.apache.samza.container.placement.ContainerPlacementRequestMessage}
 * and {@link org.apache.samza.container.placement.ContainerPlacementResponseMessage}
 */
public class ContainerPlacementMetadataStore {

/**
 * Writes a {@link ContainerPlacementRequestMessage} to the underlying metastore. This method should be used by external controllers
 * to issue a request to JobCoordinator
 *
 * @param deploymentId identifier of the deployment
 * @param processorId logical id of the samza container 0,1,2
 * @param destinationHost host where the container is desired to move
 * @param requestExpiry optional per request expiry timeout for requests to cluster manager
 * @param timestamp timestamp of the request
 * @return uuid generated for the request
 */
public UUID writeContainerPlacementRequestMessage(String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, long timestamp) {...}

/**
 * Reads a {@link ContainerPlacementRequestMessage} from the underlying metastore
 * @param uuid uuid of the request
 * @return ContainerPlacementRequestMessage if it is present
 */
public Optional<ContainerPlacementRequestMessage> readContainerPlacementRequestMessage(UUID uuid) {...}

/**
 * Reads a {@link ContainerPlacementResponseMessage} from the underlying metastore
 * @param uuid uuid of the request
 * @return ContainerPlacementResponseMessage if it is present
 */
public Optional<ContainerPlacementResponseMessage> readContainerPlacementResponseMessage(UUID uuid) {..}

/**
 * Deletes a {@link ContainerPlacementRequestMessage} if present identified by the key {@code uuid}
 * @param uuid uuid of the request
 */
public void deleteContainerPlacementRequestMessage(UUID uuid) {..}

/**
 * Deletes a {@link ContainerPlacementResponseMessage} if present identified by the key {@code uuid}
 * @param uuid uuid of the request
 */
public void deleteContainerPlacementResponseMessage(UUID uuid) {..}

/**
 * Deletes both {@link ContainerPlacementRequestMessage} and {@link ContainerPlacementResponseMessage} identified by uuid
 * @param uuid uuid of request and response message
 */
public void deleteAllContainerPlacementMessages(UUID uuid) {...}

/**
 * Deletes all {@link ContainerPlacementMessage}
 */
public void deleteAllContainerPlacementMessages() {..}

/**
 * Writes a {@link ContainerPlacementResponseMessage} to the underlying metastore. 
 * This method should be used by Job Coordinator only to write responses to Container Placement Action
 * @param responseMessage response message
 */
void writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage responseMessage) {..}

}
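The request/response pairing by UUID can be illustrated with an in-memory stand-in. This is a sketch only: the class and its method names are hypothetical, message payloads are reduced to plain strings, and two maps replace the real metastore.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a UUID-keyed request/response store; not Samza code.
class InMemoryPlacementStoreSketch {

  private final Map<UUID, String> requests = new ConcurrentHashMap<>();
  private final Map<UUID, String> responses = new ConcurrentHashMap<>();

  /** Stores a request and returns the UUID generated for it. */
  UUID writeRequest(String processorId, String destinationHost) {
    UUID uuid = UUID.randomUUID();
    requests.put(uuid, processorId + "->" + destinationHost);
    return uuid;
  }

  /** Stores a response under the UUID of the request it answers. */
  void writeResponse(UUID uuid, String status) {
    responses.put(uuid, status);
  }

  Optional<String> readRequest(UUID uuid) {
    return Optional.ofNullable(requests.get(uuid));
  }

  Optional<String> readResponse(UUID uuid) {
    return Optional.ofNullable(responses.get(uuid));
  }

  /** Deletes the request and the response sharing this UUID. */
  void deleteAll(UUID uuid) {
    requests.remove(uuid);
    responses.remove(uuid);
  }

  public static void main(String[] args) {
    InMemoryPlacementStoreSketch store = new InMemoryPlacementStoreSketch();
    UUID uuid = store.writeRequest("4", "abc.prod.com");
    System.out.println("response before processing: " + store.readResponse(uuid));
    store.writeResponse(uuid, "SUCCEEDED");
    System.out.println("response after processing: " + store.readResponse(uuid));
  }
}
```

An external controller would hold on to the returned UUID and use it to look up the response later, which matches the flow in the usage example.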
ContainerPlacementRequestAllocator
/**
 * Stateless handler that periodically dispatches {@link ContainerPlacementRequestMessage} read from Metadata store to Job Coordinator
 */
public class ContainerPlacementRequestAllocator implements Runnable {

@Override
  public void run() {...}
}
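Since the handler is stateless, a request that stays in the metadata store may be dispatched more than once. The sketch below shows one way the receiving side could ignore repeats by UUID. The `Poller` and `DedupingReceiver` classes are hypothetical, and where deduplication actually happens in Samza is not specified here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of a stateless dispatcher plus a deduplicating receiver; not Samza code.
class DispatchSketch {

  /** Forwards every request currently visible in the store on each run; keeps no state. */
  static final class Poller implements Runnable {
    private final Supplier<List<UUID>> store;
    private final Consumer<UUID> receiver;

    Poller(Supplier<List<UUID>> store, Consumer<UUID> receiver) {
      this.store = store;
      this.receiver = receiver;
    }

    @Override
    public void run() {
      for (UUID uuid : store.get()) {
        receiver.accept(uuid);
      }
    }
  }

  /** Acts on each UUID at most once, so repeated dispatches are harmless. */
  static final class DedupingReceiver implements Consumer<UUID> {
    private final Set<UUID> seen = new HashSet<>();
    final List<UUID> acted = new ArrayList<>();

    @Override
    public void accept(UUID uuid) {
      if (seen.add(uuid)) {
        acted.add(uuid);
      }
    }
  }

  public static void main(String[] args) {
    UUID uuid = UUID.randomUUID();
    List<UUID> pending = new ArrayList<>();
    pending.add(uuid);
    DedupingReceiver receiver = new DedupingReceiver();
    Poller poller = new Poller(() -> pending, receiver);
    poller.run();
    poller.run(); // second dispatch of the same request is ignored by the receiver
    System.out.println("actions taken: " + receiver.acted.size());
  }
}
```

In a running system the `Poller` would be scheduled periodically, for example with a `ScheduledExecutorService`; it is invoked directly here to keep the example deterministic.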
ContainerManager
public class ContainerManager {
/**
* Registers a container placement action to move the running container to destination host
*
* @param requestMessage request containing details of placement request
* @param containerAllocator to request physical resources
*/
public void registerContainerPlacementAction(ContainerPlacementRequestMessage requestMessage, ContainerAllocator containerAllocator) {...}

/**
* Handles the container start action for both active & standby containers. This method is invoked by the allocator thread
*
* @param request pending request for the preferred host
* @param preferredHost preferred host to start the container
* @param allocatedResource resource allocated from {@link ClusterResourceManager}
* @param resourceRequestState state of request in {@link ContainerAllocator}
* @param allocator to request resources from {@link ClusterResourceManager}
*
* @return true if the container launch is complete, false if the container launch is in progress. 
*/
boolean handleContainerLaunch(SamzaResourceRequest request, String preferredHost, SamzaResource allocatedResource,
   ResourceRequestState resourceRequestState, ContainerAllocator allocator) {..}

/**
* Handle the container launch failure for active containers and standby (if enabled).
*
* @param processorId logical id of the container eg 1,2,3
* @param containerId last known id of the container deployed
* @param preferredHost host on which container is requested to be deployed
* @param containerAllocator allocator for requesting resources
*/
void handleContainerLaunchFail(String processorId, String containerId, String preferredHost,
   ContainerAllocator containerAllocator) {...}

/**
* Handles the state update on successful launch of a container
*
* @param processorId logical processor id of container 0,1,2
*/
void handleContainerLaunchSuccess(String processorId) {...}

/**
* Handles the action to be taken after the container has been stopped.
*
* @param processorId logical id of the container eg 1,2,3
* @param containerId last known id of the container deployed
* @param preferredHost host on which container was last deployed
* @param exitStatus exit code returned by the container
* @param preferredHostRetryDelay delay to be incurred before requesting resources
* @param containerAllocator allocator for requesting resources
*/
void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus,
   Duration preferredHostRetryDelay, ContainerAllocator containerAllocator) {..}

/**
* Handles an expired resource request for both active and standby containers.
*
* @param processorId logical id of the container
* @param preferredHost host on which container is requested to be deployed
* @param request pending request for the preferred host
* @param allocator allocator for requesting resources
* @param resourceRequestState state of request in {@link ContainerAllocator}
*/
void handleExpiredRequest(String processorId, String preferredHost,
   SamzaResourceRequest request, ContainerAllocator allocator, ResourceRequestState resourceRequestState) {..}
}

ContainerPlacementMetadata
public class ContainerPlacementMetadata {
 /**
  * State to track container failover
  */
 public enum ContainerStatus { RUNNING, STOP_IN_PROGRESS, STOPPED }
 // Container Placement request message
 private final ContainerPlacementRequestMessage requestMessage;
 // Host where the container is actively running
 private final String sourceHost;
 // Resource requests issued during this failover
 private final Set<SamzaResourceRequest> resourceRequests;
 // State of the control action
 private ContainerPlacementMessage.StatusCode actionStatus;
 // State of the active container to track failover
 private ContainerStatus containerStatus;
 // Represents information on current status of action
 private String responseMessage;

}

Implementation and Test Plan

Implementation Plan


Phase 1 (Timeline: Q4 2019)

  • Design Discussion & SEP
  • Stateless Container
    • Move: Option 1
    • Restart ability
  • Stateful Container with StandBy
    • Move: Option 1
    • Restart
  • Stateful Container without StandBy
    • Move like Stateless Container Move
    • Restart ability
  • Expose this ability to be used by Samza Users
  • Integrate with samza-tool (Coordinator Stream Writer)

Phase 2 (Timeline: TBD)

  • Stateful Container without StandBy
    • API to enableStandBy
  • StandBy Container Move
  • Stateful Container with StandBy
    • Move: Option 2
  • Integration with Control Plane

Test Specifications

Compatibility, Deprecation, and Migration Plan

  • The new interfaces & API introduced should not affect any part of the User code & should be backward compatible. No migration needed for this new feature.
  • Jobs using older samza version shall be able to discard the Control messages written to the metastore

Rejected Alternatives

Note: These are described in the implementation along with preferred options above for the sake of consistency in reading

References

  1. https://www.cloudera.com/documentation/enterprise/5-10-x/topics/cm_mc_rolling_restart.html
  2. SEP-19: Hot standby state for Samza applications






6 Comments

  1. ContainerPlacementRequestMessage
    How is this different from the class above?
    1. It's exactly the same as ContainerPlacementMessage, but in the future this class can easily be evolved to include more params

  2. ContainerPlacementHandler
    Why would tools use ContainerPlacementHandler to writeContainerPlacementRequestMessage?
    1. Shall I abstract it out as another util? 

  3. ContainerManager
    What does handleExpiredRequestForControlActionOrHostAffinityEnabled mean?
    1. I have changed it in code to handleExpiredRequest for the sake of brevity, expired request only apply to host affinity enabled cases & in cases of control actions