Edward Capriolo

Thursday Mar 12, 2015

Battling entropy with Hinted Handoff - Building a NoSQL store - Part 11

It has been nearly a month since my last NoSQL blog. :( Sorry about that. I actually spent some time working on my stream processing platform teknek. And yes in my free time I work on my own stream processing platform and my own nosql, and no, there is no support group I know of that can help me.

My last blog covered eventual consistency and CAP, so it makes sense to move entropy resolution. CAP is typically described as a tradeoff off between the three features Consistency, Availability, and Partition Tolerance. Eventually Consistent systems favor A. If you remember we demonstrated a case where we wrote data at different Consistency Levels and we observed what is returned to a client.  Let's describe a situation where entropy happens:

Imagine we are writing at Consistency Level ONE. The coordinator needs only wait for 1 ACK before returning success to the client:

client ->  serverX --->|
server3 |
server4 <---|
server5 <---|
server6 <---|

This is the normal case:

client -> serverX <--(merge results) <-|
server3 |
server4 -------------------->|
server5 -------------------->|
server6 -------------------->|

But imagine this happens:

client -> serverX <--(merge results) <-|
server3 |
server4 -------------------->|
server5 (brain fart) ?|
server6 -------------------->|

The system now has entropy, because we do not know if server5 died permanently, or if someone tripped over the Ethernet cable in the data center and is rushing to plug it back in. The write could have completed and the server died before the reply or not.

One way to combat entropy is to read from multiple places like reading from 2 nodes, a quorum of nodes, or ALL the nodes that have the data. Reads tend to be expensive and reading from multiple places multiplies the expense.


Another way to deal with failed write/delete operations is to store a Hint (commonly called Hinted Handoff). Remember the column with the highest timestamp wins, so if writes are delivered late or are of order the final result is the one with the highest timestamp. To implement Hints we need to track which writes fail or timeout and put them in a place where they can be redelivered later. You can almost think of Hints as a queue of delayed writes.

I have refactored the EventualCoordinator slightly from the last blog. The keys aspects are the same. We have an ExecutorCompletionService completionService that has futures handling writes to networked nodes.

    else if (c.getLevel() == ConsistencyLevel.N) {
      Response response = handleN(start, deadline, completionService, destinations, merger, message, c); //1
      if (hinter != null) {
        maybeSendHints(remote, remoteFutures, start, deadline, hinter); //2
      return response;

This part is going to bend your mind a bit. For replication THREE we would have THREE futures. Each one handling the write operation on a different server in the cluster. Lets assume the user has specified Consistency N as ONE. This means that the method handleN will return as soon as 1 future is complete. The other futures may also have returned or they may still be in the process. We should not consider them failed and create a hint yet, because we should give them up to the normal timeout window to complete. We do not want to block this coordinator thread anymore because the consistency level is met.

Since we do not want to block the client request anymore we need to handle the events still pending. We made an executor service 'lastChance' for this. RemoteMessageCallable is an abstraction I made that tracks the original message, which destination it was sent to, and provides a field so we can tell if the future logically finished all its work.

Again, we do not want maybeSendHints to block, because we have to return to the client.

private ExecutorService lastChance;

private void maybeSendHints(List<RemoteMessageCallable> remotes,
List<Future<Response>> remoteFutures, long start, long deadline, final Hinter hinter) {
for (int i = 0; i < remotes.size(); i++) {
final RemoteMessageCallable remote = remotes.get(i);
final Future<Response> future = remoteFutures.get(i);
//the future is done but the method must have throw exception we need a hint here
if (!remote.isComplete() && future.isDone()) {
hinter.hint(remote.getMessage(), remote.getDestination());

if (!remote.isComplete() && !future.isDone()){
final long remaining = deadline - System.currentTimeMillis();
//Not complete and not exception but time is up we need a hint here
if (remaining <= 0){
hinter.hint(remote.getMessage(), remote.getDestination());
} else {
//Not complete and not exception but not past the deadline either
//put request on lastChance executor so we do not block
lastChance.submit(new Callable<Void>() {
public Void call() throws Exception {
//here we block, than hint on failure :)
future.get(remaining, TimeUnit.MILLISECONDS);
if (!remote.isComplete()){
hinter.hint(remote.getMessage(), remote.getDestination());
return null;

Whew! When I write something like that I never know if I am the smartest guy in the world or if it is a fork bomb. I think it is l33t hacking... The next pieces of the puzzle are a little easier to grock.


The first thing we will do is automatically create a column family to store our hints.

private void addSystemKeyspace(){
    Keyspace system = new Keyspace(configuration);
    KeyspaceMetaData ksmd = new KeyspaceMetaData();
    ksmd.setPartitioner( new NaturalPartitioner());
    ksmd.setRouter(new LocalRouter());
      Map<String,Object> properties = new HashMap<>();
      properties.put("implementing_class", DefaultColumnFamily.class.getName());
      system.createColumnFamily("hints", properties);
    server.getKeyspaces().put("system", system);

We had writes that did not complete, and what we want to do is put the message on a queue to be delivered later. We use destination as a row key, make the column a uuid, and store the message as JSON text in the value.

public class Hinter {

private final AtomicLong hintsAdded = new AtomicLong();
private final ColumnFamilyPersonality hintsColumnFamily;
private ObjectMapper OM = new ObjectMapper();

public Hinter(ColumnFamilyPersonality person){
this.hintsColumnFamily = person;

public void hint(Message m, Destination destination) {
try {
hintsColumnFamily.put(destination.getDestinationId(), UUID.randomUUID().toString(),
OM.writeValueAsString(m), System.currentTimeMillis() * 1000L);
} catch (IOException e) {}


This allows us to ask the data store "give me all the hints for server X". (This can be modeled other ways) This shows you how simple, flexible, and eloquent the ColumnFamily model is.

Hint Replayer

I just happened to read a post that suggested that class names ending in 'er' are bad ideas. Well fuck that noise! jk. You may remember that we have built plugable cluster membership into Nibiru. ClusterMembership allows us know if the other nodes in the cluster are up or down. We will implement the HintReplayer as a thread that:

  1. scans the list of cluster members that are ALIVE.
  2. If the host is ALIVE we see if we have any Hints for that node.
  3. If there are hints deliver them and then delete the hint
public class HintReplayer extends AbstractPlugin implements Runnable {

ObjectMapper om = new ObjectMapper();

public static final String MY_NAME = "hint_replayer";
private volatile boolean goOn = true;
private ColumnFamilyPersonality hintCf;

public HintReplayer(Server server) {

public String getName() {
return MY_NAME;

public void init() {
hintCf = Coordinator.getHintCf(server);
runThread = new Thread(this);

public void run() {
while (goOn){
List<ClusterMember> live = server.getClusterMembership().getLiveMembers();//1
for (ClusterMember member : live){
SortedMap<AtomKey,AtomValue> results = hintCf.slice(member.getId(), "", "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"); //2
if (results.size() > 0){
for (Entry<AtomKey, AtomValue> i : results.entrySet()){
if (i.getValue() instanceof ColumnValue){
ColumnValue v = (ColumnValue) i.getValue();
ColumnKey c = (ColumnKey) i.getKey();
writeAndDelete(member, c, v);
try {
} catch (InterruptedException e) { }

private void writeAndDelete(ClusterMember m, ColumnKey c, ColumnValue v){
Destination d = new Destination();
Client client = this.clientForDestination(d);
try {
client.post(om.readValue(v.getValue(), Message.class));
hintCf.delete(m.getId(), c.getColumn(), System.currentTimeMillis() * 1000);
} catch ( IOException | RuntimeException e) {
try {
} catch (InterruptedException e1) {


We already have a test that demonstrated how the coordinator works.  Testing this code is basic. We shut down some servers and do a write. The node that receives the write should store hints for the down nodes, when those nodes come back alive the hints should be re-delivered.

    //insert when some nodes are down
failIfSomeAreDown(s, clAll);
for (int i=0;i<2;i++){
//start up all nodes
for (int i = 0; i < s.length; i++) {
s[i] = new Server(cs[i]);
//get the hint replayer from the first node.
HintReplayer h = (HintReplayer) s[0].getPlugins().get(HintReplayer.MY_NAME);
for (int tries = 0; tries < 10; tries++) {
//wait for the delivered counter to get to 3
long x = h.getHintsDelivered();
if (x == 3) {
Assert.assertEquals(3, h.getHintsDelivered());
//Now go to the servers and ensure the new value has been delivered
int found = 0;
for (int i = 0; i < s.length; i++) {
ColumnValue cv = (ColumnValue) s[i].get("abc", "def", "a", "b");
if (cv != null && cv.getValue().equals("d")){
Assert.assertEquals(3, found);


Hints help us ensure writes and deletes do not get lost during random outages. They make the system more self-healing, and in CAP they can help you have a 'little more C'. Even though hinted handoff does not provide any additional guarantees that the Consistency Level does not.

What should Nibiru do next? Hit me up on twitter and let me know what NoSQL feature I should try to build next!



Post a Comment:
Comments are closed for this entry.