Gossiper

The gossiper is responsible for making sure every node in the system eventually knows important information about every other node's state, including those that are unreachable or not yet in the cluster when any given state change occurs.

API

Information to gossip is wrapped in an ApplicationState object, which is essentially a key/value pair. (See "Data Structures" below for more detail.) The gossiper propagates these to other nodes, where interested classes subscribe to changes via the IEndPointStateChangeSubscriber interface. This provides onJoin, onAlive, and onDead methods indicating the obvious things, and onChange for ApplicationState changes. onChange is called once for each ApplicationState. There are two non-obvious properties to this:

  1. If a node makes multiple changes to a given ApplicationState key, other nodes are guaranteed to see the most recent one but not intermediate ones

  2. There is no provision for deleting an ApplicationState entirely
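The first property can be illustrated with a minimal sketch. This is not the Cassandra code: `LoadWatcher`, `LocalStates`, and `deliver` are hypothetical names, and only the callback names mirror IEndPointStateChangeSubscriber. It assumes each change to a key overwrites the stored value under a new, higher version.

```python
class LoadWatcher:
    """Hypothetical subscriber; method names mirror IEndPointStateChangeSubscriber."""

    def __init__(self):
        self.seen = []

    def on_join(self, endpoint):
        pass

    def on_alive(self, endpoint):
        pass

    def on_dead(self, endpoint):
        pass

    def on_change(self, endpoint, key, value):
        # Called once per ApplicationState that changed.
        self.seen.append((endpoint, key, value))


class LocalStates:
    """Hypothetical holder of one node's application states."""

    def __init__(self):
        self.version = 0
        self.states = {}  # key -> (value, version)

    def set(self, key, value):
        # Each change gets a new, larger version and overwrites the old value.
        self.version += 1
        self.states[key] = (value, self.version)


def deliver(states, endpoint, subscribers, known_version):
    """Notify subscribers about every state newer than known_version."""
    for key, (value, version) in states.states.items():
        if version > known_version:
            for subscriber in subscribers:
                subscriber.on_change(endpoint, key, value)
```

If a node sets "load-information" three times before the next exchange, a subscriber on the other side receives a single onChange with the last value, because the earlier values were already overwritten.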

Gossiper implementation

The gossip timer task runs every second. During each run, the node initiates gossip exchanges according to the following rules:

  1. Gossip to a random live endpoint (if any).
  2. Gossip to a random unreachable endpoint, with a probability that depends on the number of unreachable and live nodes.
  3. If the node gossiped to in step 1 was not a seed, or the number of live nodes is less than the number of seeds, gossip to a random seed, with a probability that depends on the number of unreachable, seed, and live nodes.

These rules were developed to ensure that if the network is up, all nodes will eventually know about all other nodes. (Clearly, if each node only contacts one seed and then gossips only to random nodes it knows about, you can have partitions when there are multiple seeds -- each seed will only know about a subset of the nodes in the cluster. Step 3 avoids this and more subtle problems.)

This way, a node initiates gossip exchanges with one to three nodes every round (or zero if it is alone in the cluster).
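The three rules can be sketched as follows. The text above does not give the exact probabilities, so the two formulas used here (unreachable / (live + 1) for rule 2, seeds / (live + unreachable) for rule 3) are assumptions chosen for illustration. The function name and parameters are hypothetical, and `seeds` is assumed to exclude the local node.

```python
import random


def pick_gossip_targets(live, unreachable, seeds, rng):
    """Return the endpoints to gossip to in one round (zero to three entries)."""
    targets = []
    first = None

    # Rule 1: a random live endpoint, if any.
    if live:
        first = rng.choice(sorted(live))
        targets.append(first)

    # Rule 2: a random unreachable endpoint, with an ASSUMED probability.
    if unreachable:
        p = len(unreachable) / (len(live) + 1)
        if rng.random() < p:
            targets.append(rng.choice(sorted(unreachable)))

    # Rule 3: a random seed, if rule 1 did not hit a seed or there are
    # fewer live nodes than seeds; again with an ASSUMED probability.
    if seeds and (first not in seeds or len(live) < len(seeds)):
        known = len(live) + len(unreachable)
        p = 1.0 if known == 0 else len(seeds) / known
        if rng.random() < p:
            targets.append(rng.choice(sorted(seeds)))

    return targets
```

Passing the random number generator in as `rng` keeps the sketch deterministic under test; a node alone in the cluster (all three sets empty) gets an empty target list.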

Data structures

HeartBeatState

Consists of a generation and a version number. The generation stays the same while the server is running and increases every time the node is started; it is used to distinguish state information from before and after a node restart. The version number is shared with the application states and guarantees ordering. Each node has one HeartBeatState associated with it.

ApplicationState

Consists of a state and a version number, and represents the state of a single "component" or "element" within Cassandra. For instance, the application state for "load information" could be (5.2, 45), which means that the node load is 5.2 at version 45. Similarly, a node that is bootstrapping would have a "bootstrapping" application state: (bxLpassF3XD8Kyks, 56), where the first value is the bootstrap token and the second is the version. The version number is shared by the application states and the HeartBeatState to guarantee ordering, and can only grow.

EndPointState

Includes all ApplicationStates and the HeartBeatState for a given endpoint (node). An EndPointState can include only one of each type of ApplicationState, so if the EndPointState already includes, say, load information, new load information will overwrite the old one. The ApplicationState version number guarantees that an old value will not overwrite a new one.

endPointStateMap

Internal structure in Gossiper that has EndPointState for all nodes (including itself) that it has heard about.
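As a rough illustration of how these structures relate, here is a minimal sketch using Python dataclasses. The class names follow the text, but the field names and the `put` and `max_version` helpers are assumptions made for this example, not the actual Cassandra classes.

```python
from dataclasses import dataclass, field


@dataclass
class HeartBeatState:
    generation: int  # grows on every node restart
    version: int     # shared counter, only grows while running


@dataclass
class ApplicationState:
    value: str
    version: int


@dataclass
class EndPointState:
    heartbeat: HeartBeatState
    application_states: dict = field(default_factory=dict)  # key -> ApplicationState

    def put(self, key, new_state):
        # One state per key; an older version never overwrites a newer one.
        old = self.application_states.get(key)
        if old is None or new_state.version > old.version:
            self.application_states[key] = new_state

    def max_version(self):
        versions = [s.version for s in self.application_states.values()]
        return max([self.heartbeat.version] + versions)


# endPointStateMap: endpoint address -> EndPointState (including the local node)
end_point_state_map = {}
```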

Gossip Exchange

GossipDigestSynMessage

The node starting a gossip exchange sends a GossipDigestSynMessage, which includes a list of gossip digests. A single gossip digest consists of the endpoint address, the generation number, and the maximum version that has been seen for the endpoint. In this context, the maximum version number is the biggest version number in the EndPointState for this endpoint. An example illustrates this:

Suppose that node 10.0.0.1 has the following information in its endPointStateMap (remember that the endPointStateMap also includes the node itself):

EndPointState 10.0.0.1
  HeartBeatState: generation 1259909635, version 325
  ApplicationState "load-information": 5.2, generation 1259909635, version 45
  ApplicationState "bootstrapping": bxLpassF3XD8Kyks, generation 1259909635, version 56
  ApplicationState "normal": bxLpassF3XD8Kyks, generation 1259909635, version 87
EndPointState 10.0.0.2
  HeartBeatState: generation 1259911052, version 61
  ApplicationState "load-information": 2.7, generation 1259911052, version 2
  ApplicationState "bootstrapping": AujDMftpyUvebtnn, generation 1259911052, version 31
EndPointState 10.0.0.3
  HeartBeatState: generation 1259912238, version 5
  ApplicationState "load-information": 12.0, generation 1259912238, version 3
EndPointState 10.0.0.4
  HeartBeatState: generation 1259912942, version 18
  ApplicationState "load-information": 6.7, generation 1259912942, version 3
  ApplicationState "normal": bj05IVc0lvRXw2xH, generation 1259912942, version 7

In this case the max version numbers for these endpoints are 325, 61, 5 and 18 respectively. A gossip digest for endpoint 10.0.0.2 would be "10.0.0.2:1259911052:61" and essentially says "AFAIK endpoint 10.0.0.2 is running generation 1259911052 and its maximum version is 61". When the node sends a GossipDigestSynMessage, there will be exactly one gossip digest per known endpoint. That is, in this case the GossipDigestSynMessage contents would be: "10.0.0.1:1259909635:325 10.0.0.2:1259911052:61 10.0.0.3:1259912238:5 10.0.0.4:1259912942:18". The HeartBeatState version number is not necessarily always the biggest, but that is by far the most common situation.
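The digest construction for this example can be sketched as below. The tuple layout of the map and the function names are assumptions for illustration. The sketch also keeps the map's insertion order for readability, whereas the name Gossiper.makeRandomGossipDigest suggests the real list is randomized.

```python
def make_digests(end_point_state_map):
    """Build one (endpoint, generation, max_version) digest per known endpoint."""
    digests = []
    for endpoint, (generation, heartbeat_version, app_versions) in end_point_state_map.items():
        # Max version is the largest version among the heartbeat and all application states.
        max_version = max([heartbeat_version, *app_versions.values()])
        digests.append((endpoint, generation, max_version))
    return digests


def format_digests(digests):
    return " ".join(f"{endpoint}:{generation}:{version}"
                    for endpoint, generation, version in digests)


# endPointStateMap of node 10.0.0.1 from the example above:
# endpoint -> (generation, HeartBeatState version, {application state: version})
state_of_node_1 = {
    "10.0.0.1": (1259909635, 325, {"load-information": 45, "bootstrapping": 56, "normal": 87}),
    "10.0.0.2": (1259911052, 61, {"load-information": 2, "bootstrapping": 31}),
    "10.0.0.3": (1259912238, 5, {"load-information": 3}),
    "10.0.0.4": (1259912942, 18, {"load-information": 3, "normal": 7}),
}

print(format_digests(make_digests(state_of_node_1)))
```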

Main code pointers:

Gossiper.GossipTimerTask.run: Main gossiper loop
Gossiper.makeRandomGossipDigest: Constructs gossip digest list to be used in GossipDigestSynMessage
Gossiper.makeGossipDigestSynMessage: Constructs GossipDigestSynMessage from a list of gossip digests

GossipDigestAckMessage

A node receiving a GossipDigestSynMessage will examine it and reply with a GossipDigestAckMessage, which includes _two_ parts: a gossip digest list and an endpoint state list. From the gossip digest list arriving in the GossipDigestSynMessage we know, for each endpoint, whether the sending node has newer or older information than we do. An example to illustrate this:

Suppose that we are now on node 10.0.0.2 and our endPointStateMap is as follows:

EndPointState 10.0.0.1
  HeartBeatState: generation 1259909635, version 324
  ApplicationState "load-information": 5.2, generation 1259909635, version 45
  ApplicationState "bootstrapping": bxLpassF3XD8Kyks, generation 1259909635, version 56
  ApplicationState "normal": bxLpassF3XD8Kyks, generation 1259909635, version 87
EndPointState 10.0.0.2
  HeartBeatState: generation 1259911052, version 63
  ApplicationState "load-information": 2.7, generation 1259911052, version 2
  ApplicationState "bootstrapping": AujDMftpyUvebtnn, generation 1259911052, version 31
  ApplicationState "normal": AujDMftpyUvebtnn, generation 1259911052, version 62
EndPointState 10.0.0.3
  HeartBeatState: generation 1259812143, version 2142
  ApplicationState "load-information": 16.0, generation 1259812143, version 1803
  ApplicationState "normal": W2U1XYUC3wMppcY7, generation 1259812143, version 6

Remember that the arriving gossip digest list is: "10.0.0.1:1259909635:325 10.0.0.2:1259911052:61 10.0.0.3:1259912238:5 10.0.0.4:1259912942:18". When the receiving end handles this, the following steps are performed:

Sort gossip digest list

Sort the gossip digest list in descending order of the difference in max version number between the sender's digest and our own information. That is, handle first those digests that differ the most in version number. The amount of endpoint information that fits in one gossip message is limited, so this step guarantees that we favor sending information about the nodes where the information difference is biggest (the sending node has very old information compared to ours).
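A minimal sketch of this sorting step, under two stated assumptions: the difference is taken as an absolute value, and an endpoint we know nothing about counts as local version 0. The function and parameter names are hypothetical.

```python
def sort_digests(digests, local_max_versions):
    """Order digests so the largest version difference comes first."""
    def version_difference(digest):
        endpoint, _generation, remote_version = digest
        # ASSUMPTION: unknown endpoints are treated as local version 0.
        local_version = local_max_versions.get(endpoint, 0)
        return abs(local_version - remote_version)

    return sorted(digests, key=version_difference, reverse=True)


# Digest list sent by 10.0.0.1 and the max versions known to 10.0.0.2.
incoming = [
    ("10.0.0.1", 1259909635, 325),
    ("10.0.0.2", 1259911052, 61),
    ("10.0.0.3", 1259912238, 5),
    ("10.0.0.4", 1259912942, 18),
]
local_max = {"10.0.0.1": 324, "10.0.0.2": 63, "10.0.0.3": 2142}

for endpoint, generation, version in sort_digests(incoming, local_max):
    print(endpoint, generation, version)
```

With the example data the differences are 1, 2, 2137 and 18, so the digests would be handled in the order 10.0.0.3, 10.0.0.4, 10.0.0.2, 10.0.0.1. This sketch looks only at version numbers and ignores generations.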

Examine gossip digest list

At this stage we go through the arriving gossip digest list and construct the two parts of the GossipDigestAckMessage mentioned above (the gossip digest list and the endpoint state list). Let us go through the example digests one by one:

10.0.0.1:1259909635:325 In our own endPointStateMap the generation is the same, so 10.0.0.1 has not rebooted since we last heard about it. The version number in the digest is bigger than our max version number (325 > 324), so we have to ask the sender what has happened since version 324. For this purpose we include a gossip digest 10.0.0.1:1259909635:324, which says "I know about 10.0.0.1 only until generation 1259909635, version 324, please tell me anything that is newer than this".

10.0.0.2:1259911052:61 When examining this, we notice that we know more than the sender about 10.0.0.2 (generations match, but our version is bigger: 63 > 61). The sender's max version is 61, so we look for any states that are newer than this. As we can see from the endPointStateMap, there are two: ApplicationState "normal" (version 62) and HeartBeatState (version 63). We send these states to the sender. Note that in this case we are not sending digests, as a digest only tells the maximum version number. We already know that there is a difference, so we send the full states.

10.0.0.3:1259912238:5 In this case the generations do not match. Our generation is smaller than the arriving one, so 10.0.0.3 must have rebooted. We will ask the sender for all data for generation 1259912238, starting from the smallest version number, 0. That is, we insert gossip digest 10.0.0.3:1259912238:0 into the reply.

10.0.0.4:1259912942:18 We do not know anything about this endpoint, so we proceed in the same manner as for 10.0.0.3 and ask for all information. We insert digest 10.0.0.4:1259912942:0 into the reply.

At this point we have constructed the GossipDigestAckMessage, which includes the following information:

10.0.0.1:1259909635:324
10.0.0.3:1259912238:0
10.0.0.4:1259912942:0
10.0.0.2:[ApplicationState "normal": AujDMftpyUvebtnn, generation 1259911052, version 62], [HeartBeatState, generation 1259911052, version 63]

We now send this GossipDigestAckMessage to the sender of the GossipDigestSynMessage.
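The examination walked through above can be sketched as one function that returns both parts of the reply. The data layout and names are assumptions for illustration. The branch where the sender's generation is older than ours is not covered by the example and is added here as an assumed symmetric case.

```python
def examine_digests(digests, local):
    """Return (digests to request, states to send) for a GossipDigestAckMessage.

    local maps endpoint -> (generation, {state name: version}); the
    HeartBeatState is stored under the name "HeartBeatState".
    """
    request_digests = []
    states_to_send = {}
    for endpoint, remote_generation, remote_version in digests:
        if endpoint not in local:
            # Unknown endpoint: ask for everything.
            request_digests.append((endpoint, remote_generation, 0))
            continue
        local_generation, versions = local[endpoint]
        local_max = max(versions.values())
        if remote_generation > local_generation:
            # The endpoint restarted: ask for everything in the new generation.
            request_digests.append((endpoint, remote_generation, 0))
        elif remote_generation < local_generation:
            # ASSUMPTION: sender is on an older generation, send all states.
            states_to_send[endpoint] = dict(versions)
        elif remote_version > local_max:
            # Sender knows more: ask for everything newer than our max version.
            request_digests.append((endpoint, remote_generation, local_max))
        elif remote_version < local_max:
            # We know more: send every state newer than the sender's max version.
            states_to_send[endpoint] = {name: version
                                        for name, version in versions.items()
                                        if version > remote_version}
    return request_digests, states_to_send


# endPointStateMap of node 10.0.0.2 from the example above.
state_of_node_2 = {
    "10.0.0.1": (1259909635, {"HeartBeatState": 324, "load-information": 45,
                              "bootstrapping": 56, "normal": 87}),
    "10.0.0.2": (1259911052, {"HeartBeatState": 63, "load-information": 2,
                              "bootstrapping": 31, "normal": 62}),
    "10.0.0.3": (1259812143, {"HeartBeatState": 2142, "load-information": 1803,
                              "normal": 6}),
}
syn_digests = [
    ("10.0.0.1", 1259909635, 325),
    ("10.0.0.2", 1259911052, 61),
    ("10.0.0.3", 1259912238, 5),
    ("10.0.0.4", 1259912942, 18),
]
requests, states = examine_digests(syn_digests, state_of_node_2)
```

For the example data, `requests` holds the three digests listed above and `states` holds the "normal" and HeartBeatState entries for 10.0.0.2.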

Main code pointers:

GossipDigestSynVerbHandler.doVerb: Main function for handling GossipDigestSynMessage
GossipDigestSynVerbHandler.doSort: Sorts gossip digest list
Gossiper.examineGossiper: Examine gossip digest list
Gossiper.makeGossipDigestAckMessage: Constructs GossipDigestAckMessage from a list of gossip digests

GossipDigestAck2Message

The GossipDigestAck2Message is very similar to the GossipDigestAckMessage, but is sent in the opposite direction. That is, if 10.0.0.2 previously sent the GossipDigestAckMessage to 10.0.0.1, now 10.0.0.1 will send a GossipDigestAck2Message back to 10.0.0.2 containing any information that 10.0.0.2 requested. Continuing the previous example, 10.0.0.1 will send the following information to 10.0.0.2:

10.0.0.1:[HeartBeatState, generation 1259909635, version 325]
10.0.0.3:[ApplicationState "load-information": 12.0, generation 1259912238, version 3], [HeartBeatState, generation 1259912238, version 5]
10.0.0.4:[ApplicationState "load-information": 6.7, generation 1259912942, version 3], [ApplicationState "normal": bj05IVc0lvRXw2xH, generation 1259912942, version 7], [HeartBeatState, generation 1259912942, version 18]

Simplified, this is what 10.0.0.2 asked for in the previous GossipDigestAckMessage: everything after version 324 for 10.0.0.1's current generation, and everything for both 10.0.0.3 and 10.0.0.4 at their current generations. After 10.0.0.2 applies this information, 10.0.0.1 and 10.0.0.2 will have synced their gossip information, and this gossip round is complete.
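How 10.0.0.1 could answer the requested digests can be sketched as follows: for each digest, return the states whose version is larger than the version in the digest. The function name and data layout are assumptions for illustration, and the sketch assumes the requested generation matches the generation the sender holds.

```python
def build_ack2(request_digests, local):
    """Return endpoint -> (generation, {state name: version}) for requested states.

    local maps endpoint -> (generation, {state name: version}); the
    HeartBeatState is stored under the name "HeartBeatState".
    """
    reply = {}
    for endpoint, _generation, known_version in request_digests:
        if endpoint not in local:
            continue
        generation, versions = local[endpoint]
        # Keep only the states the requester has not seen yet.
        newer = {name: version for name, version in versions.items()
                 if version > known_version}
        if newer:
            reply[endpoint] = (generation, newer)
    return reply


# endPointStateMap of node 10.0.0.1 (endpoints requested by 10.0.0.2 only).
state_of_node_1 = {
    "10.0.0.1": (1259909635, {"HeartBeatState": 325, "load-information": 45,
                              "bootstrapping": 56, "normal": 87}),
    "10.0.0.3": (1259912238, {"HeartBeatState": 5, "load-information": 3}),
    "10.0.0.4": (1259912942, {"HeartBeatState": 18, "load-information": 3,
                              "normal": 7}),
}
ack_digests = [
    ("10.0.0.1", 1259909635, 324),
    ("10.0.0.3", 1259912238, 0),
    ("10.0.0.4", 1259912942, 0),
]
ack2 = build_ack2(ack_digests, state_of_node_1)
```

A digest with version 0 returns every state for that endpoint, while the digest for 10.0.0.1 with version 324 returns only what changed after that version.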

ArchitectureGossip (last edited 2013-11-13 00:03:50 by GehrigKunz)