Edward Capriolo

Thursday Apr 02, 2015

Elastic node scale up - Building a NoSQL store - Part 12

The last major feature I added to Nibiru was Hinted Handoff, an optimization used to re-deliver lost messages and reduce the entropy that builds up when some natural endpoints miss writes. I sat a while trying to decide what to do next...

I thought to myself, "Nibiru has code to route requests and a CLI to read and write data, but is it web scale? After all, anyone could just write a library that hashes data around!"

The answer was no, and I decided it was time to change that. Let's make this biznotch auto-scale.


Every NoSQL has a different wrinkle on how it does things. The Dynamo-style databases like Cassandra and Riak create a Token (by hashing) for the data and use that information to route requests. A previous blog demonstrated how Nibiru implements this type of Consistent Hashing using Request Routing.

Thus far we have implemented an Eventually Consistent, ColumnFamily store in Nibiru which does not use shared storage. This means that growing the cluster involves physically moving data between nodes in a way that no data is unavailable during the transition period.

How can we do this? If a token is calculated from user data with a hash, you might expect that changing the number of nodes results in almost all the data having to be moved around, like when a hash map is resized. But luckily we can avoid moving the majority of data by using something like consistent hashing. You can easily find literature on this and even cool interactive presentations, but I will explain it my way, with ascii art and crude examples.

Things hash as I explained here:

10 MOD 7. Replication 3

But let's forget about MOD 7. Instead we hash something into a fixed space. The size of this hash space will never change. To keep it simple, assume the space is 0 (inclusive) to 100 (exclusive).

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100

If you have one node:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1|

If you add a second node at position 50:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 | node2 node2 node2 node2 node2   |

Half of the data stays in place, and half needs to move to the new node. Let's add a third node at position 25:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node3 node3 node3 | node1 node1 node1 | node2 node2 node2 node2 node2  |

node1 'gave away' some of its range, but node2 did not. The lesson is that each time we add a node we split the hash space rather than rehash it.

This is how we scale: each new node reduces the burden on the rest of the nodes.
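Here is a minimal sketch of the splitting idea, assuming the same 0 to 100 token space and the convention that a node owns every token from its own position up to the next node's position. The class `TokenRingSketch` and its methods are made up for this illustration and are not Nibiru's router. Which side of a split the new node takes is also just a convention, so the exact ranges may not line up with the ascii art.

```java
import java.util.TreeMap;

// Hypothetical illustration only; not Nibiru's TokenRouter.
class TokenRingSketch {
    // position in the fixed hash space -> node name
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    void addNode(int position, String node) {
        ring.put(position, node);
    }

    // Assumed convention: a token belongs to the closest node at or below it.
    String ownerOf(int token) {
        return ring.floorEntry(token).getValue();
    }

    // Record the owner of every token in [0, 100).
    String[] snapshot() {
        String[] owners = new String[100];
        for (int token = 0; token < 100; token++) {
            owners[token] = ownerOf(token);
        }
        return owners;
    }

    // Count the tokens whose owner differs from an earlier snapshot.
    static int moved(String[] before, TokenRingSketch after) {
        int count = 0;
        for (int token = 0; token < 100; token++) {
            if (!before[token].equals(after.ownerOf(token))) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        TokenRingSketch ring = new TokenRingSketch();
        ring.addNode(0, "node1");
        ring.addNode(50, "node2");
        String[] before = ring.snapshot();
        ring.addNode(25, "node3");
        System.out.println("tokens that changed owner: " + moved(before, ring));
        System.out.println("owner of token 75: " + ring.ownerOf(75));
    }
}
```

Under these assumptions only the 25 tokens in [25, 50) change owner when node3 joins, and everything node2 holds stays put. Compare that with `token MOD nodeCount`, where going from two nodes to three would reassign most keys.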


Let's call the new node that is joining a PROTEGE. These are things to consider for a PROTEGE.

  1. DO NOT want to send reads to that node (because data may still be moving to it)
  2. DO want it to receive new writes/deletes/updates for the section of data it will be responsible for

The protege is taking data from another node. Let's call that node the SPONSOR. For a sponsor:

  1. DO want to write locally until the PROTEGE is fully joined because if the join fails we do not want to lose data
  2. DO want to send portions of local data to the protege

(Another way to implement this would be to track the JOINING node(s) separately. On the write path the system would write to (replication factor) + 1 nodes but only read from (replication factor) nodes.)
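To make that parenthetical alternative concrete, here is a rough sketch of a router that tracks a joining node separately. `JoinAwareRouter` and all of its methods are invented for this example; Nibiru does not work this way (it uses the sponsor/protege approach described in this post), so treat it as an illustration of the idea only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of tracking a joining node separately; not Nibiru code.
class JoinAwareRouter {
    // The natural endpoints for some token; size equals the replication factor.
    private final List<String> replicas;
    // The node that is currently joining, or null when no join is in progress.
    private String joining;

    JoinAwareRouter(List<String> replicas) {
        this.replicas = new ArrayList<>(replicas);
    }

    void startJoin(String node) {
        joining = node;
    }

    // In a real system the node would become a natural endpoint through a
    // token map update; this sketch only clears the marker.
    void finishJoin() {
        joining = null;
    }

    // Reads never touch the joining node: its data may still be streaming in.
    List<String> readDestinations() {
        return new ArrayList<>(replicas);
    }

    // Writes go to the replicas plus the joining node (replication factor + 1).
    List<String> writeDestinations() {
        List<String> out = new ArrayList<>(replicas);
        if (joining != null) {
            out.add(joining);
        }
        return out;
    }
}
```

With three replicas and one joining node, `writeDestinations()` returns four hosts while `readDestinations()` still returns three, which is the RF + 1 versus RF split described above.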


First we build an internode client. The protege uses it to initiate the join request. Basically the request says, "Hey node [$sponsor]! I notice you have a lot of data and I would like you to divide your data at [$token], and I will take over one part of that."

  public void join(String keyspace, String sponsorHost, String wantedToken){
    InternodeClient internodeClient = new InternodeClient(sponsorHost, configuration.getTransportPort());
    internodeClient.join(keyspace, sponsorHost, serverId, wantedToken, configuration.getTransportHost());
  }

That message gets transmitted to the sponsor and makes its way to a handler class. The way I implemented this, we only handle a single protege at a time to keep the process sane. At a high level we need to:

  1. Atomically set the protege
  2. Start a thread that will
    1. replicate metadata (keyspaces and store definitions) to the new node
    2. replicate data (data inside keyspaces and store definitions) to the new node
    3. update the metadata so the cluster is aware of the new node
    4. remove the protege

  public Response handleSponsorRequest(final Message message){
    final String requestId = (String) message.getPayload().get("request_id");
    final String joinKeyspace = (String) message.getPayload().get("keyspace");
    final String wantedToken = (String) message.getPayload().get("wanted_token");
    final String protegeHost = (String) message.getPayload().get("transport_host");
    final Destination protegeDestination = new Destination();
    final MetaDataClient metaDataClient = getMetaClientForProtege(protegeDestination);

    // atomically claim the single protege slot; refuse if we are already sponsoring
    boolean res = protege.compareAndSet(null, protegeDestination);
    if (!res){
      return new Response().withProperty("status", "fail")
          .withProperty("reason", "already sponsoring");
    }

    Thread t = new Thread(){
      public void run(){
        InternodeClient protegeClient = new InternodeClient(protegeHost, server.getConfiguration().getTransportPort());
        Keyspace ks = server.getKeyspaces().get(joinKeyspace);
        replicateData(protegeClient, ks);
        updateTokenMap(ks, metaDataClient, wantedToken, requestId);
        try {
          Thread.sleep(10000); //wait here for propagations
        } catch (InterruptedException e) { }
        // release the slot so another node can be sponsored
        protege.compareAndSet(protegeDestination, null);
      }
    };
    t.start();
    return new Response().withProperty("status", "ok");
  }

So easy, right? JK, this process was a beast to write. It was only after I had it all done that I made it look purdy like this.
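The "only one protege at a time" rule rests on the compareAndSet call in the handler. Here is a stripped-down sketch of just that trick, using a plain String for the host. `ProtegeSlot` and its method names are made up for illustration; the real code holds a Destination.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a single "protege slot"; the names are made up.
class ProtegeSlot {
    private final AtomicReference<String> protege = new AtomicReference<>();

    // Succeeds only if nobody is being sponsored right now.
    boolean claim(String host) {
        return protege.compareAndSet(null, host);
    }

    // Clears the slot only if it still holds the same object we claimed with.
    // compareAndSet compares references (==), not equals().
    boolean release(String host) {
        return protege.compareAndSet(host, null);
    }

    String current() {
        return protege.get();
    }
}
```

If two join requests race, exactly one `claim` returns true and the other caller can answer "already sponsoring" without taking any lock.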

Let's dive into the piece that replicates the data. We need to move data from the sponsor to the protege. To do this we:

  1. Flush memtables to sstables (memtables and sstables are described here)
  2. For each Store in keyspace
    1. For each SStable in store
      1. Copy table

  private void replicateData(InternodeClient protegeClient, Keyspace ks){
    for (Entry<String, Store> storeEntry : ks.getStores().entrySet()){
      if (storeEntry.getValue() instanceof DefaultColumnFamily){
        DefaultColumnFamily d = (DefaultColumnFamily) storeEntry.getValue();
        String bulkUid = UUID.randomUUID().toString();
        for (SsTable table : d.getSstable()){
          protegeClient.createSsTable(ks.getKeyspaceMetaData().getName(), d.getStoreMetadata().getName(), bulkUid);
          try {
            SsTableStreamReader stream = table.getStreamReader();
            Token token = null;
            // walk the table row by row and ship each row's columns to the protege
            while ((token = stream.getNextToken()) != null){
              SortedMap<AtomKey,AtomValue> columns = stream.readColumns();
              protegeClient.transmit(ks.getKeyspaceMetaData().getName(), storeEntry.getKey(), token, columns, bulkUid);
            }
          } catch (IOException e) {
            throw new RuntimeException (e);
          }
          protegeClient.closeSsTable(ks.getKeyspaceMetaData().getName(), d.getStoreMetadata().getName(), bulkUid);
        }
      }
    }
  }

The protegeClient is a low-level way to transmit SsTables. It is fairly interesting why we need this. In a log-structured merge system with write-once tables, deletes are actually a special write called a tombstone. Tombstones mask other columns. No user-facing API (like get or slice) can return a tombstone, so we needed a lower-level API to get at the SsTable files.

We could have gone even lower and simply moved bytes (save that for another day) but this interface also made an attractive bulk load API. WIN! WIN!
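To show why the user-facing API is not enough for copying data, here is a toy sketch of tombstone masking. Everything in it (`TombstoneSketch`, `Cell`, `rawCells`, last-write-wins by timestamp) is an assumption made for the example and is much simpler than Nibiru's real memtable and SsTable code.

```java
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch; Cell and the method names are not Nibiru's types.
class TombstoneSketch {
    static final class Cell {
        final String value;      // null when the cell is a tombstone
        final long timestamp;
        final boolean tombstone;

        Cell(String value, long timestamp, boolean tombstone) {
            this.value = value;
            this.timestamp = timestamp;
            this.tombstone = tombstone;
        }
    }

    private final TreeMap<String, Cell> cells = new TreeMap<>();

    void put(String column, String value, long timestamp) {
        write(column, new Cell(value, timestamp, false));
    }

    // A delete is just another write: it stores a tombstone.
    void delete(String column, long timestamp) {
        write(column, new Cell(null, timestamp, true));
    }

    // Assumed rule: the cell with the newest timestamp wins.
    private void write(String column, Cell incoming) {
        Cell existing = cells.get(column);
        if (existing == null || incoming.timestamp >= existing.timestamp) {
            cells.put(column, incoming);
        }
    }

    // User-facing read: tombstones are hidden from the caller.
    Optional<String> get(String column) {
        Cell c = cells.get(column);
        return (c == null || c.tombstone) ? Optional.empty() : Optional.of(c.value);
    }

    // Low-level read for streaming to another node: tombstones are included.
    TreeMap<String, Cell> rawCells() {
        return new TreeMap<>(cells);
    }
}
```

If the sponsor copied data through `get`, a deleted column would simply be absent on the protege, and an older copy of that column arriving later would come back to life. Copying through something like `rawCells` carries the tombstone along, so the delete keeps masking the older write.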

The next piece is called after the join is complete. Once we have moved all the data to this new node, we add this node into the token map and send that update around. (*Note: this assumes one node is added at a time and there are no racing metadata changes.)

  private void updateTokenMap(Keyspace ks, MetaDataClient metaDataClient, String wantedToken, String requestId){
    ObjectMapper om = new ObjectMapper();
    TreeMap<String,String> t = om.convertValue(ks.getKeyspaceMetaData().getProperties().get(TokenRouter.TOKEN_MAP_KEY),
        TreeMap.class);
    // add the protege's token to the map
    t.put(wantedToken, requestId);
    Map<String,Object> send = ks.getKeyspaceMetaData().getProperties();
    send.put(TokenRouter.TOKEN_MAP_KEY, t);
    try {
      metaDataClient.createOrUpdateKeyspace(ks.getKeyspaceMetaData().getName(), send, true);
    } catch (ClientException e) {
      throw new RuntimeException(e);
    }
  }

Great! So this covers getting the already existing data to the new node. But what about the new mutations (writes and deletes) that happen while the join is going on? This is handled by a small addition to our normal write path. If we have a protege and the write is going to us, we also send it to the protege!

    if (sponsorCoordinator.getProtege() != null && destinations.contains(destinationLocal)){
      String type = (String) message.getPayload().get("type");
      if ("put".equals(type) || "delete".equals(type)){
        // ... also send the message to the protege ...
      }
    }

This all seems bad ass. Are you sure it actually works?

I am sure there are some kinks to work out :), but yes, it can be demonstrated in testing.

What we did here is:

  1. start a single node
  2. insert 10 rows
  3. join a second node
  4. insert some more rows
  5. check that data is being divided across nodes

    ColumnFamilyClient c = new ColumnFamilyClient(servers[0].getConfiguration().getTransportHost(), servers[0]
    Session session = c.createBuilder().withKeyspace("abc")
        .withWriteConsistency(ConsistencyLevel.ALL, new HashMap())
        .withReadConsistency(ConsistencyLevel.ALL, new HashMap())
    for (int k = 0; k < 10; k++) {
      session.put(k+"", k+"", k+"", 1);
    }

Assert.assertEquals(servers[0].getClusterMembership().getLiveMembers().size(), 1);

servers[1].join("abc", "", "5");



  private void insertDataOverClient(Session session) throws ClientException {
    session.put("1", "1", "after", 8);
    session.put("7", "7", "after", 8);
    session.put("11", "11", "after", 8);
  }

  private void assertDataIsDistributed(Server [] servers){
    Assert.assertEquals("after", ((ColumnValue) servers[1].get("abc", "def", "11" , "11")).getValue());
    Assert.assertEquals("after", ((ColumnValue) servers[0].get("abc", "def", "1" , "1")).getValue());

    Assert.assertEquals("after", ((ColumnValue) servers[1].get("abc", "def", "11" , "11")).getValue());
    Assert.assertEquals("after", ((ColumnValue) servers[1].get("abc", "def", "1" , "1")).getValue());
  }

Whew!  There you go! Web scale! Elastic! Auto Resharding NoSQL action!

