Edward Capriolo

Sunday Jul 12, 2015

Why Hive on Cloudera is like Python on Redhat

I used to be fairly anti-Cloudera. I was never really convinced you needed someone to package up Hadoop for you; I figured your admins should just learn it. These days Hadoop is N degrees harder and I don't really have as much give-a-crap for learning to configure all the knobs that change names all the time. Thus I am more or less happy to let Cloudera handle installing the 9000 Hadoop components.

But really Cloudera's testing is not that great. In my last version of CDH, decommissioning NodeManagers caused YARN to stop accepting jobs. ::Major fail:: Upgrade, and in the new version Hive cannot support custom Hive SerDes because of an upstream Hive bug.

Filed this to CDH user:


Got the ::cricket:: response

Thinking of getting away from CDH hive at this point. Why?

  1. Waited a long time for this so I could easily build in Tez support
  2. Still no out-of-the-box Tez support even though it's clearly the way forward (and would make everything umpteen times faster)
  3. Does not really look like Cloudera can/wants to keep up with Hive's release cycle
  4. Sabotaging features by adding check boxes and disabling things that work out of the box: "Check the box for Enable Hive on Spark (Unsupported)."
  5. Constant complaints in Manager that you should have a metastore server or should have ZooKeeper, when the truth is most users won't need either (and I sure do not need this)
  6. N-day wait to confirm bugs, "Whenever we get to it" fixes
  7. 1 zillion unneeded jars in the classpath (HBase etc.) that I'm not actually using with Hive

I'm tired of dealing with back-revved revisions and Cloudera's "Why aren't you just using Impala" type stance.

I am going back to rolling my own. I will still use CDH to manage HDFS proper and YAWN, but this Hive situation is unmanageable. Hive on Cloudera is like Python on Redhat 5. You are painted into an annoying box and you have no direct way to make it better other than ignoring it entirely and rolling your own!

Friday Apr 17, 2015

Triggers and Coprocessors - Building a NoSQL store - Part 14

Hello again! The last blog in this series was about cleanup compaction. While cleanup and compaction are interesting, I do not think they have that web scale 'pop'. Definitely not sexy enough for Nibiru, the world's first Internet of Things NoSql. I decided to treat myself and do something fun, so I decided now would be a good time to build triggers/coprocessor support.

We might as well start by defining some terminology. Many databases have trigger support. Typically a trigger is a type of insert or update query that happens inside the RDBMS as a result of another insert or update operation. I first saw the term co-processor used in Google's BigTable white paper. HBase, an open source implementation based on the BigTable spec, has different types of coprocessors.

HBase has a region server that serves the region (shard), and replication is provided by the file system. In the Cassandra/Dynamo style a row key has multiple natural endpoints, no replicated file system, and the system needs to actively execute the operation across N replicas as we showed here.

Triggers/CoProcessors were batted around with Cassandra for a while. The implementation can be debated. For example, should the trigger run closer to the storage layer or closer to the coordinator level? Unlike HBase, where we can be sure one region server is "in charge" of a key, we would need a distributed locking mechanism to be "in charge" of a key in Cassandra, and distributed locking is "heavy". Another potential implementation would be leveraging idempotent and retry-able operations like writes and deletes with timestamps. There are probably other ways to go about triggers as well.

Pick your poison

I decided to take the approach of coordinator triggers. In a previous blog we showed the coordinator is the piece that receives the request from the client and dispatches it to multiple servers. The good part of this implementation is that we can easily hook into the code before the result is returned to the client. The downside is that the trigger could time out after the initial user operation has already succeeded (and that operation cannot be easily unrolled if we wanted to try that). Maybe in a later blog we can build triggers closer to the storage layer.

The next code samples are all part of this commit.

Let's create an enum to describe the trigger levels.

public enum TriggerLevel {
  /** Request will block while trigger is executing, trigger can timeout */
  BLOCKING,
  /** Request will not block while trigger is executing.
   * Trigger operations may be dropped if back pressure */
  /** Request will not block while trigger is executing.
   * Trigger operations retry, potentially later */
}

public class TriggerDefinition {
  private TriggerLevel triggerLevel;
  private String triggerClass;
}

Next, the user needs an interface to plug the trigger logic into. We give the user access to the message, the response, and the server. In most cases we have avoided passing the Server to keep interfaces very discrete, but here we are going for flexibility.

public interface CoordinatorTrigger {
  void exec(Message message, Response response, Server server);
}

Next, we can build a component to execute triggers. We are only going to build the blocking case for now, but implementing the non blocking case will not be hard since we are using a callable.

package io.teknek.nibiru.trigger;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TriggerManager {

  private ExecutorService executor;
  private final Server server;

  public TriggerManager(Server server){
    this.server = server;
  }

  public Response executeTriggers(final Message message, final Response response, Keyspace keyspace,
          Store store, long timeoutInMs, long requestStart){
    long now = System.currentTimeMillis();
    for (TriggerDefinition d : store.getStoreMetadata().getCoordinatorTriggers()){
      if (d.getTriggerLevel() == TriggerLevel.BLOCKING){
        // only the time left over from the original request budget is available to the trigger
        long remaining = (requestStart + timeoutInMs) - now;
        if (remaining > 0){
          final CoordinatorTrigger ct = getReusableTrigger(d);
          Callable<Boolean> c = new Callable<Boolean>(){
            public Boolean call() throws Exception {
              ct.exec(message, response, server);
              return Boolean.TRUE;
            }
          };
          Future<Boolean> f = null;
          try {
            f = executor.submit(c);
            Boolean b = f.get(remaining, TimeUnit.MILLISECONDS);
            if (!b.equals(Boolean.TRUE)){
              return new Response().withProperty("exception", "trigger returned false");
            }
          } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return new Response().withProperty("exception", "trigger exception " + e.getMessage());
          }
        }
      }
    }
    return response;
  }
}
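
The non blocking level is not built in this post. As a rough idea of what "may be dropped if back pressure" could look like, here is a minimal sketch that assumes a thread pool with a bounded queue. The class NonBlockingTriggerSketch and its methods are hypothetical and are not part of Nibiru.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: run a trigger without blocking the request thread. */
class NonBlockingTriggerSketch {
  private final ThreadPoolExecutor executor;

  NonBlockingTriggerSketch(int threads, int queueSize) {
    // Bounded queue plus AbortPolicy: when the queue is full, new work is
    // rejected immediately instead of making the caller wait.
    executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(queueSize), new ThreadPoolExecutor.AbortPolicy());
  }

  /** Returns true if the trigger was accepted, false if it was dropped. */
  boolean fire(Runnable trigger) {
    try {
      executor.execute(trigger);
      return true;
    } catch (RejectedExecutionException e) {
      return false; // back pressure: drop the trigger, the user request already succeeded
    }
  }

  /** Stops accepting work and waits briefly for queued triggers to finish. */
  void shutdown() {
    executor.shutdown();
    try {
      executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

With one thread and a queue of one, a slow trigger occupies the thread, the next one waits in the queue, and a third call to fire returns false right away. The retrying level would need somewhere durable to park the rejected work, which this sketch does not attempt.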


The last piece is to call this code from the coordinator, only after a successful request.

    if (ColumnFamilyPersonality.PERSONALITY.equals(message.getPersonality())) {
      LocalAction action = new LocalColumnFamilyAction(message, keyspace, columnFamily);
      ResultMerger merger = new HighestTimestampResultMerger();
      Response response = eventualCoordinator.handleMessage(token, message, destinations,
              timeoutInMs, destinationLocal, action, merger, getHinterForMessage(message, columnFamily));
      // triggers only run when the user operation itself succeeded
      if (!response.containsKey("exception")){
        response = triggerManager.executeTriggers(message, response, keyspace, columnFamily, timeoutInMs, requestStart);
      }
      return response;
    }

Let's get testing

A typical use case for triggers is building a reverse index during an insert. For each insert to a column family named Pets we will check to see if the column name is "age". If the column name matches, we make another insert into another column family that organizes the data by age.

  public static class PetAgeReverseTrigger implements CoordinatorTrigger {
    public void exec(Message message, Response response, Server server) {
      String column = (String) message.getPayload().get("column");
      String value = (String) message.getPayload().get("value");
      String rowkey = (String) message.getPayload().get("rowkey");
      if ("age".equalsIgnoreCase(column)){
        // swap rowkey and value: the age becomes the row key, the pet name becomes the column
        Message m = new Message();
        m.setPayload( new Response().withProperty("type", "put")
                .withProperty("rowkey", value)
                .withProperty("column", rowkey)
                .withProperty("value", "")
                .withProperty("time", System.currentTimeMillis()));
        // the rest of the listing (dispatching m to the reverse index store) is not shown
      }
    }
  }
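
To picture what the trigger leaves behind, here is a minimal in-memory sketch with plain sorted maps standing in for the two column families. ReverseIndexSketch and its method names are made up for illustration and do not use the Nibiru API.

```java
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for the pets store and its age index. */
class ReverseIndexSketch {
  // rowkey -> (column -> value), like the pets column family
  final SortedMap<String, SortedMap<String, String>> pets = new TreeMap<>();
  // age -> (pet name -> ""), like the reverse index column family
  final SortedMap<String, SortedMap<String, String>> petsByAge = new TreeMap<>();

  void put(String rowkey, String column, String value) {
    pets.computeIfAbsent(rowkey, k -> new TreeMap<>()).put(column, value);
    // The "trigger": mirror age columns into the index, keyed by the age value
    if ("age".equalsIgnoreCase(column)) {
      petsByAge.computeIfAbsent(value, k -> new TreeMap<>()).put(rowkey, "");
    }
  }

  /** All pet names stored under the given age, in sorted order. */
  SortedMap<String, String> petsWithAge(String age) {
    return petsByAge.getOrDefault(age, new TreeMap<>());
  }
}
```

Putting rover/5, sandy/3 and spot/5 leaves rover and spot under "5". Note that this sketch never removes an old index entry when an age changes, so a real index would also need a delete on update.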

To test:

  1. Create a column family for the reverse index
  2. Add the trigger to the pets column family
  3. Insert some entries with age columns
  4. Verify the reverse index is now populated

  public void reverseIndexTrigger() throws ClientException{

MetaDataClient meta = new MetaDataClient(server.getConfiguration().getTransportHost(),
new Response().withProperty(StoreMetaData.IMPLEMENTING_CLASS,
DefaultColumnFamily.class.getName())); //1

TriggerDefinition td = new TriggerDefinition();
List<TriggerDefinition> defs = server.getKeyspaces().get(TestUtil.DATA_KEYSPACE).getStores()
defs.add(td); //2

ColumnFamilyClient client = new ColumnFamilyClient( new Client(server.getConfiguration().getTransportHost(),

Session s = client.createBuilder().withKeyspace(TestUtil.DATA_KEYSPACE).withStore(TestUtil.PETS_COLUMN_FAMILY).build();
s.put("rover", "age", "5", 1L);
s.put("sandy", "age", "3", 1L);
s.put("spot", "age", "5", 1L); //3

Session s1 = client.createBuilder().withKeyspace(TestUtil.DATA_KEYSPACE).withStore(PET_AGE_CF).build();
SortedMap<String,Val> res = s1.slice("5", "a", "zzzzzzzzzzzzzzzzz");
Assert.assertEquals(2, res.size());
Assert.assertEquals("rover", res.firstKey());
Assert.assertEquals("spot", res.lastKey()); //4



Triggers handle tasks without building logic into the client application. They can also optimize processes that would potentially involve multiple client server exchanges.

Monday Apr 13, 2015

Great NoSQL-isms of the 34th and a half century

So someone just sent me this page on Elasticsearch.

One of the best NoSql-isms is when someone tells you about some elaborate feature, next they tell you NOT to use it. EVER!

Here is an example:

The second workaround is to add ?search_type=dfs_query_then_fetch to your search requests. The dfs stands for Distributed Frequency Search, and it tells Elasticsearch to first retrieve the local IDF from each shard in order to calculate the global IDF across the whole index

Sounds great! Until you read the next advice:

Don’t use dfs_query_then_fetch in production. It really isn’t required. Just having enough data will ensure that your term frequencies are well distributed. There is no reason to add this extra DFS step to every query that you run.



Wednesday Apr 08, 2015

Cleanup compaction - Building a NoSQL store - Part 13

In our last blog we showed how we can have nodes dynamically join our cluster to achieve a web scale, Internet Of Things (Internet Of Things is now a buzzword I say once an hour), NoSql database. That blog had an incredible infographic that demonstrated what happens when a node joins our cluster. Here it is again in techNoSQLcolor:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1|

If you add a second node at position 50:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 | node2 node2 node2 node2 node2   |

Now, remember our data files are write once so we can't change them after the fact. After the split, requests that get sent to node1 are cut in half, but the data files on node1 contain more data than they need to. What we need is a way to remove all the data that is on a node that is no longer needed. 

Cassandra has a command for this called 'cleanup' that needs to be run on each node. The theory is that, in the olden days, a node join could go bad in some way and the system could be "recovered" by manually adjusting the tokens on each node and running various repair processes. In practice not many people (including myself) know exactly what to do when node joins go wrong: adjust tokens, move files, run repairs? The system SHOULD be able to automatically remove the old data, but no one has gotten to this yet as far as I can tell.

To handle cleanup we need two things:

  1. A command that can iterate the data files (SsTables) and remove data that no longer belongs on the node.
  2. A variable that allows normal compaction processes to clean up data automatically.

You may want to look back at our previous blog on compaction to get an idea of how we merge SsTables.

Let's get to it

We are going to enhance the compaction process to handle this special case. First, we have a boolean that controls cleanup. If the token does not belong on this node we do not write it during compaction.

      if (cleanOutOfRange){
        // during cleanup, only keep tokens that still route to this node
        if (coordinator.destinationsForToken(lowestToken, keyspace).contains(coordinator.getDestinationLocal())){
          newSsTable.write(lowestToken, allColumns);
        }
      } else {
        newSsTable.write(lowestToken, allColumns);
      }
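
Stripped of the SsTable plumbing, the filter amounts to something like the sketch below. It assumes rows live in a sorted map and that the ownership check is passed in as a predicate; CleanupSketch and ownedLocally are hypothetical names, not Nibiru classes.

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Predicate;

/** Hypothetical sketch of the cleanup filter, with a sorted map standing in for an SsTable. */
class CleanupSketch {
  static SortedMap<String, String> compact(SortedMap<String, String> oldTable,
      boolean cleanOutOfRange, Predicate<String> ownedLocally) {
    // Tables are write once, so we build a new one rather than editing the old one
    SortedMap<String, String> newTable = new TreeMap<>();
    for (Map.Entry<String, String> row : oldTable.entrySet()) {
      // Without cleanup every row is copied; with cleanup only locally owned rows survive
      if (!cleanOutOfRange || ownedLocally.test(row.getKey())) {
        newTable.put(row.getKey(), row.getValue());
      }
    }
    return newTable;
  }
}
```

The old table is left untouched, which matches the write once rule: the space is only reclaimed once the old file is deleted.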

Cleanup is simple: unlike normal compaction, we do not have to merge multiple tables together (although we could). One table in makes one table out.

  public void cleanupCompaction(Keyspace keyspace, DefaultColumnFamily defaultColumnFamily){
    Set<SsTable> tables = new TreeSet<>(defaultColumnFamily.getSstable()); //duplicate because we will mutate the collection
    for (SsTable table : tables){
      String newName = getNewSsTableName();
      try {
        SsTable [] ssArray = {table};
        // the "true" argument is cleanOutOfRange: drop tokens this node no longer owns
        SsTable s = compact(ssArray, newName, server.getServerId(), server.getCoordinator(), true, keyspace);
        //todo delete old
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

Testing time

For our coordinator, we made more of an integration test by launching a second server and joining it to the first. I typically like to be about 20% integration tests, 80% unit tests, and 0% mock tests. Why do I take this approach?

First, I believe mock tests are cheating. That is not to say that mocking does not have its uses, but I feel it is used to cover up code smells. If you have a good design and a good API, not much mocking should be needed. Integration tests are good at proving the entire process can run end-to-end, but they are long and redundant.

Unit tests do two important things for me: they test things, working like a tripwire for bad code and bad assumptions, and they document things. They document things because they show what components should do. They tell a story.

The story for cleanup is simple: A system has some data on disk. After topology changes (like node join or leave or change of replication factor) some of that data is no longer required to be on a given node and can be removed.

I wrote this test by hiding code in methods with friendly names that say what they are doing. It is a little cute I know but why not? We insert 10 rows directly to the server to start things off. When the test is done only 1 of the 10 rows should still be on disk.

public void cleanupTest() throws IOException, InterruptedException, ClientException {
  for (int i = 0; i < 10; i++) { // 10 rows, keys "0" through "9"
    server.put(TestUtil.DATA_KEYSPACE, TestUtil.PETS_COLUMN_FAMILY, i+"", "age", "4", 1);
  }

  forceFlushAndConfirmFilesOnDisk(server); //flush memtables to disk
  changeTheRouter(server); // change the routing information to simulate topology changes
  assertSomeDatum(server); // assert data is on disk
  runCleanup(server); // run cleanup
  assertDatumAfterCompaction(server); //assert some data has been removed
}

Rather than writing a long, involved integration test to move data off the node, we implement a router that routes token "1" locally and routes everything else nowhere! This way when we run cleanup, everything else should go. (No need for mocking libraries, just good old Object Oriented Design.)

  public static class OnlyTheBestRouter implements Router {
    public List<Destination> routesTo(ServerId local, Keyspace requestKeyspace,
            ClusterMembership clusterMembership, Token token) {
      if (token.getRowkey().equals("1")){
        Destination d = new Destination();
        return Arrays.asList(d);
      }
      return Arrays.asList(); // every other token routes nowhere
    }
  }

 This method installs the router in place of the default router which writes everything locally.

  private void changeTheRouter(Server s) throws ClientException{
MetaDataClient metaDataClient = new MetaDataClient(s.getConfiguration().getTransportHost(), s
new Response().withProperty(KeyspaceMetaData.ROUTER_CLASS, OnlyTheBestRouter.class.getName()), true);

This method runs the cleanup.

  private void runCleanup(Server s){
CompactionManager cm = ((CompactionManager) s.getPlugins().get(CompactionManager.MY_NAME));
cm.cleanupCompaction(s.getKeyspaces().get(TestUtil.DATA_KEYSPACE), (DefaultColumnFamily)

After the cleanup only columns for the row key/token "1" should be present and all others should be missing.

  private void assertDatumAfterCompaction(Server s){
    Assert.assertEquals(null, s.get(TestUtil.DATA_KEYSPACE, TestUtil.PETS_COLUMN_FAMILY, "3", "age")); //gone!
    String res = ((ColumnValue) s.get(TestUtil.DATA_KEYSPACE, TestUtil.PETS_COLUMN_FAMILY, "1", "age")).getValue(); //still there!
    Assert.assertEquals("4", res);
  }

Wrap up

There you have it: cleanup, the name says it all. Sexy and automatic!

Thursday Apr 02, 2015

Elastic node scale up - Building a NoSQL store - Part 12

The last major feature I added to Nibiru was adding Hinted Handoff, an optimization used to re-deliver lost messages and reduce entropy from some natural endpoints missing writes. I sat a while trying to decide what to do next...

I thought to myself, "Nibiru has code to route requests, and a CLI to read and write data, but is it web scale? After all anyone could just write a library that hashes data around!"

The answer was no and I decided it was time to change that. Let's make this biznotch auto-scale.


Every NoSQL has a different wrinkle on how it does things. The Dynamo style databases like Cassandra and Riak create a Token (by a hash) for the data and use that information to route requests. A previous blog demonstrated how Nibiru implements this type of Consistent Hashing using Request Routing.

Thus far we have implemented an Eventually Consistent, ColumnFamily store in Nibiru which does not use shared storage. This means that growing the cluster involves physically moving data between nodes in a way that no data is unavailable during the transition period.

How can we do this? If a token is calculated from user data with a hash, you might expect that changing the number of nodes results in almost all the data having to be moved around, like when a hash map is resized. But luckily we can avoid moving the majority of data by using something like consistent hashing. You can easily find literature on this and even cool interactive presentations, but I will explain it my way, with ASCII art and crude examples.

Things hash as I explained here:

10 MOD 7. Replication 3

But let's forget about MOD 7. Instead we hash something into a fixed space. The size of this hash space will never change. To keep it simple, assume the space is 0 (inclusive) to 100 (exclusive).

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100

If you have one node :

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1|

If you add a second node at position 50:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 | node2 node2 node2 node2 node2   |

Half of the data stays in place, and half needs to move to the new node. Let's add a third node at position 25:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node3 node3 node3 | node1 node1 node1 | node2 node2 node2 node2 node2  |

Node1 'gave away' some of its range, but node2 did not. The lesson is that each time we add nodes we split the hash space, we do not rehash.

This is how we scale: each new node reduces the burden on the rest of the nodes.
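
To put crude numbers on the diagrams above, here is a small sketch that counts how many of the 100 tokens change owner under range splitting versus plain MOD placement. RangeRingSketch is a hypothetical class for illustration and not how Nibiru's token router is written; it simply records which node owns the range that begins at each start position.

```java
import java.util.TreeMap;

/** Hypothetical sketch: a fixed hash space [0, 100) split into ranges owned by nodes. */
class RangeRingSketch {
  static final int SPACE = 100;
  // start of a range (inclusive) -> owning node; the range runs up to the next start
  private final TreeMap<Integer, String> starts = new TreeMap<>();

  /** Gives the range beginning at rangeStart to node. Some range must start at 0. */
  RangeRingSketch assign(int rangeStart, String node) {
    starts.put(rangeStart, node);
    return this;
  }

  String ownerOf(int token) {
    return starts.floorEntry(token).getValue();
  }

  /** Number of tokens whose owner differs between two layouts. */
  static int moved(RangeRingSketch before, RangeRingSketch after) {
    int count = 0;
    for (int token = 0; token < SPACE; token++) {
      if (!before.ownerOf(token).equals(after.ownerOf(token))) {
        count++;
      }
    }
    return count;
  }

  /** Same count when placement is simply token MOD nodeCount. */
  static int movedWithMod(int nodesBefore, int nodesAfter) {
    int count = 0;
    for (int token = 0; token < SPACE; token++) {
      if (token % nodesBefore != token % nodesAfter) {
        count++;
      }
    }
    return count;
  }
}
```

Building the three layouts from the diagrams (node1 alone; node1 and node2 split at 50; node3, node1 and node2 split at 25 and 50), moved reports 50 tokens for the first join and 25 for the second, while movedWithMod(2, 3) reports 66.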


Let's call the new node that is joining a PROTEGE. These are things to consider for a PROTEGE.

  1. DO NOT want to send reads to that node (because new data may still be moving to it)
  2. DO want it to receive new writes/deletes/updates for the section of data it will be responsible for

The protege is taking data from another node. Let's call that node the SPONSOR. For a sponsor:

  1. DO want to write locally until the PROTEGE is fully joined because if the join fails we do not want to lose data
  2. DO want to send portions of local data to the protege

(Another way to implement this would be to track separately the JOINING node(s). On the write path the system would be writing to (Replication Factor ) + 1 nodes but only reading from (REPLICATION FACTOR) nodes)
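
The protege and sponsor rules above boil down to a small routing decision. Here is a minimal sketch of it, covering only the keys in the range being handed over while the join is in flight. JoinRoutingSketch and its method names are hypothetical, not Nibiru classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of where reads and writes go while a node is joining. */
class JoinRoutingSketch {
  private final String sponsor;
  private String protege; // null when no join is in progress

  JoinRoutingSketch(String sponsor) {
    this.sponsor = sponsor;
  }

  void startJoin(String joiningNode) {
    protege = joiningNode;
  }

  void finishJoin() {
    protege = null;
  }

  /** Writes and deletes: the sponsor keeps writing locally and also forwards to the protege. */
  List<String> writeTargets() {
    List<String> targets = new ArrayList<>();
    targets.add(sponsor); // if the join fails, no data is lost
    if (protege != null) {
      targets.add(protege);
    }
    return targets;
  }

  /** Reads: never the protege, since older data may still be moving to it. */
  List<String> readTargets() {
    List<String> targets = new ArrayList<>();
    targets.add(sponsor);
    return targets;
  }
}
```

Once the join finishes and the token map is updated, normal routing takes over and the protege starts serving reads for its range.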


First we build an internode client. The protege uses it to initiate the join request. Basically the request says, "Hey node $[sponsor]! I notice you have a lot of data and I would like you to divide your data at $[token] and I will take over one part of that."

  public void join(String keyspace, String sponsorHost, String wantedToken){
    InternodeClient internodeClient = new InternodeClient(sponsorHost, configuration.getTransportPort());
    internodeClient.join(keyspace, sponsorHost, serverId, wantedToken, configuration.getTransportHost());
  }

That message gets transmitted to the sponsor and makes its way to a handler class. The way I implemented this we only handle a single protege at a time to keep the process sane. At a high level we need to:

  1. Atomically set the protege
  2. Start a thread that will
    1. replicate meta data (keyspaces and store definitions) to the new node
    2. replicate data (data inside keyspaces and store definitions) to the new node
    3. update the metadata so the cluster is aware of the new node
    4. remove the protege

  public Response handleSponsorRequest(final Message message){
    final String requestId = (String) message.getPayload().get("request_id");
    final String joinKeyspace = (String) message.getPayload().get("keyspace");
    final String wantedToken = (String) message.getPayload().get("wanted_token");
    final String protogeHost = (String) message.getPayload().get("transport_host");
    final Destination protegeDestination = new Destination();
    final MetaDataClient metaDataClient = getMetaClientForProtege(protegeDestination);

    // atomically claim the single protege slot; fail fast if a join is already running
    boolean res = protege.compareAndSet(null, protegeDestination);
    if (!res){
      return new Response().withProperty("status", "fail")
              .withProperty("reason", "already sponsoring");
    }

    Thread t = new Thread(){
      public void run(){
        InternodeClient protegeClient = new InternodeClient(protogeHost, server.getConfiguration().getTransportPort());
        Keyspace ks = server.getKeyspaces().get(joinKeyspace);
        replicateData(protegeClient, ks);
        updateTokenMap(ks, metaDataClient, wantedToken, requestId);
        try {
          Thread.sleep(10000); //wait here for propagations
        } catch (InterruptedException e) { }
        protege.compareAndSet(protegeDestination, null);
      }
    };
    t.start(); // the join runs in the background while we answer the protege
    return new Response().withProperty("status", "ok");
  }

So easy right? JK this process was a beast to write. It was only after I had it all done that I made it look purdy like this.

Let's dive into the piece that replicates the data. We need to move data from the sponsor to the protege. To do this we:

  1. Flush memtables to sstables (memtables and sstables are described here)
  2. For each Store in keyspace
    1. For each SStable in store
      1. Copy table

  private void replicateData(InternodeClient protegeClient, Keyspace ks){
    for (Entry<String, Store> storeEntry : ks.getStores().entrySet()){
      if (storeEntry.getValue() instanceof DefaultColumnFamily){
        DefaultColumnFamily d = (DefaultColumnFamily) storeEntry.getValue();
        String bulkUid = UUID.randomUUID().toString();
        for (SsTable table : d.getSstable()){
          protegeClient.createSsTable(ks.getKeyspaceMetaData().getName(), d.getStoreMetadata().getName(), bulkUid);
          try {
            // stream every token and its raw columns (tombstones included) to the protege
            SsTableStreamReader stream = table.getStreamReader();
            Token token = null;
            while ((token = stream.getNextToken()) != null){
              SortedMap<AtomKey,AtomValue> columns = stream.readColumns();
              protegeClient.transmit(ks.getKeyspaceMetaData().getName(), storeEntry.getKey(), token, columns, bulkUid);
            }
          } catch (IOException e) {
            throw new RuntimeException (e);
          }
          protegeClient.closeSsTable(ks.getKeyspaceMetaData().getName(), d.getStoreMetadata().getName(), bulkUid);
        }
      }
    }
  }

The protegeClient is a low level way to transmit SsTables. It is fairly interesting why we need this. In a log-structured merge system with write once tables, deletes are actually a special write called a tombstone. Tombstones mask other columns. No user facing API (like get or slice) can return a tombstone, so we needed a lower level API to get at the SsTable files.
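
A toy example shows why copying through the user facing API would lose deletes. The sketch below assumes a tombstone is just a cell with no value and that the highest timestamp wins on merge; TombstoneSketch and Cell are hypothetical names, not Nibiru's AtomKey/AtomValue types.

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical sketch of tombstones in write once tables. */
class TombstoneSketch {
  static final class Cell {
    final String value; // null marks a tombstone
    final long time;

    Cell(String value, long time) {
      this.value = value;
      this.time = time;
    }

    boolean isTombstone() {
      return value == null;
    }
  }

  /** Low level merge: keeps tombstones, highest timestamp wins per column. */
  static SortedMap<String, Cell> merge(SortedMap<String, Cell> a, SortedMap<String, Cell> b) {
    SortedMap<String, Cell> out = new TreeMap<>(a);
    for (Map.Entry<String, Cell> e : b.entrySet()) {
      Cell existing = out.get(e.getKey());
      if (existing == null || e.getValue().time > existing.time) {
        out.put(e.getKey(), e.getValue());
      }
    }
    return out;
  }

  /** User facing read: a tombstone looks exactly like "no such column". */
  static String get(SortedMap<String, Cell> table, String column) {
    Cell c = table.get(column);
    return (c == null || c.isTombstone()) ? null : c.value;
  }
}
```

If the protege already holds age=5 at time 1 and the sponsor holds a tombstone for age at time 2, a transfer built on get has nothing to send and the deleted value stays readable on the protege. Sending the raw cells lets the tombstone win the merge.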

We could have gone even lower and simply moved bytes (save that for another day) but this interface also made an attractive bulk load API. WIN! WIN!

The next piece is called after the join is complete. Once we have moved all the data to this new node, we add this node into the token map and send that update around. (*Note assuming one node added at once and no racing meta-data changes. )

  private void updateTokenMap(Keyspace ks, MetaDataClient metaDataClient, String wantedToken, String requestId){
ObjectMapper om = new ObjectMapper();
TreeMap<String,String> t = om.convertValue(ks.getKeyspaceMetaData().getProperties().get(TokenRouter.TOKEN_MAP_KEY),
t.put(wantedToken, requestId);
Map<String,Object> send = ks.getKeyspaceMetaData().getProperties();
send.put(TokenRouter.TOKEN_MAP_KEY, t);
try {
metaDataClient.createOrUpdateKeyspace(ks.getKeyspaceMetaData().getName(), send, true);
} catch (ClientException e) {
throw new RuntimeException(e);

Great! So this covers getting the already existing data to the new node. But what about the new mutations (writes and deletes) that happen while the join is going on? This is handled by a small addition in our normal write path. If we have a protege and the write is going to us, we also send it to the protege!

    if (sponsorCoordinator.getProtege() != null && destinations.contains(destinationLocal)){
String type = (String) message.getPayload().get("type");
if (type.equals("put") || type.equals("delete") ){

This all seems bad ass. Are you sure this actually works?

I am sure there are some kinks to work out :), but yes it can be demonstrated in testing.

What we did here is:

  1. Start a single node
  2. Insert 10 rows
  3. Join a second node
  4. Insert some more rows
  5. Check that data is being divided across nodes

ColumnFamilyClient c = new ColumnFamilyClient(servers[0].getConfiguration().getTransportHost(), servers[0]
Session session = c.createBuilder().withKeyspace("abc")
.withWriteConsistency(ConsistencyLevel.ALL, new HashMap())
.withReadConsistency(ConsistencyLevel.ALL, new HashMap())
for (int k = 0; k < 10; k++) {
session.put(k+"", k+"", k+"", 1);

Assert.assertEquals(servers[0].getClusterMembership().getLiveMembers().size(), 1);

servers[1].join("abc", "", "5");



private void insertDataOverClient(Session session) throws ClientException {
session.put("1", "1", "after", 8);
session.put("7", "7", "after", 8);
session.put("11", "11", "after", 8);

private void assertDataIsDistributed(Server [] servers){
Assert.assertEquals("after", ((ColumnValue) servers[1].get("abc", "def", "11" , "11")).getValue());
Assert.assertEquals("after", ((ColumnValue) servers[0].get("abc", "def", "1" , "1")).getValue());

Assert.assertEquals("after", ((ColumnValue) servers[1].get("abc", "def", "11" , "11")).getValue());
Assert.assertEquals("after", ((ColumnValue) servers[1].get("abc", "def", "1" , "1")).getValue());


Whew!  There you go! Web scale! Elastic! Auto Resharding NoSQL action!

Tuesday Mar 17, 2015

Nibiru has a CLI!

Ow yea!

connect connect 7070
createcolumnfamily data pets
describekeyspace data
use data pets
set jack type bunny
get jack type
Val [value=bunny, time=1426649553036000, createTime=1426649553120, ttl=0]

Ow yea!

Saturday Mar 14, 2015

On my fling with the ASF and Cassandra

Nothing good lasts forever you know. At one point I may have been the #1 Apache Cassandra fanboy, but like many ASF projects they basically reach the point where they stop being a meritocracy and start becoming a bureaucracy.

A while back I tried to implement this idea in Cassandra that was not even a super complicated one. Do server side code. Like many times in Apache Software open source, even if you are 100% willing to do ALL the work and TESTING to see something get done, a crew of people manages to block your ideas. You can read the ticket, read the responses, and form your own opinion.

Well, as usual it takes people a while to come around to an idea. For the ASF this concept is basically: once they think it is an idea they came up with... they love it.

Well there you go. Here is another classic Cassandra-ism for you:

In memory column families...https://issues.apache.org/jira/browse/CASSANDRA-1657

Closed: Later


Great idea! So good we will make everyone pay for it as a secret sauce closed source feature!

Thursday Mar 12, 2015

Battling entropy with Hinted Handoff - Building a NoSQL store - Part 11

It has been nearly a month since my last NoSQL blog. :( Sorry about that. I actually spent some time working on my stream processing platform teknek. And yes in my free time I work on my own stream processing platform and my own nosql, and no, there is no support group I know of that can help me.

My last blog covered eventual consistency and CAP, so it makes sense to move on to entropy resolution. CAP is typically described as a tradeoff between the three features Consistency, Availability, and Partition Tolerance. Eventually Consistent systems favor A. If you remember, we demonstrated a case where we wrote data at different Consistency Levels and we observed what is returned to a client. Let's describe a situation where entropy happens:

Imagine we are writing at Consistency Level ONE. The coordinator needs only wait for 1 ACK before returning success to the client:

client -> serverX --->|
          server3     |
          server4 <---|
          server5 <---|
          server6 <---|

This is the normal case:

client -> serverX <--(merge results) <-|
          server3                      |
          server4 -------------------->|
          server5 -------------------->|
          server6 -------------------->|

But imagine this happens:

client -> serverX <--(merge results) <-|
          server3                      |
          server4 -------------------->|
          server5 (brain fart)        ?|
          server6 -------------------->|

The system now has entropy, because we do not know if server5 died permanently, or if someone tripped over the Ethernet cable in the data center and is rushing to plug it back in. The write could have completed and the server died before the reply or not.

One way to combat entropy is to read from multiple places like reading from 2 nodes, a quorum of nodes, or ALL the nodes that have the data. Reads tend to be expensive and reading from multiple places multiplies the expense.


Another way to deal with failed write/delete operations is to store a Hint (commonly called Hinted Handoff). Remember the column with the highest timestamp wins, so if writes are delivered late or out of order, the final result is the one with the highest timestamp. To implement Hints we need to track which writes fail or time out and put them in a place where they can be redelivered later. You can almost think of Hints as a queue of delayed writes.
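
The "queue of delayed writes" idea fits in a few lines. This sketch assumes an in-memory queue and a replica that applies writes with highest-timestamp-wins; HintSketch, Replica and Write are hypothetical names and are not the Hinter built later in this post.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch: hints as a queue of writes that could not be delivered. */
class HintSketch {
  static final class Write {
    final String key;
    final String value;
    final long time;

    Write(String key, String value, long time) {
      this.key = key;
      this.value = value;
      this.time = time;
    }
  }

  static final class Replica {
    final Map<String, Write> data = new HashMap<>();
    boolean up = true;

    /** Returns false if the replica is down; otherwise keeps the newest write per key. */
    boolean apply(Write w) {
      if (!up) {
        return false;
      }
      Write current = data.get(w.key);
      if (current == null || w.time > current.time) {
        data.put(w.key, w);
      }
      return true;
    }
  }

  private final Queue<Write> hints = new ArrayDeque<>();

  /** Try the write; if the replica does not take it, remember it as a hint. */
  void write(Replica replica, Write w) {
    if (!replica.apply(w)) {
      hints.add(w);
    }
  }

  /** Redeliver stored hints once; anything still undeliverable goes back on the queue. */
  int replay(Replica replica) {
    int delivered = 0;
    for (int i = hints.size(); i > 0; i--) {
      Write w = hints.poll();
      if (replica.apply(w)) {
        delivered++;
      } else {
        hints.add(w);
      }
    }
    return delivered;
  }

  int pendingHints() {
    return hints.size();
  }
}
```

Because the replica compares timestamps, replaying an old hint after a newer write has already landed is harmless: the newer value stays.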

I have refactored the EventualCoordinator slightly from the last blog. The key aspects are the same. We have an ExecutorCompletionService completionService that has futures handling writes to networked nodes.

    else if (c.getLevel() == ConsistencyLevel.N) {
      Response response = handleN(start, deadline, completionService, destinations, merger, message, c); //1
      if (hinter != null) {
        maybeSendHints(remote, remoteFutures, start, deadline, hinter); //2
      }
      return response;
    }

This part is going to bend your mind a bit. For replication THREE we would have THREE futures, each one handling the write operation on a different server in the cluster. Let's assume the user has specified Consistency N as ONE. This means that the method handleN will return as soon as 1 future is complete. The other futures may also have returned or they may still be in process. We should not consider them failed and create a hint yet, because we should give them up to the normal timeout window to complete. We do not want to block this coordinator thread anymore because the consistency level is met.
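
Here is a stripped down sketch of the "return on the first ACK, leave the rest running" idea using ExecutorCompletionService from the JDK. FirstAckSketch and firstAck are hypothetical names; the real handleN also merges results and counts up to N, which this sketch skips.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: Consistency Level ONE on top of a completion service. */
class FirstAckSketch {
  /**
   * Submits every write and returns the result of whichever finishes first.
   * All futures are added to the futures list so the caller can check later
   * which ones never completed and need a hint.
   */
  static String firstAck(ExecutorService pool, List<Callable<String>> writes,
      List<Future<String>> futures, long timeoutMs) {
    CompletionService<String> completion = new ExecutorCompletionService<>(pool);
    for (Callable<String> write : writes) {
      futures.add(completion.submit(write));
    }
    try {
      // poll hands back the next finished future, or null if none finish in time
      Future<String> first = completion.poll(timeoutMs, TimeUnit.MILLISECONDS);
      if (first == null) {
        return null;
      }
      // If the first finisher failed, get() throws; a real coordinator would keep polling
      return first.get();
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    }
  }
}
```

After firstAck returns, the slower futures are still sitting in the list, which is exactly the state maybeSendHints has to deal with.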

Since we do not want to block the client request anymore we need to handle the events still pending. We made an executor service 'lastChance' for this. RemoteMessageCallable is an abstraction I made that tracks the original message, which destination it was sent to, and provides a field so we can tell if the future logically finished all its work.

Again, we do not want maybeSendHints to block, because we have to return to the client.

private ExecutorService lastChance;

private void maybeSendHints(List<RemoteMessageCallable> remotes,
    List<Future<Response>> remoteFutures, long start, long deadline, final Hinter hinter) {
  for (int i = 0; i < remotes.size(); i++) {
    final RemoteMessageCallable remote = remotes.get(i);
    final Future<Response> future = remoteFutures.get(i);
    // The future is done but the call must have thrown an exception: we need a hint here.
    if (!remote.isComplete() && future.isDone()) {
      hinter.hint(remote.getMessage(), remote.getDestination());
    }
    if (!remote.isComplete() && !future.isDone()) {
      final long remaining = deadline - System.currentTimeMillis();
      // Not complete and no exception, but time is up: we need a hint here.
      if (remaining <= 0) {
        hinter.hint(remote.getMessage(), remote.getDestination());
      } else {
        // Not complete, no exception, and not past the deadline either:
        // put the request on the lastChance executor so we do not block.
        lastChance.submit(new Callable<Void>() {
          public Void call() throws Exception {
            // Here we block, then hint on failure :)
            try {
              future.get(remaining, TimeUnit.MILLISECONDS);
            } catch (TimeoutException | ExecutionException e) {
              // timed out or failed: fall through to the completeness check
            }
            if (!remote.isComplete()) {
              hinter.hint(remote.getMessage(), remote.getDestination());
            }
            return null;
          }
        });
      }
    }
  }
}

Whew! When I write something like that I never know if I am the smartest guy in the world or if it is a fork bomb. I think it is l33t hacking... The next pieces of the puzzle are a little easier to grok.


The first thing we will do is automatically create a column family to store our hints.

private void addSystemKeyspace(){
    Keyspace system = new Keyspace(configuration);
    KeyspaceMetaData ksmd = new KeyspaceMetaData();
    ksmd.setPartitioner(new NaturalPartitioner());
    ksmd.setRouter(new LocalRouter());
    Map<String,Object> properties = new HashMap<>();
    properties.put("implementing_class", DefaultColumnFamily.class.getName());
    system.createColumnFamily("hints", properties);
    server.getKeyspaces().put("system", system);
}

We had writes that did not complete, and what we want to do is put the message on a queue to be delivered later. We use destination as a row key, make the column a uuid, and store the message as JSON text in the value.
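The layout described above (destination as row key, a UUID as the column, the message as the value) can be mimicked with ordinary maps. This is only a sketch under stated assumptions: InMemoryHints and its methods are hypothetical names, the message is passed in as an already serialized JSON string, and nothing here is persisted the way a real column family would be.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;

/** Hypothetical in-memory stand-in for the hints column family. */
class InMemoryHints {

  // row key (destination id) -> sorted columns (uuid -> message as JSON text)
  private final Map<String, SortedMap<String, String>> rows = new HashMap<>();

  /** Stores one hint for the destination and returns the generated column name. */
  synchronized String add(String destinationId, String messageJson) {
    String column = UUID.randomUUID().toString();
    rows.computeIfAbsent(destinationId, k -> new TreeMap<String, String>())
        .put(column, messageJson);
    return column;
  }

  /** "Give me all the hints for server X": returns a copy of that row. */
  synchronized SortedMap<String, String> hintsFor(String destinationId) {
    SortedMap<String, String> row = rows.get(destinationId);
    return row == null ? new TreeMap<String, String>() : new TreeMap<String, String>(row);
  }

  /** Removes a hint once it has been delivered. */
  synchronized void delete(String destinationId, String column) {
    SortedMap<String, String> row = rows.get(destinationId);
    if (row != null) {
      row.remove(column);
    }
  }
}
```

Because every hint for one destination lives in the same row, replaying is a single row read followed by one delete per delivered column.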

public class Hinter {

  private final AtomicLong hintsAdded = new AtomicLong();
  private final ColumnFamilyPersonality hintsColumnFamily;
  private ObjectMapper OM = new ObjectMapper();

  public Hinter(ColumnFamilyPersonality person){
    this.hintsColumnFamily = person;
  }

  public void hint(Message m, Destination destination) {
    try {
      // row key = destination, column = random UUID, value = the message as JSON
      hintsColumnFamily.put(destination.getDestinationId(), UUID.randomUUID().toString(),
          OM.writeValueAsString(m), System.currentTimeMillis() * 1000L);
    } catch (IOException e) {
      // a hint that fails to serialize or store is silently dropped
    }
  }
}


This allows us to ask the data store "give me all the hints for server X". (This can be modeled in other ways.) It shows how simple, flexible, and elegant the ColumnFamily model is.

Hint Replayer

I just happened to read a post that suggested that class names ending in 'er' are bad ideas. Well fuck that noise! jk. You may remember that we have built pluggable cluster membership into Nibiru. ClusterMembership allows us to know if the other nodes in the cluster are up or down. We will implement the HintReplayer as a thread that:

  1. Scans the list of cluster members that are ALIVE.
  2. For each ALIVE host, checks whether we have any Hints for that node.
  3. If there are hints, delivers them and then deletes each hint.

public class HintReplayer extends AbstractPlugin implements Runnable {

  ObjectMapper om = new ObjectMapper();

  public static final String MY_NAME = "hint_replayer";
  private volatile boolean goOn = true;
  private ColumnFamilyPersonality hintCf;
  // counter read by the test through getHintsDelivered()
  private final AtomicLong hintsDelivered = new AtomicLong();

  public HintReplayer(Server server) {
    super(server); // assumes AbstractPlugin keeps the server reference
  }

  public String getName() {
    return MY_NAME;
  }

  public void init() {
    hintCf = Coordinator.getHintCf(server);
    runThread = new Thread(this);
  }

  public long getHintsDelivered() {
    return hintsDelivered.get();
  }

  public void run() {
    while (goOn){
      List<ClusterMember> live = server.getClusterMembership().getLiveMembers();//1
      for (ClusterMember member : live){
        SortedMap<AtomKey,AtomValue> results = hintCf.slice(member.getId(), "", "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"); //2
        if (results.size() > 0){
          for (Entry<AtomKey, AtomValue> i : results.entrySet()){
            if (i.getValue() instanceof ColumnValue){
              ColumnValue v = (ColumnValue) i.getValue();
              ColumnKey c = (ColumnKey) i.getKey();
              writeAndDelete(member, c, v);
            }
          }
        }
      }
      try {
        Thread.sleep(1000); // pause between scans; the interval is an assumption
      } catch (InterruptedException e) { }
    }
  }

  private void writeAndDelete(ClusterMember m, ColumnKey c, ColumnValue v){
    // deliver to the member the hint was stored for
    Destination d = new Destination(m.getId());
    Client client = this.clientForDestination(d);
    try {
      client.post(om.readValue(v.getValue(), Message.class));
      // only delete the hint after the write was posted successfully
      hintCf.delete(m.getId(), c.getColumn(), System.currentTimeMillis() * 1000);
      hintsDelivered.incrementAndGet();
    } catch (IOException | RuntimeException e) {
      try {
        Thread.sleep(10); // brief back-off before the next hint; the interval is an assumption
      } catch (InterruptedException e1) {
      }
    }
  }
}

We already have a test that demonstrates how the coordinator works. Testing this code is basic. We shut down some servers and do a write. The node that receives the write should store hints for the down nodes; when those nodes come back alive, the hints should be re-delivered.

    //insert when some nodes are down
failIfSomeAreDown(s, clAll);
for (int i=0;i<2;i++){
//start up all nodes
for (int i = 0; i < s.length; i++) {
s[i] = new Server(cs[i]);
//get the hint replayer from the first node.
HintReplayer h = (HintReplayer) s[0].getPlugins().get(HintReplayer.MY_NAME);
for (int tries = 0; tries < 10; tries++) {
//wait for the delivered counter to get to 3
long x = h.getHintsDelivered();
if (x == 3) {
Assert.assertEquals(3, h.getHintsDelivered());
//Now go to the servers and ensure the new value has been delivered
int found = 0;
for (int i = 0; i < s.length; i++) {
ColumnValue cv = (ColumnValue) s[i].get("abc", "def", "a", "b");
if (cv != null && cv.getValue().equals("d")){
Assert.assertEquals(3, found);


Hints help us ensure writes and deletes do not get lost during random outages. They make the system more self-healing, and in CAP terms they can help you have a 'little more C', even though hinted handoff does not provide any guarantee beyond what the Consistency Level already gives you.

What should Nibiru do next? Hit me up on twitter and let me know what NoSQL feature I should try to build next!


Saturday Jan 31, 2015

Request coordination, eventual consistency, and CAP - Building a NoSQL store - Part 10

In the last blog we mentioned the CAP theorem, but now it is time to dive in. Distributed systems face challenges that non-distributed systems do not. If data is replicated, the system needs some notion of where the data is, how it is replicated for redundancy, how it is kept in sync, and what operations are available to the user.

"Eventual consistency" was the "Affordable Care Act" of the Dynamo Style (Cassandra) vs Strongly Consistent (HBase) debate. So embattled was the term "Eventual consistency" that "Tunable consistency" was coined. Fast forward some time and Cassandra now supports "atomic compare and swap" and HBase has support for "sometimes relaxed consistency". ROFL. That, as they say, is politics ... I mean... vendor driven open source...

By the way, in case you think I am a drama queen/conspiracy theorist and surely there is no quasi-political system attempting to defame eventual consistency, I did some searches for reference links to put in this article. As a result I have fallen into an anti-eventual consistency ad targeting campaign...

In any case, a point I will constantly beat to death is that Nibiru aims to be pluggable-everything. A lofty goal is to support all (possible) consistency models. Can it be done? I don't know, but it is worth trying I guess. I do not have all the answers. That being said, a system can not be "half-atomic" or "kinda-consistent", but per Column Family/Table we should be able to choose the consistency model that fits what we want. For example, one store "pets" is an eventually consistent ColumnFamily store while another store "credit_cards" is configured as a strongly consistent key value store.

Cassandra's implementation of CAP is to provide a per-request Consistency Level. The consistency level allows the user to say which guarantees they want for that request. For example:

WRITE.ALL: The write is acknowledged (durable) by ALL Natural Endpoints (all replicas of the data) before a result is returned to the client.
READ.ALL: The read request is sent to ALL Natural Endpoints; the results are compared and the highest timestamp wins. (Other logic handles timestamp ties.)

WRITE.ONE: The request is sent to all Natural Endpoints; after one acknowledged (durable) write completes the result is returned, while the other writes still continue on the server.
READ.ONE: The request is sent to one (or more) Natural Endpoints, and the first result is returned to the client.

There are other Consistency Levels including QUORUM, LOCAL_ONE, SERIAL (for compare and swap), etc., but we will save those for another day.
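One way to think about these levels is as "how many acknowledgements must the coordinator collect before it answers the client". The sketch below is an illustration only: RequiredAcks and its Level enum are hypothetical and are not Nibiru's ConsistencyLevel. QUORUM is included just to show the usual majority formula, floor(RF / 2) + 1.

```java
/** Hypothetical mapping from a consistency level to the number of acks to wait for. */
class RequiredAcks {

  enum Level { ONE, QUORUM, ALL }

  /** Number of replica acknowledgements needed at the given level. */
  static int requiredAcks(Level level, int replicationFactor) {
    if (replicationFactor < 1) {
      throw new IllegalArgumentException("replication factor must be at least 1");
    }
    switch (level) {
      case ONE:
        return 1;
      case QUORUM:
        return replicationFactor / 2 + 1; // integer division gives the floor
      case ALL:
        return replicationFactor;
      default:
        throw new IllegalArgumentException("unknown level " + level);
    }
  }
}
```

With a replication factor of 3 this gives 1 ack for ONE, 2 for QUORUM and 3 for ALL, which is why a single dead replica fails an ALL request but not a ONE request.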

As our last blog showed, a router can return one or more Destination(s)/Natural Endpoints for a given row key. Given the destinations and the consistency level, we can build different strategies (code paths) for the desired results.

Let's take a case that is easy to explain, WRITE.ALL. A client sends a write operation for Column Family pets:
{ rowkey: rover , column: type, value: poodle }

The node that receives this request is called the coordinator. The coordinator computes the Natural Endpoints (one of which could be local) where the data lives. For replication factor N, N futures will be created, and each future will forward the request. Since the request is at consistency ALL, we want to wait for all the futures to finish before a result is returned to the client. We also have an upper limit on how long we wish to wait for the futures before we consider the request timed out.

This process looks something like this:

client -> serverX --->|
          server3     |
          server4 <---|
          server5 <---|
          server6 <---|

client -> serverX <--(merge results) <-|
          server3                      |
          server4 -------------------->|
          server5 -------------------->|
          server6 -------------------->|

I took the approach of separating the EventualCoordinator (the thing managing the futures) from the Merger (the logic merging the results). I did this because HighestTimestampWins does not make sense for a key value store which has no timestamp.

Let's do this

The coordinator does a few things. First, the request could have a local component. The LocalAction class contains the logic to physically carry out the operation locally. The merger is a class that produces one result from multiple responses: N nodes will return N results, but only a single result is returned to the client. Again, we define the LocalAction and the Merger here because the EventualCoordinator is message/response agnostic.

  public Response handle(Message message) {
Token token = keyspace.getKeyspaceMetadata().getPartitioner().partition((String) message.getPayload().get("rowkey"));
List<Destination> destinations = keyspace.getKeyspaceMetadata().getRouter()
.routesTo(message, server.getServerId(), keyspace, server.getClusterMembership(), token);
long timeoutInMs = determineTimeout(columnFamily, message);

if (ColumnFamilyPersonality.COLUMN_FAMILY_PERSONALITY.equals(message.getRequestPersonality())) {
LocalAction action = new LocalColumnFamilyAction(message, keyspace, columnFamily);
ResultMerger merger = new HighestTimestampResultMerger();
return eventualCoordinator.handleMessage(token, message, destinations,
timeoutInMs, destinationLocal, action, merger);


So let's dive into the eventual coordinator. If there is only one destination and that destination is here, we do the action and return the results.

  public Response handleMessage(Token token, final Message message, List<Destination> destinations,
      long timeoutInMs, Destination destinationLocal, final LocalAction action, ResultMerger merger) {
    if (destinations.size() == 1 && destinations.contains(destinationLocal)) {
      return action.handleReqest();
    }

This next part prevents 1 request from turning into 3, turning into 9, ... turning into doom. Basically we tag messages with "reroute" so we know whether it was a client or a server that sent this message.

    if (message.getPayload().containsKey("reroute")) {
      return action.handleReqest();
    }
    if (!message.getPayload().containsKey("reroute")) {
      message.getPayload().put("reroute", "");
    }
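The tagging idea can be shown in isolation. This is only a sketch: RerouteGuard and alreadyForwarded are made-up names, and the payload is assumed to be a plain mutable Map like the one used above.

```java
import java.util.Map;

/** Hypothetical guard that stops a forwarded request from being forwarded again. */
class RerouteGuard {

  static final String REROUTE = "reroute";

  /**
   * Returns true if a coordinator already forwarded this payload, in which case
   * the receiver should act locally only. Otherwise tags the payload so that the
   * nodes it is sent to next will see it as forwarded, and returns false.
   */
  static boolean alreadyForwarded(Map<String, Object> payload) {
    if (payload.containsKey(REROUTE)) {
      return true;
    }
    payload.put(REROUTE, "");
    return false;
  }
}
```

The first node to see a client request gets false and fans the tagged message out; every replica that receives the tagged copy gets true and does not fan out again, so the request count stays at the replication factor.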

I'm back Doc. I'm back from the future!

Java's ExecutorCompletionService is pretty nifty. It reminds me of what ThreadGroup does for threads. For each Destination the data is either local or remote so we have a callable to deal with both cases. LocalActionCallable wraps the LocalAction and the RemoteMessageCallable forwards the request over the network.

    ExecutorCompletionService<Response> completionService = new ExecutorCompletionService<>(executor);
    final ArrayBlockingQueue<Response> results = new ArrayBlockingQueue<Response>(destinations.size());
    ArrayBlockingQueue<Future<Response>> futures = new ArrayBlockingQueue<Future<Response>>(destinations.size());
    for (final Destination destination : destinations) {
      Future<Response> f = null;
      if (destination.equals(destinationLocal)) {
        // the data lives on this node: run the action locally
        f = completionService.submit(new LocalActionCallable(results, action));
      } else {
        // the data lives elsewhere: forward the request over the network
        f = completionService.submit(new RemoteMessageCallable(results, clientForDestination(destination), message));
      }
    }

After the futures have been submitted we want to wait for some amount of time for all to return. If we hit the deadline we fail, if we do not get all results before the deadline we fail, and if one future fails we fail.

    long start = System.currentTimeMillis();
    long deadline = start + timeoutInMs;
    List<Response> responses = new ArrayList<>();
    if (c.getLevel() == ConsistencyLevel.ALL) {
      while (start <= deadline) {
        Response r = null;
        try {
          // poll returns null if no future completes before the deadline
          Future<Response> future = completionService.poll(deadline - start, TimeUnit.MILLISECONDS);
          if (future != null) {
            r = future.get();
          }
          if (r != null){
            responses.add(r);
          }
        } catch (InterruptedException | ExecutionException e) {
          // r stays null and is reported as a timeout below
        }
        if (r == null){
          return new Response().withProperty("exception", "coordinator timeout");
        }
        if (r.containsKey("exception")){
          return r;
        }
        if (responses.size() == destinations.size()){
          //success condition
          break;
        }
        start = System.currentTimeMillis();
      }
      if (responses.size() == destinations.size()){
        return merger.merge(responses, message);
      } else {
        return new Response().withProperty("exception", "coordinator timeout");
      }
    }

Note that at level ALL two successes and one failure is still a failure. Write operations do not have anything fancy to merge, but reads do. Here is the highest-timestamp-wins merge logic. Every time I write the "find the highest" algorithm I am reminded of my first CS class in Fortran. Thanks Professor D! (But we ain't Fortran-in no more)

private static ObjectMapper OM = new ObjectMapper();

public Response merge(List<Response> responses, Message message) {
  if ("put".equals(message.getPayload().get("type"))
      || "delete".equals(message.getPayload().get("type"))) {
    return new Response();
  } else if ("get".equals(message.getPayload().get("type"))) {
    return highestTimestampResponse(responses);
  } else {
    return new Response().withProperty("exception", "unsupported operation " + message);
  }
}

private Response highestTimestampResponse(List<Response> responses){
  long highestTimestamp = Long.MIN_VALUE;
  int highestIndex = Integer.MIN_VALUE;
  for (int i = 0; i < responses.size(); i++) {
    Val v = OM.convertValue(responses.get(i).get("payload"), Val.class);
    if (v.getTime() > highestTimestamp) {
      highestTimestamp = v.getTime();
      highestIndex = i;
    }
  }
  return responses.get(highestIndex);
}



This code is taken from the unit test here. (I should be doing perma-links I am so lazy.) We should be able to test two things:

  1. If all nodes are up, operations at ALL should pass
  2. If at least one node is down, operations at ALL should fail
    ColumnFamilyClient cf = new ColumnFamilyClient(s[0].getConfiguration().getTransportHost(), s[0]
Session clAll = cf.createBuilder().withKeyspace("abc").withColumnFamily("def")
.withWriteConsistency(ConsistencyLevel.ALL, new HashMap())
.withReadConsistency(ConsistencyLevel.ALL, new HashMap()).build();
passIfAllAreUp(s, clAll);
failIfSomeAreDown(s, clAll);

public void passIfAllAreUp(Server [] s, Session sb) throws ClientException {
  Response r = sb.put("a", "b", "c", 1);
  int found = 0;
  for (int i = 0; i < s.length; i++) {
    ColumnFamilyPersonality c = (ColumnFamilyPersonality) s[i].getKeyspaces().get("abc").getColumnFamilies().get("def");
    Val v = c.get("a", "b");
    if (v != null){
      //assert data is in three places
      Assert.assertEquals("c", c.get("a", "b").getValue());
      found++;
    }
  }
  Assert.assertEquals(3, found);
  Assert.assertEquals("c", sb.get("a", "b").getValue());
}


public void failIfSomeAreDown(Server [] s, Session sb) throws ClientException {
for (int i = 2; i < s.length; i++) {
try {
sb.put("a", "b", "c", 1);

} catch (ClientException ex){
Assert.assertTrue(ex.getMessage().equals("coordinator timeout"));


Sweet! Dude! There you have it! Eventual Consistency, I mean Tunable Consistency, I mean Sometimes Relaxed Consistency :) Basically from here implementing other consistency levels is adding different case logic and pulling results from the ExecutorCompletionService. What is coming next? Hinted Handoff I think :) Stay tuned!

Wednesday Jan 28, 2015

The curious case of the never joining node.

[ecapriolo@production-cassandra-54-233 ~]$ /opt/cassandra/bin/nodetool -h status
Note: Ownership information does not include topology; for complete information, specify a keyspace
Datacenter: datacenter1
|/ State=Normal/Leaving/Joining/Moving
--  Address      Load       Tokens  Owns   Host ID                               Rack
UN  47.47 GB   256     10.4%  5a635595-1cc9-4b43-8e6b-12ef4639c1db  rack1
UN   42.12 GB   256     8.9%   bca09da7-85a9-476b-9a60-1b6a0973f375  rack1
UN   44.71 GB   256     10.0%  06b6a87e-7c3e-42ae-be28-20efe8920615  rack1
UN   48.01 GB   256     9.9%   d5746748-457e-4821-8f3d-bdac850cfcd3  rack1
UN  45.47 GB   256     10.3%  1d3bdec7-6f0b-4bc9-9329-72cd41cf5934  rack1
UJ  29.35 GB   256     ?      e973ef19-bd06-4a2a-bff1-661742b5ca14  rack1
UN   43.39 GB   256     11.3%  ea95e65d-8365-402c-aab9-3658fe5cc69f  rack1
UN  46.3 GB    256     10.7%  007c3169-dde4-4d81-acb9-3bbeda76723a  rack1
UN  48.15 GB   256     9.6%   0ed47a3d-cf70-4e08-99a3-3fdbe5ea4c72  rack1
UN   48.39 GB   256     9.4%   ca0144ac-948e-49ce-a3b9-857827c3cf5b  rack1
UN   43.54 GB   256     9.6%   59d23ac8-50ab-45dc-9ddc-d7f2825e1169  rack1

[ecapriolo@production-cassandra-54-233 ~]$ /opt/cassandra/bin/nodetool -h netstats
Bootstrap d4ab87c0-a6ff-11e4-82bf-6f05f2e5b99d
Read Repair Statistics:
Attempted: 0
Mismatch (Blocking): 0
Mismatch (Background): 0
Pool Name                    Active   Pending      Completed
Commands                        n/a         0              0
Responses                       n/a         0        4248573

Can this be expla

[ecapriolo@production-cassandra-54-233 ~]$ /opt/cassandra/bin/nodetool -h join
This node has already joined the ring.

Yea you could have fucking fooled me.

[ecapriolo@production-cassandra-54-233 ~]$ /opt/cassandra/bin/nodetool -h gossipinfo

Monday Jan 26, 2015



Good: Good to have a storage format that matches what the database does.

C* has come a long way over the past few years, and unfortunately our storage format hasn't kept pace with the data models we are now encouraging people to utilise.

Bad! Why people encouraged to utilize database wrong?

And historically, "columnar oltp sql data store" has not really been a thing.

Bad/Confused? is Cassandra a columnar oltp data store? is it trying to become one?

I see it as specialization for special cases.




  • deal with potential use cases where maybe the current sstables are not the best fit
  • allow several types of internal storage formats (at the same time) optimized for different data types

I understand the "wouldn't it be cool" factor but let's be clear: Cassandra is not a research-paper creation engine. Pluggability here improves generality at the cost of a substantial increase of implementation complexity, QA work, and cognitive burden on users. This is a bad direction for the project.

Bad: Basically it got hand waved, the concept of making a special format or letting the database be pluggable in any way. Got shot down even though people were interested.

That all being said.

Cassandra has evolved/is evolving into a quasi-columnar, kinda fixed schema database built around a custom query language. As such it only makes sense that it has a quasi-columnar, kinda fixed schema friendly sstable format (not saying that jokingly).

Also if it works just as well in the general case, that is good as well. Assuming it does not take 4 major versions to become stable, and in the process cause multiple heart aches, along the lines of broken streaming, lost data shit.

Request Routing - Building a NoSQL Store - Part 9

This blog series is trucking along. What fun! I decided to switch the titles from 'Building a Column Family store' to 'Building a NoSQL store' because we are going far beyond demonstrating a Column Family store, since a Column Family is only an on-disk representation of data.

In our last blog we used a gossiper for cluster membership and showed how to replicate meta-data requests across the cluster. Now, we will focus on implementing pluggable request routing. Unless you have been living under a rock you are probably aware of a few things like the CAP theorem and Dynamo Style consistent hashing. Stepping back a bit, a goal of Nibiru is to support pluggable everything, and request routing is no exception. For example, a user may wish for a system where the node they send the operation to is the node where the data is stored, they may wish for a system like Cassandra where data is stored on multiple cluster nodes using consistent hashing, or something else entirely.

To make things pluggable we start with a router interface.

public interface Router {

  /**
   * Determine which hosts a message can be sent to. (in the future keyspace should hold a node list)
   * @param message the message sent from the user
   * @param local the uuid of this server
   * @param requestKeyspace the keyspace/database of the request
   * @param clusterMembership an interface for live/dead cluster nodes
   * @param token a token (typically a hash) computed from the rowkey of the message
   * @return all hosts a given request can be routed to
   */
  public List<Destination> routesTo(Message message, ServerId local, Keyspace requestKeyspace, ClusterMembership clusterMembership, Token token);

}


Wow! This method has a lot of arguments! Haha. Some of the parameters (clusterMembership and ServerId) probably could have been constructor injected, and token and message are redundant (because you can determine one from the other), but for now using a pure interface seemed attractive (since everyone hates on abstract classes these days). Not all routers need each of these things.

I mentioned two cases. Let's start with the easy case: store and read data locally only.

public class LocalRouter implements Router{

  public List<Destination> routesTo(Message message, ServerId local, Keyspace requestKeyspace,
      ClusterMembership clusterMembership, Token token) {
    Destination d = new Destination();
    return Arrays.asList(d);
  }

}


This is fairly easy to reason about. For any request the answer is found here! This is basically a do-it-yourself (outside of nibiru) sharding. It also makes sense for a single node deployment.

Now it gets interesting

The dynamo style request routing is more involved. Even though it has been explained many times I will offer my own quick description.

At first do not think ring, instead think of hashing a key into a flat fixed sized array.

10 MOD 7

If you have replication you would also write to the slots that follow. Imagine we had replication 3.

10 MOD 7. Replication 3

What happens if the data hashes to the last element? Think of the array as a ring and the replication will wrap-around.

6 MOD 7. Replication 3
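The flat-array picture can be written down in a few lines. This is only a sketch of the arithmetic above, not how Nibiru routes requests; ModRing and slotsFor are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: place a hash into a fixed number of slots with replication. */
class ModRing {

  /** Returns the slot the hash lands in, followed by the next replication - 1 slots. */
  static List<Integer> slotsFor(int hash, int slots, int replication) {
    List<Integer> result = new ArrayList<>();
    int first = Math.floorMod(hash, slots); // floorMod keeps negative hashes in range
    for (int i = 0; i < replication; i++) {
      result.add((first + i) % slots); // the modulo is what makes the array a ring
    }
    return result;
  }
}
```

slotsFor(10, 7, 3) gives [3, 4, 5], and slotsFor(6, 7, 3) gives [6, 0, 1]: the last slot wraps around to the start of the array.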

Implementing this is slightly different. We do not use array indexes. Instead we consider that each node has a token that represents its position in the ring.

The token router needs to be aware of two facts: a Replication Factor (RF) that describes how many copies of the data to keep, and a token map that describes the splits.

1. Find the first destination in the sorted map
2. While we have fewer destinations than the replication factor, add the next destination (wrap around if need be)
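Stripped of the Nibiru types, the two steps can be tried out with nothing but a java.util.TreeMap. This is a sketch under stated assumptions: TokenWalk and replicasFor are hypothetical names, tokens are plain strings compared in natural order, and each node owns exactly one token.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-alone version of the token ring walk. */
class TokenWalk {

  /** ring maps token to node id; returns the rf nodes responsible for the token. */
  static List<String> replicasFor(TreeMap<String, String> ring, String token, int rf) {
    if (rf < 1 || rf > ring.size()) {
      throw new IllegalArgumentException("replication factor must be between 1 and the ring size");
    }
    List<String> owners = new ArrayList<>();
    // Step 1: the first node whose token is >= the request token, wrapping to the start
    Map.Entry<String, String> entry = ring.ceilingEntry(token);
    if (entry == null) {
      entry = ring.firstEntry();
    }
    owners.add(entry.getValue());
    // Step 2: keep walking clockwise until we have rf owners
    while (owners.size() < rf) {
      entry = ring.higherEntry(entry.getKey());
      if (entry == null) {
        entry = ring.firstEntry();
      }
      owners.add(entry.getValue());
    }
    return owners;
  }
}
```

With a ring of c -> id1, h -> id2, r -> id3 and a replication factor of 2, the token "i" maps to [id3, id1] and the token "z" wraps around to [id1, id2].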

public class TokenRouter implements Router {

public static final String TOKEN_MAP_KEY = "token_map";
public static final String REPLICATION_FACTOR = "replication_factor";

public List<Destination> routesTo(Message message, ServerId local, Keyspace requestKeyspace,
ClusterMembership clusterMembership, Token token) {
//TODO this is not efficient we should cache this or refactor
Map<String, String> tokenMap1 = (Map<String, String>) requestKeyspace
TreeMap<String,String> tokenMap = new TreeMap<String,String>(tokenMap1);
Integer replicationFactor = (Integer) requestKeyspace
int rf = 1;
if (replicationFactor != null){
rf = replicationFactor;
if (rf > tokenMap.size()) {
throw new IllegalArgumentException(
"Replication factor specified was larger than token map size");
List<Destination> destinations = new ArrayList<Destination>();

Map.Entry<String,String> ceilingEntry;
ceilingEntry = tokenMap.ceilingEntry(token.getToken());//1
if (ceilingEntry == null){
ceilingEntry = tokenMap.firstEntry();//1

destinations.add(new Destination(ceilingEntry.getValue()));
for (int i = 1; i < rf; i++) {
ceilingEntry = tokenMap.higherEntry(ceilingEntry.getKey());//2
if (ceilingEntry == null) {
ceilingEntry = tokenMap.firstEntry();//2
destinations.add(new Destination(ceilingEntry.getValue()));
return destinations;

If that code is making your head buzz, the unit test may explain it better. One of the really great parts of Nibiru is that the decision to make everything pluggable makes testing and developing easier.

We can mock the cluster membership to provide a 3 node cluster that is always 'alive'.

  private ClusterMembership threeLiveNodes(){
    ClusterMembership mock = new ClusterMembership(null, null) {
      public void init() {}
      public void shutdown() {}
      public List<ClusterMember> getLiveMembers() {
        return Arrays.asList( new ClusterMember("", 2000, 1, "id1"),
            new ClusterMember("", 2000, 1, "id2"),
            new ClusterMember("", 2000, 1, "id3"));
      }
      public List<ClusterMember> getDeadMembers() { return null; }};
    return mock;
  }

Next we give each node a single token.

  public static TreeMap<String,String> threeNodeRing(){
    TreeMap<String,String> tokenMap = new TreeMap<>();
    tokenMap.put("c", "id1");
    tokenMap.put("h", "id2");
    tokenMap.put("r", "id3");
    return tokenMap;
  }

Now we can throw some data at it and see how the router routes it. Notice here that the "z" key "wraps" around to the first node.

public void test(){
TokenRouter token = new TokenRouter();
Keyspace keyspace = new Keyspace(new Configuration());
KeyspaceMetaData meta = new KeyspaceMetaData();
Map<String,Object> props = new HashMap<>();
props.put(TokenRouter.TOKEN_MAP_KEY, threeNodeRing());
Partitioner p = new NaturalPartitioner();
Assert.assertEquals("id1", token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("a")).get(0).getDestinationId());
Assert.assertEquals("id1", token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("aa")).get(0).getDestinationId());
Assert.assertEquals("id2", token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("h")).get(0).getDestinationId());
Assert.assertEquals("id3", token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("i")).get(0).getDestinationId());
Assert.assertEquals("id1", token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("z")).get(0).getDestinationId());

Great success! Now try with a replication factor of 2!

public void testWithReplicationFactor(){
TokenRouter token = new TokenRouter();
Keyspace keyspace = new Keyspace(new Configuration());
KeyspaceMetaData meta = new KeyspaceMetaData();
Map<String,Object> props = new HashMap<>();
props.put(TokenRouter.TOKEN_MAP_KEY, threeNodeRing());
props.put(TokenRouter.REPLICATION_FACTOR, 2);
Partitioner p = new NaturalPartitioner();
Assert.assertEquals(Arrays.asList(new Destination("id1"), new Destination("id2")),
token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("a")));
Assert.assertEquals(Arrays.asList(new Destination("id1"), new Destination("id2")),
token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("aa")));
Assert.assertEquals(Arrays.asList(new Destination("id2"), new Destination("id3")),
token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("h")));
Assert.assertEquals(Arrays.asList(new Destination("id3"), new Destination("id1")),
token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("i")));
Assert.assertEquals(Arrays.asList(new Destination("id1"), new Destination("id2")),
token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("z")));

The API correctly returns two "natural endpoints" for the given token. Great Success!

Sweet! So I do not want to pat myself on the back much, but I have to take this opportunity to say I love the way Nibiru is shaping up. When I presented the idea of a completely pluggable NoSQL store to a friend he did voice a concern that having 'pluggable everything' could cause the system to have 'lots more bugs' because of all the potential use cases. I actually do not see it that way. I think that forcing as many components as possible into interfaces and APIs is making the process cleaner by having fewer use-case specific hacks and optimizations.

This is important if you think about it. Remember that both HBase and Cassandra are data stores that were basically 'thrown away' by their creators, Powerset and Facebook respectively. (Yes they were both thrown away, abandoned and open sourced). IMHO both of them suffer from being built on specific assumptions for specific use cases. (The "X is good and Y is bad because we value X most" type of rhetoric.)

The next blog is going to show some already working code about coordinating requests against N nodes and building some of the pluggable consistency on the read/write path. Futures, executors, and multi node failures coming, fun!


Monday Jan 19, 2015

Auto Clustering and gossip - Building a Column Family store - Part 8

Up until this point we have been building a single node NoSQL implementation. Though a NoSQL store can by definition be a single node system, oftentimes a NoSQL data store is a multi-node solution. Building a multiple node system has many challenges that a single node system does not. The designers need to consider how the CAP Theorem applies to the API they are attempting to implement. A goal of Nibiru is to create a pluggable NoSQL system built around APIs. Thus we can think of Cluster Membership as something pluggable.

Plug-able Cluster Membership

I have commonly seen a few systems for cluster membership:

  1. Static: Cluster membership is configuration based and there is no node discovery (e.g. zookeeper static list of peers)
  2. Master: Nodes register themselves with an entity (e.g. HBase region servers register with a master)
  3. Peer-to-Peer: Nodes can auto-discover each other typically with one or more known contact points (e.g. Cassandra/Riak)
  4. active-passive: At any given time a single node is the master and typically there is an automatic/manual failover

Other components in the NoSQL stack will use the cluster information to route requests and place data. Let's shell out a base class for a ClusterMembership:

public abstract class ClusterMembership {

  protected Configuration configuration;
  protected ServerId serverId;

  public ClusterMembership(Configuration configuration, ServerId serverId){
    this.configuration = configuration;
    this.serverId = serverId;
  }

  public abstract void init();
  public abstract void shutdown();
  public abstract List<ClusterMember> getLiveMembers();
  public abstract List<ClusterMember> getDeadMembers();
}

public class ClusterMember {
  private String host;
  private int port;
  private long heatbeat;
  private String id;
}
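To make the plug-able idea concrete, here is a minimal self-contained sketch of option #1 (static membership). This is not Nibiru code: `Member`, `Membership`, `StaticMembership`, and the "host:port" peer format are hypothetical stand-ins I made up so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for ClusterMember: just a host and a port. */
final class Member {
  final String host;
  final int port;
  Member(String host, int port) { this.host = host; this.port = port; }
  @Override public String toString() { return host + ":" + port; }
}

/** Hypothetical stand-in for the ClusterMembership contract. */
abstract class Membership {
  abstract void init();
  abstract void shutdown();
  abstract List<Member> getLiveMembers();
  abstract List<Member> getDeadMembers();
}

/** Static membership: peers come straight from configuration, no discovery. */
final class StaticMembership extends Membership {
  private final List<String> configuredPeers; // assumed format: "host:port"
  private List<Member> live = Collections.emptyList();

  StaticMembership(List<String> configuredPeers) {
    this.configuredPeers = configuredPeers;
  }

  @Override void init() {
    List<Member> parsed = new ArrayList<>();
    for (String peer : configuredPeers) {
      int colon = peer.lastIndexOf(':');
      parsed.add(new Member(peer.substring(0, colon),
          Integer.parseInt(peer.substring(colon + 1))));
    }
    live = Collections.unmodifiableList(parsed);
  }

  @Override void shutdown() { live = Collections.emptyList(); }

  // With no failure detection, every configured peer is assumed to be alive.
  @Override List<Member> getLiveMembers() { return live; }
  @Override List<Member> getDeadMembers() { return Collections.emptyList(); }
}

class StaticMembershipDemo {
  public static void main(String[] args) {
    Membership m = new StaticMembership(Arrays.asList("node-a:2000", "node-b:2000"));
    m.init();
    System.out.println(m.getLiveMembers());
  }
}
```

The rest of the stack only ever sees `getLiveMembers()` and `getDeadMembers()`, so swapping this for a gossip-based implementation should not require touching the callers.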

Because I am from the Dynamo school of hard knocks, I decided to start off with method #3 and forked off a gossip implementation I found. Each nibiru node will create a node-id (uuid) on start up, get a seed-list from the configuration, and then attempt to join the cluster.

public class GossipClusterMembership extends ClusterMembership{

  public static final String HOSTS = "gossip_hosts_list";
  public static final String PORT = "gossip_port";

  private GossipService gossipService;

  public GossipClusterMembership(Configuration configuration, ServerId serverId) {
    super(configuration, serverId);
  }

  public void init() {
    ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
    GossipSettings settings = new GossipSettings();
    List<String> hosts = null;
    if (configuration.getClusterMembershipProperties() != null){
      hosts = (List<String>) configuration.getClusterMembershipProperties().get(HOSTS);
    } else {
      hosts = new ArrayList<String>();
    }
    int port = 2000;
    for (String host: hosts){
      GossipMember g = new RemoteGossipMember(host, port, "");
      startupMembers.add(g); // each configured seed becomes a startup member
    }
    try {
      gossipService = new GossipService(configuration.getTransportHost(), port, serverId.getU().toString(), LogLevel.DEBUG, startupMembers, settings);
    } catch (UnknownHostException | InterruptedException e) {
      throw new RuntimeException(e);
    }
    // ...
  }
  // ...
}
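The gossip library does the heavy lifting here, so the membership logic itself is hidden. As a rough illustration of the general idea only (not the forked library's actual algorithm), here is a toy sketch: every node keeps the highest heartbeat it has seen per node id, merges in views received from peers, and treats anyone who has not been refreshed within a timeout as dead. `GossipView` and all of its methods are names I invented for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy gossip view: node id -> highest heartbeat seen, plus when we saw it. */
final class GossipView {
  private final Map<String, Long> heartbeats = new HashMap<>();
  private final Map<String, Long> lastSeenMillis = new HashMap<>();
  private final long deadAfterMillis;

  GossipView(long deadAfterMillis) { this.deadAfterMillis = deadAfterMillis; }

  /** Record one (id, heartbeat) pair; only a newer heartbeat refreshes the entry. */
  void observe(String nodeId, long heartbeat, long nowMillis) {
    Long known = heartbeats.get(nodeId);
    if (known == null || heartbeat > known) {
      heartbeats.put(nodeId, heartbeat);
      lastSeenMillis.put(nodeId, nowMillis);
    }
  }

  /** Merge a peer's whole view into ours, as would happen on a gossip round. */
  void merge(Map<String, Long> remoteHeartbeats, long nowMillis) {
    for (Map.Entry<String, Long> e : remoteHeartbeats.entrySet()) {
      observe(e.getKey(), e.getValue(), nowMillis);
    }
  }

  /** Copy of our view, suitable for sending to a peer. */
  Map<String, Long> snapshot() { return new HashMap<>(heartbeats); }

  List<String> liveMembers(long nowMillis) { return select(nowMillis, true); }
  List<String> deadMembers(long nowMillis) { return select(nowMillis, false); }

  private List<String> select(long nowMillis, boolean wantLive) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, Long> e : lastSeenMillis.entrySet()) {
      boolean live = nowMillis - e.getValue() <= deadAfterMillis;
      if (live == wantLive) out.add(e.getKey());
    }
    return out;
  }
}

class GossipViewDemo {
  public static void main(String[] args) {
    GossipView seed = new GossipView(5000);
    GossipView joiner = new GossipView(5000);
    seed.observe("node-1", 1, 0);
    seed.observe("node-2", 1, 0);
    joiner.observe("node-3", 1, 0);
    // The joiner contacts the seed and the two exchange views.
    joiner.merge(seed.snapshot(), 100);
    seed.merge(joiner.snapshot(), 100);
    System.out.println("seed knows " + seed.liveMembers(200));
  }
}
```

Time is passed in explicitly so the behavior is easy to test; a real implementation would use a clock and a background thread.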

Now we should be able to write a test and prove that N instances of Nibiru can locate each other via gossip. We use one known address as the seed-list, and the other two nodes should discover each other.

public void letNodesDiscoverEachOther() throws InterruptedException, ClientException{
  Server [] s = new Server[3];
  { // each node gets its own scope so conf and clusterProperties can be redeclared
    Configuration conf = TestUtil.aBasicConfiguration(node1Folder);
    Map<String,Object> clusterProperties = new HashMap<>();
    clusterProperties.put(GossipClusterMembership.HOSTS, Arrays.asList(""));
    s[0] = new Server(conf);
  }
  {
    Configuration conf = TestUtil.aBasicConfiguration(node2Folder);
    Map<String,Object> clusterProperties = new HashMap<>();
    clusterProperties.put(GossipClusterMembership.HOSTS, Arrays.asList(""));
    s[1] = new Server(conf);
  }
  {
    Configuration conf = TestUtil.aBasicConfiguration(node3Folder);
    Map<String,Object> clusterProperties = new HashMap<>();
    clusterProperties.put(GossipClusterMembership.HOSTS, Arrays.asList(""));
    s[2] = new Server(conf);
  }
  for (Server server : s){
    // ...
  }
  Assert.assertEquals(2 , s[2].getClusterMembership().getLiveMembers().size());
  Assert.assertEquals("", s[2].getClusterMembership().getLiveMembers().get(0).getHost());
}

Winning! Now nodes can discover each other! The next question is what message types should be sent over gossip and which should be sent via some other method. I decided to keep the gossip code for cluster membership only.

The first thing I decided to tackle was meta-data operations. This makes sense because without tables or column families, we can neither write to nor read from tables or column families. Duh! The approach I took was:

If a node gets a meta-data request, it re-broadcasts it out to all live hosts.

This is not a perfect solution because we need something to re-broadcast messages to down hosts (and ordering could be an issue down the line) but for now it gets the point across. We can potentially use something like hinted-handoff to deal with this. The piece that is handling message routing is known as the coordinator.

  public Response handle(Message message) {
    if (SYSTEM_KEYSPACE.equals(message.getKeyspace())) {
      return metaDataCoordinator.handleSystemMessage(message);
    }
    //other stuff
  }

What we are doing here is: if the message does not have "reroute" in its payload then broadcast it to all. Otherwise we handle it locally. (By broadcast I mean send N unicast messages not a tcp broadcast or multicast)

  public Response handleSystemMessage(final Message message){
if (MetaPersonality.CREATE_OR_UPDATE_KEYSPACE.equals(message.getPayload().get("type"))){
(Map<String,Object>) message.getPayload().get("properties"));
if (!message.getPayload().containsKey("reroute")){
message.getPayload().put("reroute", "");
List<Callable<Void>> calls = new ArrayList<>();
for (ClusterMember clusterMember : clusterMembership.getLiveMembers()){
final MetaDataClient c = clientForClusterMember(clusterMember);
Callable<Void> call = new Callable<Void>(){
public Void call() throws Exception {
(Map<String,Object>) message.getPayload().get("properties")

return null;
try {
List<Future<Void>> res = metaExecutor.invokeAll(calls, 10, TimeUnit.SECONDS);
//todo return results to client
} catch (InterruptedException e) {

return new Response();
} else ...

Obviously this implementation needs better error handling and potentially longer timeouts. We test this by taking our last test and sending out a meta-data request. Within a few milliseconds the other nodes should have received and processed that message. In other words, create a keyspace and column family, then check each node to ensure this is complete on all hosts.

MetaDataClient c = new MetaDataClient("", s[1].getConfiguration().getTransportPort());
c.createOrUpdateKeyspace("abc", new HashMap<String,Object>());

for (Server server : s){
  // ...
}
Map<String,Object> stuff = new HashMap<String,Object>();
stuff.put(ColumnFamilyMetaData.IMPLEMENTING_CLASS, DefaultColumnFamily.class.getName());
c.createOrUpdateColumnFamily("abc", "def", stuff);
for (Server server : s){
  // ...
}
for (Server server : s){
  // ...
}
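Stepping back to the coordinator for a moment, the re-broadcast rule (apply locally, fan out once, never re-forward a message that already carries "reroute") can be sketched in isolation. This is a toy with in-memory "nodes" rather than Nibiru's real transport: `ToyNode`, its `keyspaces` set, and the "keyspace" payload key are assumptions I made for the example. Only `ExecutorService.invokeAll` with a timeout mirrors what the coordinator code does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-memory node that stores keyspace names and forwards requests. */
final class ToyNode {
  static final String REROUTE = "reroute";
  final Set<String> keyspaces = ConcurrentHashMap.newKeySet();
  private final List<ToyNode> peers = new ArrayList<>();
  private final ExecutorService executor;

  ToyNode(ExecutorService executor) { this.executor = executor; }

  void addPeer(ToyNode peer) { peers.add(peer); }

  /** Apply locally, then fan out unless this copy was already forwarded. Returns peers reached. */
  int handle(Map<String, Object> payload) {
    keyspaces.add((String) payload.get("keyspace"));
    if (payload.containsKey(REROUTE)) {
      return 0; // forwarded copy: stop here, otherwise nodes would echo forever
    }
    final Map<String, Object> forwarded = new HashMap<>(payload);
    forwarded.put(REROUTE, "");
    List<Callable<Void>> calls = new ArrayList<>();
    for (final ToyNode peer : peers) {
      calls.add(new Callable<Void>() {
        @Override public Void call() {
          peer.handle(forwarded); // N unicast sends, not a network broadcast
          return null;
        }
      });
    }
    int delivered = 0;
    try {
      // invokeAll blocks until every call is done or the timeout cancels stragglers.
      List<Future<Void>> results = executor.invokeAll(calls, 10, TimeUnit.SECONDS);
      for (Future<Void> f : results) {
        try {
          f.get();
          delivered++;
        } catch (CancellationException | ExecutionException e) {
          // timed out or failed; a real system would retry or store a hint
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return delivered;
  }
}

class RebroadcastDemo {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    ToyNode a = new ToyNode(pool);
    ToyNode b = new ToyNode(pool);
    ToyNode c = new ToyNode(pool);
    a.addPeer(b);
    a.addPeer(c);
    Map<String, Object> msg = new HashMap<>();
    msg.put("keyspace", "abc");
    int delivered = a.handle(msg);
    System.out.println(delivered + " peers reached, b has abc: " + b.keyspaces.contains("abc"));
    pool.shutdown();
  }
}
```

Counting the futures that completed normally is one way to tackle the "return results to client" todo: the caller at least learns how many hosts acknowledged the change.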

Great. Now that we have cluster membership we can start tackling the really - really - really fun stuff like request routing and quorum reads etc! Stay tuned!

Saturday Jan 03, 2015

Impala vs Hive - un Buzzifying big data

Everyone loves fast queries, no denying that. Everyone loves charts that say things are fast. I had some time to sit down and play with Impala for the past couple weeks and wanted to give my initial impressions on the debate from the seat of someone who uses Hive, MySql, and !Vertica! in my daily life.

Pre-blog rando notes

I had just installed CDH 5.3 in our 3 node staging sandbox. Believe it or not, I spend most of my time on very modest sized hardware clusters, 3 nodes or 10 nodes. These days I have read some vendor documentation that "suggests" the "standard" Hadoop node is now the i2.8xlarge, the computer that is $2476.16 monthly and $3.3920 hourly (on a one year contract), with 200+ GB of RAM. Well, guys like me without deep pockets routinely use m1.xlarge, which is $163.52 monthly and $0.2240 hourly.

For hadoop, I sometimes go with m1.xlarge, so I can get 4x 400 GB disks without paying for crazy enterprise SSDs or 250GB RAM. I guess this makes me the oddball. This could lead me into an amazon rant, about lack of machines with nice local storage options but I will not go there.

Impala round 1: Kick the tires, they kick back

So anyway impala. I click a few buttons in Cloudera Manager and it is installed. Great. That was easy button. I go on a hadoop node and run impala-shell.

My old tables are there. New tables I am making do not appear. I spend some time and my co-worker finds a command  "invalidate metadata" which I run every so often. (maybe there is some way I would not need to do this, but not interested in searching ATM)

I query our main table.

impala > "select * from weblogs limit 4"
impala "Sorry I dont know what com.huffingtonpost.Serde is"

Ok fair. I expected this. I know impala is written in C/C++ and it is not going to be able to understand every format, especially ones written only in Java (although it can bridge Java UDFs). But this is an important thing, you know: the BUZZ says "use X, it's fast and awesome!" but unless you are starting fresh, compatibility and upgrading are things that you have to take seriously. Once you have a large infrastructure built around X you can not flap around and react to every new architecture or piece of software out there. You want to be getting things done, not rebuilding your stack every day.

Impala Round 2: Best laid plans

So ok I get it. I cant expect impala to understand our proprietary serde/input format. The good news is that Impala and Hive are not enemies. At best they are frenemies :).

Now the great thing about hive is I can transmute things between different formats: json, xml, parquet, orc, and text. So this query will just make a plain text table.

hive > create table impala_test1 as select * from weblogs
hive [doing stuff]
hive [doing stuff]
impala > invalidate metadata
impala > select * from impala_test1
impala "Sorry I dont like list<string>"

So what happened here is I converted to text files, but Impala does not understand Hive's complex types yet... Damn... Well, I see this on the impala roadmap, but if you are keeping score in the meaningless x vs y debate, Impala supports only a subset of Hive's data formats and only a subset of its features. Hive's collection types are very powerful and missing them hurts. It would be better if impala just ignored these fields instead of blowing up and refusing to query the entire table.

Ok, so let's deal with this.

Impala Round 3: I am willing to accept less than the table I had before

Let's do a query and strip out all the collection columns. Again, it would be better if I did not have to have N copies of data in different formats here, but for the purposes of kicking the tires I will just make a duplicate table in a different format without the columns impala does not understand.

hive > create table impala2 as select col1,col2 from weblogs
impala> select * from impala2 limit 4
impala> result1 , result2, result3

Great success! So now I can run queries and I was really impressed with performance of some operations.

Impala round 4: Ok lets get real

We have to do a lot of top-n queries. Top n is a big PITA, especially over arbitrary windows, because you can not easily pre-compute. We also have a lot of active entries (millions).

We went through a process and moved over some summary tables that looked like this:

entryid, viralcount, seedcount, othercount, otherstuff2count
3434252, 4, 2, 4,7
3412132, 10, 3, 4, 2

In a nutshell this is a PITA query we run:

select entryid, sum(viralcount) as x, sum(seedcount) from [data] where ( join conditions ) group by entryid order by x

impala > [above query] where [one day time range]
impala > fast, excitingly fast, results

Great, let's try the actual query on the full data set (11GB of data).

impala > [above query] where [30 day time range]
impala > out of memory

Impala Round 5: Superstitious tuning.

So when things run out of memory I add memory. The setting was 256MB. I tried 1GB. I queried again. It failed again.

I set the memory to no limit, and it worked.
I added the tables to hdfs cache.
Well that is good it worked and it was fast really fast.

Unfortunately, I don't love what this means for the "yarn" world, because if impala is sucking up as much memory as it feels like, I am not sure how happy everything else on my hadoop cluster is going to be. Maybe some magic setting like 4GB would have worked, but I don't love having to constantly review memory thresholds of software, so I set it to unlimited.

It was also a bit frustrating to me because the table was only 11GB and the intermediate results of that query are actually very small (one row per article).

Who knows how it works and why? Whatevers.

Impala Round 6: Bring out tez

So as you know impala is a cloudera "thing" and tez is a Hortonworks "thing". One of the reasons I upgraded was that Tez support is included in Hive 0.13.1, which should be part of CDH 5.3.

Now apparently tez is supposed to deliver some serious hive performance gains. I went looking for a chart of tez vs impala for you. Instead I found a chart about how IBM's Big SQL beats them both. See what I said about VS debates? If all you care about is charts, you always get beat by someone.


hive > set hive.execution.engine = tez
 class not found tez/whatever

Argh. You gotta be crapping me. Today was a frustrating day. Apparently Tez is missing some jars in CDH, in typical accidentally / potentially not accidentally / [I don't care] fashion. Likely fallout from the next Hadoop enterprise pissing match.

This really grinds my gears...

Final thoughts

If you thought I was going to sit here and tell you how great X is and how bad Y is and show you a chart validating my opinions, well, I did not do that.

I see a lot of potential for impala, but it is not close to the features of hive. Hive and impala actually work well together. Hive is simply more pluggable and versatile. Hive is not as fast on the read side, but for your level 1 data ingestion and aggregation building Hive is still the bomb. Once you have sliced and diced with the Hive "Hammer", impala makes a very nice "Scalpel". It is really nice that Hive and Impala share the same metastore, and can read and write each other's data (parquet) as well.


Friday Jan 02, 2015

Bloom Filters - Building a Column Family store - Part 7

In the last blog in this series we built a Commitlog for our quickly evolving NoSQL/Column Family store nibiru. I decided this morning that I had been dodging BloomFilters for a while and it was time to get to it.

You may ask what Bloom Filters are and why a Column Family store might need/want them. If you read my post on SSTables and Compaction, you can see that we are implementing a Log-Structured Merge system. Namely, we are not doing updates in place, and instead we periodically merge files. If you imagine data in these files over time, you can picture a scenario where there are 30 or more SSTables on disk. Now, imagine that only 1 of those 30 files has the rowkey being searched for. To find that rowkey we would have to search all 30 of the SSTables.

A Bloom Filter (described here in detail) could potentially help us avoid searching tables. This is because a Bloom Filter can give two answers: 'Absolutely NO' or 'Maybe'. (Answers I used to get when I was asking someone out on a date!)

'Absolutely No' in this case means we do not have to bother searching, because we are sure the rowkey is not in this table.

'Maybe' means we do have to search and maybe the row key is in the table or maybe it is not. (We would have searched it anyway without the Bloom Filter so no big loss)

Because of these properties, Bloom filters make a very nice option for a negative-cache. (For those interested, the pull request with the majority of the feature is here; I did make other subtle changes along the way.) So let's have at it and implement this bad boy.
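If the 'Absolutely NO' versus 'Maybe' behavior feels like magic, a toy version makes it obvious. The sketch below is a teaching aid only, not Guava's implementation and not what nibiru uses: `TinyBloomFilter`, its bit count, and its FNV-based second hash are all choices I made up for the example.

```java
import java.util.BitSet;

/** Tiny Bloom filter over strings; a teaching sketch, not Guava's implementation. */
final class TinyBloomFilter {
  private final BitSet bits;
  private final int numBits;
  private final int numHashes;

  TinyBloomFilter(int numBits, int numHashes) {
    this.bits = new BitSet(numBits);
    this.numBits = numBits;
    this.numHashes = numHashes;
  }

  /** 32-bit FNV-1a over the characters, used as a second hash. */
  private static int fnv1a(String key) {
    int h = 0x811c9dc5;
    for (int i = 0; i < key.length(); i++) {
      h ^= key.charAt(i);
      h *= 0x01000193;
    }
    return h;
  }

  /** i-th bit position derived from two base hashes (double hashing). */
  private int position(String key, int i) {
    int h1 = key.hashCode();
    int h2 = fnv1a(key) | 1; // forced odd so successive probes differ
    return Math.floorMod(h1 + i * h2, numBits);
  }

  void put(String key) {
    for (int i = 0; i < numHashes; i++) {
      bits.set(position(key, i));
    }
  }

  /** false means "absolutely no"; true only means "maybe". */
  boolean mightContain(String key) {
    for (int i = 0; i < numHashes; i++) {
      if (!bits.get(position(key, i))) {
        return false; // one unset bit proves the key was never added
      }
    }
    return true;
  }
}

class TinyBloomFilterDemo {
  public static void main(String[] args) {
    TinyBloomFilter filter = new TinyBloomFilter(1024, 3);
    filter.put("rowkey-1");
    filter.put("rowkey-2");
    String[] probes = {"rowkey-1", "wontfindthis"};
    for (String probe : probes) {
      String answer = filter.mightContain(probe)
          ? "maybe, go search the SSTable"
          : "absolutely no, skip it";
      System.out.println(probe + " -> " + answer);
    }
  }
}
```

A key that was added always comes back as "maybe" because all of its bits were set by `put`. A key that was never added usually hits at least one unset bit, but it can collide with bits set by other keys, which is where the false positives come from.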

Writing and Persisting Bloom Filters

Remember that our only writes are Memtable, Flushing, and SSTables. Thus we can simply hook into the SSTableWriting process and attach in a bloom filter writer.

public class SsTableStreamWriter {

  // ...
  private BloomFilterWriter bloomFilter;

  public SsTableStreamWriter(String id, ColumnFamily columnFamily){
    // ...
    indexWriter = new IndexWriter(id, columnFamily.getKeyspace().getConfiguration());
    bloomFilter = new BloomFilterWriter(id, columnFamily.getKeyspace().getConfiguration());
  }

  public void write(Token t, Map<String,Val> columns) throws IOException {
    long startOfRecord = ssOutputStream.getWrittenOffset();
    bloomFilter.put(t); // record every written token in the filter
    // ...
  }

  public void close() throws IOException {
    // ...
    bloomFilter.writeAndClose(); // persist the filter alongside the SSTable
  }
}

I did not go out and write a Bloom Filter from scratch; I decided to use Guava. But I did wrap it in a facade to make it look like I did something.

package io.teknek.nibiru.engine;

public class BloomFilterWriter {

  private final BloomFilter<Token> bloomFilter;
  private static final int expectedInsertions = 50000; //Be smarter here
  private final String id;
  private final Configuration configuration;

  public BloomFilterWriter(String id, Configuration configuration){
    this.id = id;
    this.configuration = configuration;
    bloomFilter = BloomFilter.create(TokenFunnel.INSTANCE, expectedInsertions);
  }

  public enum TokenFunnel implements Funnel<Token> {
    INSTANCE;
    public void funnel(Token person, PrimitiveSink into) {
      // ...
    }
  }

  public void put(Token t){
    bloomFilter.put(t);//see I did something
  }

  public static File getFileForId(String id, Configuration configuration){
    return new File(
        configuration.getSstableDirectory(), id + ".bf");
  }

  public void writeAndClose() throws IOException {
    BufferedOutputStream bo = new BufferedOutputStream(new FileOutputStream(getFileForId(id,configuration)));
    bloomFilter.writeTo(bo); // Guava serializes the filter to the stream
    bo.close();
  }
}

You do have to size the filters correctly, otherwise they are not efficient. A better thing to do here would be to estimate the size from the SSTables being flushed or compacted. I just cheated and put it at 50000 for the time being.
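To put numbers on "sized correctly", the textbook Bloom filter formulas are easy to compute. The sketch below uses them directly; `BloomSizing` and its method names are mine, and the 3% target is an assumption based on the default false positive probability Guava documents for the two-argument `BloomFilter.create`.

```java
/** Textbook Bloom filter sizing formulas. */
final class BloomSizing {

  /** Bits needed for n insertions at false-positive probability p: m = -n ln p / (ln 2)^2. */
  static long optimalBits(long n, double p) {
    return (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
  }

  /** Number of hash functions that minimises false positives: k = (m / n) ln 2, at least 1. */
  static int optimalHashes(long n, long m) {
    return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
  }

  /** Expected false-positive rate with `inserted` keys in m bits using k hashes. */
  static double expectedFpp(long inserted, long m, int k) {
    return Math.pow(1 - Math.exp(-(double) k * inserted / m), k);
  }
}

class BloomSizingDemo {
  public static void main(String[] args) {
    long n = 50000;   // the expectedInsertions value from the post
    double p = 0.03;  // assumed target false-positive probability
    long m = BloomSizing.optimalBits(n, p);
    int k = BloomSizing.optimalHashes(n, m);
    System.out.println("bits=" + m + " hashes=" + k);
    // What happens when an SSTable holds 10x more rows than the filter was sized for.
    System.out.println("fpp at 50k keys:  " + BloomSizing.expectedFpp(50000, m, k));
    System.out.println("fpp at 500k keys: " + BloomSizing.expectedFpp(500000, m, k));
  }
}
```

The second print is the interesting one: once an SSTable holds far more rows than the filter expected, nearly every lookup comes back 'Maybe' and the filter stops saving any work. That is why estimating from the data being flushed or compacted matters.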

Reading Bloomfilters

When we open up an SSTable we also open the associated Bloom Filter.

public class SsTable implements Comparable<SsTable>{

  private KeyCache keyCache;
  private ColumnFamily columnFamily;
  private SsTableReader ssTableReader;
  private BloomFilter bloomFilterReader;

  public SsTable(ColumnFamily columnFamily){
    this.columnFamily = columnFamily;
  }

  public void open(String id, Configuration conf) throws IOException {
    bloomFilterReader = new BloomFilter();
    bloomFilterReader.open(id, conf);

    keyCache = new KeyCache(columnFamily.getColumnFamilyMetadata().getKeyCachePerSsTable());
    ssTableReader = new SsTableReader(this, keyCache, bloomFilterReader);
  }
  // ...
}

Now here is the payday! For any operation that does a read, we can ask the bloom filter for help and skip the entire rest of the read path if the bloom filter says no!

  public Val get(Token searchToken, String column) throws IOException {
    boolean mightContain = bloomFilter.mightContain(searchToken);
    if (!mightContain) {
      return null;
    }

    BufferGroup bgIndex = new BufferGroup();
    // ...
  }

Performance testing this is simple. First, we can just write a test that searches for a rowkey not in the SStable. This was a run I did without the Bloom Filter code.

long x = System.currentTimeMillis();
for (int i = 0 ; i < 50000 ; i++) {
  Assert.assertEquals(null, s.get(ks1.getKeyspaceMetadata().getPartitioner().partition("wontfindthis"), "column2"));
}
System.out.println("non existing key " +(System.currentTimeMillis() - x));

result >> non existing key 11694

(I suspect the code is not opting out early when it reads a row key > search, but that is another performance tweak). Next we put the code into action.

result >> index match 891
result >> far from index 186
result >> index match 159
result >> far from index 139
result >> non existing key 26

Wow! Maximum performance!

There you have it. BloomFilters, great for looking for data that is not there :) Sounds stupid, but the case happens more often than you might first think!