Edward Capriolo

Saturday Jan 31, 2015

Request coordination, eventual consistency, and CAP - Building a NoSQL store - Part 10

In the last blog we mentioned the CAP theorem, but now it is time to dive in. Distributed systems face challenges that non-distributed systems do not. If data is replicated, the system needs some notion of where the data lives, how it is replicated for redundancy, how the replicas are kept in sync, and what operations are available to the user.

"Eventual consistency" was the "Affordable Care Act" of the Dynamo Style (Cassandra) vs Strongly Consistent (HBase) debate. So embattled was the term "Eventual consistency" that "Tunable consistency" was coined. Fast forward some time and Cassandra now supports "atomic compare and swap" and HBase has support for "sometimes relaxed consistency". ROFL. That, as they say, is politics ... I mean... vendor driven open source...

By the way, in case you think I am a drama queen/conspiracy theorist and surely there is no quasi-political system attempting to defame eventual consistency, I did some searches for reference links to put in this article. As a result I have fallen into an anti-eventual-consistency ad-targeting campaign...

In any case, a point I will constantly beat to death: Nibiru aims to be pluggable-everything. A lofty goal is to support all (possible) consistency models. Can it be done? I don't know, but it is worth trying I guess. I do not have all the answers. That being said, a single system cannot be "half-atomic" or "kinda-consistent", but per Column Family/Table we should be able to choose the consistency model that fits what we want. For example, one store "pets" could be an eventually consistent ColumnFamily store while another store "credit_cards" is configured as a strongly consistent key-value store.

Cassandra's approach to CAP is to provide a per-request Consistency Level. The consistency level allows the user to say, request by request, what guarantees they desire. For example:

WRITE.ALL: The write is acknowledged (made durable) by ALL Natural Endpoints (all replicas of the data) before a result is returned to the client.
READ.ALL: The read request is sent to ALL Natural Endpoints; the results are compared and the highest timestamp wins. (Other logic handles timestamp ties.)

WRITE.ONE: The request is sent to all Natural Endpoints; after one acknowledged (durable) write completes, the result is returned. The other writes still continue on the servers.
READ.ONE: The request is sent to one (or more) Natural Endpoints, and the first result is returned to the client.

There are other Consistency Levels, including QUORUM, LOCAL_ONE, and SERIAL (for compare and swap), but let's save those for another day.
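As a rough sketch of what each level demands, here is a hypothetical helper (assumed names, not Nibiru's or Cassandra's actual API) computing how many replica acknowledgements a request must collect for a given replication factor:

```java
// Hypothetical sketch: acknowledgements required per consistency level.
enum ConsistencyLevel { ONE, QUORUM, ALL }

class AckCalculator {
  // For N replicas: ONE needs 1 ack, QUORUM needs a majority, ALL needs every replica.
  static int required(ConsistencyLevel level, int replicationFactor) {
    switch (level) {
      case ONE:    return 1;
      case QUORUM: return replicationFactor / 2 + 1;
      case ALL:    return replicationFactor;
      default:     throw new IllegalArgumentException("unknown level");
    }
  }
}
```

For replication factor 3 this gives 1, 2, and 3 acks respectively, which is why ALL gives the strongest guarantee and the worst availability.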

As our last blog showed, a router can return one or more Destinations (Natural Endpoints) for a given row key. Given the destinations and the consistency level, we can build different strategies (code paths) to achieve the desired results.

Let's take a case that is easy to explain: WRITE.ALL. A client sends a write operation for Column Family pets:
{ rowkey: rover , column: type, value: poodle }

The node that receives this request is called the coordinator. The coordinator computes the Natural Endpoints (one of which could be local) where the data lives. For replication factor N, N futures are created, and each future forwards the request. Since the request is at consistency level ALL, we want to wait for all the futures to finish before a result is returned to the client. We also have an upper limit on how long we are willing to wait for the futures before we consider the request timed out.

This process looks something like this:

  client ---> serverX --->|---> server3
                          |---> server4
                          |---> server5
                          |---> server6

  client <--- serverX <--(merge results)--|<--- server3
                                          |<--- server4
                                          |<--- server5
                                          |<--- server6

I took the approach of separating the EventualCoordinator (the thing managing the futures) from the Merger (the logic merging the results). I did this because HighestTimestampWins does not make sense for a key-value store, which has no timestamps.
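To make that separation concrete, here is a minimal sketch of a pluggable merger (the `Merger` and `FirstResponseMerger` names are assumed for illustration, not Nibiru's actual interfaces): the coordinator collects the replica responses and hands them to whichever merger fits the store's data model. A key-value store with no timestamps might simply return the first response:

```java
import java.util.List;
import java.util.Map;

// Hypothetical interface: reduce N replica responses to one client result.
interface Merger {
  Map<String, Object> merge(List<Map<String, Object>> responses);
}

// For a key-value store with no timestamps, "first response wins" is one
// reasonable strategy; a column family store would plug in timestamp logic.
class FirstResponseMerger implements Merger {
  public Map<String, Object> merge(List<Map<String, Object>> responses) {
    return responses.get(0);
  }
}
```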

Let's do this.

The coordinator does a few things. First, the request could have a local component; the LocalAction class contains the logic to physically carry out the operation locally. The merger is a class that produces one result from multiple responses, because N nodes will return N results but only a single result is returned to the client. Again, we define the LocalAction and the Merger here because the EventualCoordinator is message/response agnostic.

  public Response handle(Message message) {
    Token token = keyspace.getKeyspaceMetadata().getPartitioner()
        .partition((String) message.getPayload().get("rowkey"));
    List<Destination> destinations = keyspace.getKeyspaceMetadata().getRouter()
        .routesTo(message, server.getServerId(), keyspace, server.getClusterMembership(), token);
    long timeoutInMs = determineTimeout(columnFamily, message);

    if (ColumnFamilyPersonality.COLUMN_FAMILY_PERSONALITY.equals(message.getRequestPersonality())) {
      LocalAction action = new LocalColumnFamilyAction(message, keyspace, columnFamily);
      ResultMerger merger = new HighestTimestampResultMerger();
      return eventualCoordinator.handleMessage(token, message, destinations,
          timeoutInMs, destinationLocal, action, merger);
    }
    // ... other personalities elided
  }


So let's dive into the eventual coordinator. If there is only one destination and that destination is local, we just do the action and return the result.

  public Response handleMessage(Token token, final Message message, List<Destination> destinations,
      long timeoutInMs, Destination destinationLocal, final LocalAction action, ResultMerger merger) {
    if (destinations.size() == 1 && destinations.contains(destinationLocal)) {
      return action.handleReqest();
    }

This next part prevents 1 request from turning into 3, turning into 9, ... turning into doom. Basically we tag messages with "reroute" so we know whether it was a client or a server that sent the message. If the message was already rerouted by another server, we handle it locally rather than forwarding it again.

    if (message.getPayload().containsKey("reroute")) {
      return action.handleReqest();
    }
    if (!message.getPayload().containsKey("reroute")) {
      message.getPayload().put("reroute", "");
    }

I'm back Doc. I'm back from the future!

Java's ExecutorCompletionService is pretty nifty. It reminds me of what ThreadGroup does for threads. For each Destination the data is either local or remote, so we have a callable for each case: LocalActionCallable wraps the LocalAction, and RemoteMessageCallable forwards the request over the network.

    ExecutorCompletionService<Response> completionService = new ExecutorCompletionService<>(executor);
    final ArrayBlockingQueue<Response> results = new ArrayBlockingQueue<Response>(destinations.size());
    ArrayBlockingQueue<Future<Response>> futures = new ArrayBlockingQueue<Future<Response>>(destinations.size());
    for (final Destination destination : destinations) {
      Future<Response> f = null;
      if (destination.equals(destinationLocal)) {
        f = completionService.submit(new LocalActionCallable(results, action));
      } else {
        f = completionService.submit(new RemoteMessageCallable(results, clientForDestination(destination), message));
      }
      futures.add(f);
    }

After the futures have been submitted we wait, up to some amount of time, for all of them to return. If we hit the deadline we fail, if we do not get all results before the deadline we fail, and if any one future fails we fail.

    long start = System.currentTimeMillis();
    long deadline = start + timeoutInMs;
    List<Response> responses = new ArrayList<>();
    if (c.getLevel() == ConsistencyLevel.ALL) {
      while (start <= deadline) {
        Response r = null;
        try {
          Future<Response> future = completionService.poll(deadline - start, TimeUnit.MILLISECONDS);
          if (future != null) {
            r = future.get();
          }
          if (r != null) {
            responses.add(r);
          }
        } catch (InterruptedException | ExecutionException e) { }
        if (r == null) {
          return new Response().withProperty("exception", "coordinator timeout");
        }
        if (r.containsKey("exception")) {
          return r;
        }
        if (responses.size() == destinations.size()) {
          break; //success condition
        }
        start = System.currentTimeMillis();
      }
      if (responses.size() == destinations.size()) {
        return merger.merge(responses, message);
      } else {
        return new Response().withProperty("exception", "coordinator timeout");
      }
    }
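To see the ExecutorCompletionService deadline pattern in isolation, here is a self-contained sketch (not Nibiru code; the names and the string "acks" are made up for illustration). It submits n tasks standing in for replica writes, then polls the completion service until every task finishes or the deadline passes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class DeadlineDemo {
  // Collect up to n results, giving up once timeoutInMs has elapsed.
  static List<String> collect(int n, long timeoutInMs) {
    ExecutorService executor = Executors.newFixedThreadPool(n);
    ExecutorCompletionService<String> completionService =
        new ExecutorCompletionService<>(executor);
    for (int i = 0; i < n; i++) {
      final int id = i;
      completionService.submit(() -> "ack-" + id); // stands in for a replica write
    }
    List<String> results = new ArrayList<>();
    long start = System.currentTimeMillis();
    long deadline = start + timeoutInMs;
    try {
      while (results.size() < n && start <= deadline) {
        // poll() blocks only as long as time remains before the deadline
        Future<String> f = completionService.poll(deadline - start, TimeUnit.MILLISECONDS);
        if (f == null) {
          break; // deadline hit with tasks still outstanding
        }
        results.add(f.get());
        start = System.currentTimeMillis();
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    } finally {
      executor.shutdown();
    }
    return results;
  }
}
```

The nice property, just like in the coordinator above, is that the total wait is bounded by the deadline no matter how many futures are outstanding, because each poll only waits for the time remaining.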

Note at level ALL, two successes and one failure is still a failure. Write operations do not have anything fancy to merge, but reads do. Here is the highest-timestamp-wins merge logic. Every time I write the "find the highest" algorithm I am reminded of my first CS class in Fortran. Thanks Professor D! (But we ain't Fortran-in' no more)

  private static ObjectMapper OM = new ObjectMapper();

  public Response merge(List<Response> responses, Message message) {
    if ("put".equals(message.getPayload().get("type"))
        || "delete".equals(message.getPayload().get("type"))) {
      return new Response();
    } else if ("get".equals(message.getPayload().get("type"))) {
      return highestTimestampResponse(responses);
    } else {
      return new Response().withProperty("exception", "unsupported operation " + message);
    }
  }

  private Response highestTimestampResponse(List<Response> responses){
    long highestTimestamp = Long.MIN_VALUE;
    int highestIndex = Integer.MIN_VALUE;
    for (int i = 0; i < responses.size(); i++) {
      Val v = OM.convertValue(responses.get(i).get("payload"), Val.class);
      if (v.getTime() > highestTimestamp) {
        highestTimestamp = v.getTime();
        highestIndex = i;
      }
    }
    return responses.get(highestIndex);
  }
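Stripped of the Response/Val plumbing, the merge boils down to a linear scan for the largest timestamp. Here is a hypothetical, self-contained version (assumed names, plain maps standing in for Nibiru's Response objects):

```java
import java.util.List;
import java.util.Map;

class TimestampMerge {
  // Given replica responses as {"time": long, "value": ...} maps,
  // return the response whose timestamp is largest.
  static Map<String, Object> highest(List<Map<String, Object>> responses) {
    long highestTimestamp = Long.MIN_VALUE;
    int highestIndex = -1;
    for (int i = 0; i < responses.size(); i++) {
      long t = ((Number) responses.get(i).get("time")).longValue();
      if (t > highestTimestamp) {
        highestTimestamp = t;
        highestIndex = i;
      }
    }
    return responses.get(highestIndex);
  }
}
```

With this rule, a replica that missed a recent write simply loses the comparison, which is exactly how a read at ALL masks stale replicas.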



This code is taken from the unit test here. (I should be doing perma-links; I am so lazy.) We should be able to test two things:

  1. If all nodes are up, operations at ALL should pass
  2. If at least one node is down, operations at ALL should fail
    ColumnFamilyClient cf = new ColumnFamilyClient(s[0].getConfiguration().getTransportHost(),
        s[0].getConfiguration().getTransportPort());
    Session clAll = cf.createBuilder().withKeyspace("abc").withColumnFamily("def")
        .withWriteConsistency(ConsistencyLevel.ALL, new HashMap())
        .withReadConsistency(ConsistencyLevel.ALL, new HashMap()).build();
    passIfAllAreUp(s, clAll);
    failIfSomeAreDown(s, clAll);

  public void passIfAllAreUp(Server [] s, Session sb) throws ClientException {
    Response r = sb.put("a", "b", "c", 1);
    int found = 0;
    for (int i = 0; i < s.length; i++) {
      ColumnFamilyPersonality c = (ColumnFamilyPersonality) s[i].getKeyspaces()
          .get("abc").getColumnFamilies().get("def");
      Val v = c.get("a", "b");
      if (v != null){
        //assert data is in three places
        Assert.assertEquals("c", c.get("a", "b").getValue());
        found++;
      }
    }
    Assert.assertEquals(3, found);
    Assert.assertEquals("c", sb.get("a", "b").getValue());
  }


  public void failIfSomeAreDown(Server [] s, Session sb) throws ClientException {
    for (int i = 2; i < s.length; i++) {
      try {
        sb.put("a", "b", "c", 1);
      } catch (ClientException ex){
        Assert.assertTrue(ex.getMessage().equals("coordinator timeout"));
      }
    }
  }


Sweet! Dude! There you have it! Eventual Consistency, I mean Tunable Consistency, I mean Sometimes Relaxed Consistency :) Basically from here, implementing other consistency levels is a matter of adding different case logic and pulling results from the ExecutorCompletionService. What is coming next? Hinted Handoff I think :) Stay tuned!

