Edward Capriolo

Tuesday Mar 29, 2016

Deus Ex: The Fall - Ed's review

I have decided to change gears a bit and review one of my favorite Android games, Deus Ex: The Fall. I was a big fan of Deus Ex 3, which came out on the Xbox. For those not familiar, Deus Ex is a sneak shooter. I actually play 'The Fall' on train rides home; it took me a few months of playing it periodically to beat it.

What makes this game special? 

In the near future humans can be outfitted with augmentations, or "augs". They do things like steady your gun arm, provide mimetic camouflage, etc. The way the Deus Ex game balances this is that you cannot afford all the augs, so you pick and choose the ones that match your game play. For example, if you like the run and gun type, you focus on body armor, speed enhancements and take downs, but if you want to sneak around you focus on stealth enhancements.

What makes a BAD sneak shooter ?

What makes a bad sneak shooter is huge missions. When you're walking through a warehouse and you have to choke out 500 people over 4 hours of game play, this is just annoying. Think about it: could you imagine that in three or four hours no one realized that 500 security guards have not checked in? Or that in 4 hours that one guy at the computer would not go for a bathroom break and just happen to look in one of the 90 lockers you have hidden bodies in? Just not possible and kinda silly.

Why does 'The Fall' avoid this ?

Well, obviously this is an Android game, so by its nature it avoids huge levels. This actually gives the game the right feel: they are small levels with a few rooms, you execute a few tactical take downs and you get a reward! In the Xbox game a lot of time is spent moving/hiding bodies, so as not to alert others and bring about a free for all. In 'The Fall' the bodies just vanish after a few seconds. Bodies vanishing is not realistic, but I think it goes with the style: you knock someone out and you move on. When I play I simply force myself into the mind of a character and play a 'realistic' way; there is no way an augmented human is going to huddle in a corner waiting for 3 hours for 3 different people to be in the "perfect place". You just make a move and be damned with the consequences.


I was rather impressed with the controls; in fact I enjoyed them more than the console version. On screen you can switch weapons fast, and icons appear when you are in take down range. A rather cool thing is that in the settings menu you can adjust the placement of each of the on screen controls. I was super impressed by this. I really did not have to move anything, but the fact that you could I thought was pretty neat.


One thing I enjoy is that around the game there are PDAs and computers that you can read or hack into to get some back story and hints about what is unfolding. I really like that in all games; they did this in Gears of War with journals and cogs. The nice part is this is always optional. You are not forced to watch 10 minute movies, but if you care you can review the data in the world. You can also talk to random people like in a standard RPG, and while they do not offer a ton to say, that is still pretty cool.



You are an ex special forces character with augs, drawn into something bigger than you. You are living below the radar and have to go on a variety of missions to acquire the drugs that keep you from rejecting your augs. As that goes down you have to deal with people who offer you what you need in exchange for your services, and you are free to embark on side quests. For a 99 cent Android game this plot is amazing, and it would still be a fairly in-depth plot for a console game.


Flexible game play, large environment to explore, upgradable character attributes, upgradable weapons. Nice graphics and controls for a cell phone game. Retained a lot of the feel from the Xbox game while moving to a cell platform.


While it is a sneak shooter the game is more biased towards the sneak, even with armor upgrades a couple well placed shots from enemies can put you down. The game is less fun to play as a shooter IMHO. Environments seem more detailed than characters. 


If you like the console game and you have a 30 minute train ride home every day, this game is amazing. Since it is an older game it is totally worth the cost (~99 cents). I would still happily pay 3 or 4 dollars for it.

My score is 9.  

Wednesday Mar 09, 2016

Great Moments in Job Negotiation: Volume 1

Huffington Post is my current employer. Huffington Post is owned by AOL. The interview process has to go through two stages of HR. At the time the head of AOL also approved each hire.

After multiple interviews with multiple people over three weeks I finally got my offer letter.

I replied to the recruiter, "This is a nice offer, but if I don't have a floppy disk in my mailbox by Monday with 30 free hours of AOL, the deal is off."

Sunday Mar 06, 2016

Rasp PI 3 is here

Up until this point you have had to attach WiFi or a 4G card to your 'internet of things' device. Well, no more! The new Raspberry Pi 3 has built-in wireless networking. This is going to get interesting.



Wednesday Feb 17, 2016

Python users / Data Scientists measuring PITA levels

Before I get started trashing people, let me say I have the greatest respect for former and current colleagues, but there is a large looming problem that needs to be addressed.

The fanboy level of Python usage in people, mainly data scientists, needs to stop.

A sick blind devotion to Python, completely unchecked by reason

I was talking to a Python user about Spark:
Me: "What were you looking to use Spark for?"
Them: "I hear there is PySpark."
Me: "Yes, very interesting. What are you looking to use it for?"
Them: "PySpark."

ROFL: The only take away about the spark platform is PySpark? Nothing else seemingly was interesting or caught your attention? Really nothing about streaming or in memory processing, just PySpark? lol #blinders

You would think [data] scientists want to learn things?

I encounter this debate mostly with hive-streaming. When someone asks me about hive streaming I look at the problem. Admittedly there are actually a couple of tasks most easily addressed with streaming. But the majority of streaming things can be solved much more efficiently and correctly by writing a simple UDF or UDAF in Java. What normally is a common reply when a Hive Committer, who wrote a book on Hive, explains unequivocally that a UDF is better for performance, debugging, testability, and is not that hard to write?

"I don't want to learn how to compile things | learn about java | learn about what you think is the right way to do things". You would think that a data scientist who is trying to search for great truths would actually want to find the best way to use a tool they have been working with for years.

Just to note: in hive streaming everything moves between processes via pipes, which costs something like 4 context switches and two serializations for each row (not including the processing that has to happen in the pipe).

I don't care that 100% of the environment is Java, im f*ckin special

A few years back someone (prototyping in Python) suggested we install LibHDFS. Later someone suggested we install WebHDFS. The only reason to install these things is that they must use Python to do things, even if there are already prior examples of doing this exact task in Java in our code base. Sysadmins should install new libraries, open new ports, monitor new services, and we should change our architecture, just because the Python user does not want to use Java for a task that 10 previous people have used Java for.

"I'm Just prototyping"

This is the biggest hand wave. When scoping out a new project don't bother looking for the best tool for the job. Just start hacking away at something and then, whatever type of monstrosity appears, just say it's already done; someone will just have you jam it into production anyway. Good luck supporting the "prototype" with no unit tests in production for the next 4 years. You would think that someone would take the lead from a professional coder and absorb their best practices. No, of course not, they instead will just tell you how best practices don't apply to them. #ThisISSparta!

Anyway it's 7:00 am and I woke up to write this so that I can vent. But yeah, it's not Python, it's not data scientists, but there is just a hybrid intersection of the two that is so vexing.


Friday Jan 22, 2016

My day

[edward@bjack event-horizon-app]$ git log
commit 9de21fbc97a7f573f6b0564daff20f5ce23c723e
Author: Edward Capriolo <edward.capriolo@.com>
Date:   Fri Jan 22 16:14:20 2016 -0500

    Ow yes yaml cares about spaces...beacause ansible

commit de07401a0087e86253cbf9c0369010e21d248eb9
Author: Edward Capriolo <edward.capriolo@.com>
Date:   Fri Jan 22 16:10:57 2016 -0500

    Why not

commit 0be598151962f647528406bad21b3b8c8e887ffd
Author: Edward Capriolo <edward.capriolo@com>
Date:   Fri Jan 22 16:05:06 2016 -0500

    This is soo much better than just writing a shell script

commit 4f4ea0b8b462a61e3ecde71ff656da9e1324095b
Author: Edward Capriolo <edward.capriolo@.com>
Date:   Fri Jan 22 16:01:53 2016 -0500

    Why dont we have a release engineer

commit b77264618f2fbe689ecc09e4575e10935ba20600
Author: Edward Capriolo <edward.capriolo@.com>
Date:   Fri Jan 22 15:57:56 2016 -0500


commit 912597f1ba4284a5312398ad770f6fd1d76301a1
Author: Edward Capriolo <edward.capriolo@.com>
Date:   Fri Jan 22 15:52:21 2016 -0500

    The real yaml apparently

commit ee64c5c4340202b95a0f05784f30b63abd755d2d
Author: Edward Capriolo <edward.capriolo@.com>
Date:   Fri Jan 22 15:32:28 2016 -0500

    Always asume kill worked. so we can start if nothing is running

Tuesday Jan 12, 2016

'No deal' is better than a 'Bad Deal'

After working for a few companies a few things have become clear to me. Some background, I have been at small companies with no code, large companies with little code, small companies with a lot of code, and large companies where we constantly re-write the same code. 

I was watching an episode of 'Shark Tank'. Contestant X had a product, call it 'Product X', and four of the five sharks offered nothing. The 5th shark, being very shark like, used this opportunity to offer a 'bad' deal. The maker of 'Product X' thought it over, refused the deal, and left with no deal. The other sharks were more impressed with 'Contestant X' than with 'Product X'. They remarked that "No deal is better than a Bad Deal". This statement is profound and software products should be managed the same way.

Think about the phrase tech-debt. People might say tech-debt kills your agility. But it is really not the tech-debt alone that kills your agility, it is the 'bad deals' that lead to tech-debt. As software gets larger it becomes harder to shape and harder to manage. At some point software becomes very big, and change causes a cascade of tech-debt. Few people want to remove a feature. Think about Monkeys on a Ladder, and compare this to your software. Does anyone ever ask you to remove a feature? Even if something is rarely used or never used, someone might advocate keeping it, as it might be used later. Removing something is viewed as a loss, even if it really is addition by subtraction. Even if no one knows who asked for this rule, people might advocate keeping it anyway! Heck, even if you find the person who wanted the feature and they are no longer at the company, and no one else uses it, people might advocate keeping it anyway!

The result of just-keep-it thinking is you end up keeping around code you won't use, which prevents you from easily adding new code. How many times have you heard someone say, 'Project X (scoff)!? That thing is a mess! I can re-write that in scala-on-rails in 3 days'. 4 weeks later, when Project X on-scala-on-rails is released, a customer contacts you about how they were affected because some small business rule was not ported correctly due to an oversight.

The solution to these oversights is not test-coverage or sprints dedicated to removing tech-debt. The solution is never to make a bad deal. Do not write software with niche cases. Do not write software with surprising rules. The way I do this is a mental litmus test: take the exit criteria of an issue and ask yourself, "Will I remember this rule in one year?" If someone asks you to implement something and you realize it was implemented a year ago and no one ever used it, push back; let them know the software has already gone in this direction and it led nowhere. If you're a business and you're struggling to close deals because the 'tech people' cannot implement X in time, close a deal that does not involve X.

'No deal' is better than a 'Bad Deal'

'No code' is better than 'Bad Code'

'No feature' is better than 'Bad Feature' 



Saturday Dec 26, 2015

Introducing TUnit

Some of my unit tests have annoying sleep statements in them. I open sourced TUnit for changing this.

The old way:

Assert.assertEquals(2 , s[2].getClusterMembership().getLiveMembers().size());

The new way:

TUnit.assertThat(new Callable(){
  public Object call() throws Exception {
    return s[2].getClusterMembership().getLiveMembers().size();
  }
}).afterWaitingAtMost(11, TimeUnit.SECONDS).isEqualTo(2);

You can see this in action here.
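
For the curious, the idea behind an assertion like this can be sketched in a few lines. This is not TUnit's actual source, just a minimal illustration: the class name PollingAssertSketch, the method assertEventually and the 10 ms poll interval are all hypothetical.

```java
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a polling assertion; not the real TUnit code.
class PollingAssertSketch {

    /** Polls the callable until it returns the expected value or the time budget runs out. */
    static <T> void assertEventually(Callable<T> actual, T expected, long atMost, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(atMost);
        while (true) {
            T last;
            try {
                last = actual.call();
            } catch (Exception e) {
                throw new AssertionError("callable failed", e);
            }
            if (Objects.equals(expected, last)) {
                return; // condition met, no need to keep waiting
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError("expected " + expected + " but last saw " + last);
            }
            try {
                Thread.sleep(10); // short pause between polls (arbitrary choice)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting", e);
            }
        }
    }
}
```

The point of polling is that the test returns as soon as the condition holds, instead of always paying for the worst-case sleep.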

Tuesday Dec 15, 2015

I am highly available



Monday Dec 14, 2015

Mounting a come back!

Hey all! It has been a long time. Well if you don't know, my wife Stacey and I had a baby boy Ian! 

Well besides that I am gearing up for the next teknek release. With some cleanups I also replaced the crappy zookeeper lock recipe and added curator

Sunday Jul 12, 2015

Why Hive on Cloudera is like Python on Redhat

I used to be fairly anti-cloudera. I was never really convinced you needed someone to package up hadoop for you, and thought your admins should just learn it. These days Hadoop is N degrees harder and I don't really have as much give-a-crap for learning to configure all the knobs that change names all the time. Thus I am more or less happy to let cloudera handle installing the 9000 hadoop components.

But really cloudera's testing is not that great. In my last version of cdh, decommissioning NodeManagers caused yarn to stop accepting jobs. ::Major fail:: Upgrade, and in the new version hive cannot support custom hive serde's because of an upstream Hive bug.

Filed this to CDH user:


Got the ::cricket:: response

Thinking of getting away from CDH hive at this point. Why?

  1. Waited a long time for this so I could easily build in tez support
  2. Still no out of the box tez support even though it's clearly the way forward (and would make everything umpteen times faster)
  3. Does not really look like cloudera can/wants to keep up with Hive's release cycle
  4. Sabotaging features by adding check boxes and disabling things that work out of the box "Check the box for Enable Hive on Spark (Unsupported)."
  5. Constant complaints in manager that you should have a metastore server or should have zookeeper when truth is most users won't need either. (and I sure do not need this)
  6. N day wait to confirm bugs, "Whenever we get to it" fixes
  7. 1 zillion unneeded jars in classpath, hbase etc. that I'm not actually using with hive.

I'm tired of dealing with back-revved revisions and cloudera's "Why aren't you just using impala" type stance.

I am going back to rolling my own. I will still use cdh to manage hdfs proper and YAWN, but this hive situation is unmanageable. Hive on cloudera is like Python on Redhat 5. You are painted into an annoying box and you have no direct way to make it better other than ignoring it entirely and rolling your own!

Friday Apr 17, 2015

Triggers and Coprocessors - Building a NoSQL store - Part 14

Hello again! The last blog in this series was about cleanup compaction. While cleanup compaction is interesting, I do not think it has that web scale 'pop'. Definitely not sexy enough for Nibiru, the world's first Internet of Things NoSql. I decided to treat myself and do something fun, so I decided now would be a good time to build triggers/coprocessor support.

We might as well start by defining some terminology. Many databases have trigger support; typically a trigger is a type of insert or update query that happens inside the RDBMS as a result of another insert or update operation. I first saw the term co-processor used in Google's BigTable white paper. HBase, an open source implementation based on the BigTable spec, has different types of coprocessors.

HBase has a region server that serves the region (shard), and replication is provided by the file system. In the Cassandra/Dynamo style a row key has multiple natural endpoints, no replicated file system, and the system needs to actively execute the operation across N replicas as we showed here.

Triggers/CoProcessors were batted around with Cassandra for a while. The implementation can be debated: for example, should the trigger run closer to the storage layer or closer to the coordinator level? Unlike HBase, where we can be sure one region server is "in charge" of a key, we would need a distributed locking mechanism to be "in charge" of a key in Cassandra, and distributed locking is "heavy". Another potential implementation would be leveraging idempotent and retry-able operations like writes and deletes with timestamps. There are probably other ways to go about triggers as well.
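
To see why timestamped writes and deletes are safe to retry, here is a minimal sketch. It is not Nibiru or Cassandra code; TimestampedStoreSketch and its methods are hypothetical names, and the rule "highest timestamp wins, ties keep what is stored" is an assumption made for the illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a store where replaying a write or delete changes nothing.
class TimestampedStoreSketch {

    private static final class Cell {
        final String value; // null means "deleted" (a tombstone)
        final long time;

        Cell(String value, long time) {
            this.value = value;
            this.time = time;
        }
    }

    private final Map<String, Cell> cells = new HashMap<>();

    /** Applies the write only if it is newer than what is already stored. */
    void put(String key, String value, long time) {
        Cell current = cells.get(key);
        if (current == null || time > current.time) {
            cells.put(key, new Cell(value, time));
        }
    }

    /** A delete is just a timestamped write of "nothing". */
    void delete(String key, long time) {
        put(key, null, time);
    }

    String get(String key) {
        Cell current = cells.get(key);
        return current == null ? null : current.value;
    }
}
```

Because an old or repeated operation never overwrites newer data, a trigger built this way could be retried after a timeout without needing a lock.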

Pick your poison

I decided to take the approach of coordinator triggers. In a previous blog we showed the coordinator is the piece that receives the request from the client and dispatches it to multiple servers. The good part of this implementation is that we can easily hook into the code before the result is returned to the client. The downside is that the trigger could time out after the initial user operation (and it cannot be easily unrolled if we wanted to try that). Maybe in a later blog we can build triggers closer to the storage layer.

The next code samples are all part of this commit.

Let's create an enum to describe the trigger levels.

public enum TriggerLevel {
/** Request will block while trigger is executing, trigger can timeout, **/
/** Request will not block while trigger is executing.
* Triggers operations may be dropped if back pressure**/
/** Request will not block while trigger is executing.
* Trigger operations retry, potentially later */

public class TriggerDefinition {
private TriggerLevel triggerLevel;
private String triggerClass;

Next, the user needs an interface to plug the trigger logic into. We give the user access to the message, the response, and the server. In most cases we have avoided passing the Server to make interfaces very discrete, but here we are going for flexible.

public interface CoordinatorTrigger {
  void exec(Message message, Response response, Server server);
}

Next, we can build a component to execute triggers. We are only going to build the blocking case for now, but implementing the non blocking case will not be hard since we are using a callable.

package io.teknek.nibiru.trigger;

public class TriggerManager {

  private ExecutorService executor;
  private final Server server;

  public TriggerManager(Server server){
    this.server = server;
  }

  // getReusableTrigger(...) and the executor setup are not shown in this excerpt
  public Response executeTriggers(final Message message, final Response response, Keyspace keyspace,
          Store store, long timeoutInMs, long requestStart){
    long now = System.currentTimeMillis();
    for (TriggerDefinition d : store.getStoreMetadata().getCoordinatorTriggers()){
      if (d.getTriggerLevel() == TriggerLevel.BLOCKING){
        long remaining = (requestStart + timeoutInMs) - now;
        if (remaining > 0){
          final CoordinatorTrigger ct = getReusableTrigger(d);
          Callable<Boolean> c = new Callable<Boolean>(){
            public Boolean call() throws Exception {
              ct.exec(message, response, server);
              return Boolean.TRUE;
            }
          };
          Future<Boolean> f = null;
          try {
            f = executor.submit(c);
            Boolean b = f.get(remaining, TimeUnit.MILLISECONDS);
            if (!b.equals(Boolean.TRUE)){
              return new Response().withProperty("exception", "trigger returned false");
            }
          } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return new Response().withProperty("exception", "trigger exception " + e.getMessage());
          }
        }
      }
    }
    return response;
  }
}
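
The blocking pattern above, and the non blocking variant it hints at, can be boiled down to a standalone sketch that only uses java.util.concurrent. The class and method names here (TriggerTimeoutSketch, runBlocking, runNonBlocking) are hypothetical and are not part of Nibiru; cancelling the future on failure is my own addition.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of running a task with a time budget.
class TriggerTimeoutSketch {

    /** Waits at most timeoutMs for the task; returns false on timeout or failure. */
    static boolean runBlocking(ExecutorService executor, Callable<Boolean> task, long timeoutMs) {
        Future<Boolean> future = executor.submit(task);
        try {
            return Boolean.TRUE.equals(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            future.cancel(true);
            return false;
        } catch (ExecutionException | TimeoutException e) {
            future.cancel(true); // stop work nobody is waiting for anymore
            return false;
        }
    }

    /** Hands the task to the executor and returns immediately; the caller may ignore the future. */
    static Future<Boolean> runNonBlocking(ExecutorService executor, Callable<Boolean> task) {
        return executor.submit(task);
    }
}
```

The only difference between the two cases is whether anyone calls get() on the future, which is why wrapping the trigger in a Callable keeps both options open.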


The last piece is to call this code from the coordinator, only after a successful request.

    if (ColumnFamilyPersonality.PERSONALITY.equals(message.getPersonality())) {
      LocalAction action = new LocalColumnFamilyAction(message, keyspace, columnFamily);
      ResultMerger merger = new HighestTimestampResultMerger();
      Response response = eventualCoordinator.handleMessage(token, message, destinations,
              timeoutInMs, destinationLocal, action, merger, getHinterForMessage(message, columnFamily));
      if (!response.containsKey("exception")){
        response = triggerManager.executeTriggers(message, response, keyspace, columnFamily, timeoutInMs, requestStart);
      }
      return response;
    }

Let's get testing

A typical use case for triggers is building a reverse index during an insert. For each insert to a column family named Pets we will check to see if the column name is "age". If the column name matches we make another insert into another column family that organizes the data by age.

  public static class PetAgeReverseTrigger implements CoordinatorTrigger {
public void exec(Message message, Response response, Server server) {
String column = (String) message.getPayload().get("column");
String value = (String) message.getPayload().get("value");
String rowkey = (String) message.getPayload().get("rowkey");
if ("age".equalsIgnoreCase(column)){
Message m = new Message();
m.setPayload( new Response().withProperty("type", "put")
.withProperty("rowkey", value)
.withProperty("column", rowkey)
.withProperty("value", "")
.withProperty("time", System.currentTimeMillis())

To test:

  1. create a column family for the reverse index
  2. Add the trigger to the pets column family
  3. Insert some entries with age columns
  4. verify the reverse index is now populated

  public void reverseIndexTrigger() throws ClientException{

MetaDataClient meta = new MetaDataClient(server.getConfiguration().getTransportHost(),
new Response().withProperty(StoreMetaData.IMPLEMENTING_CLASS,
DefaultColumnFamily.class.getName())); //1

TriggerDefinition td = new TriggerDefinition();
List<TriggerDefinition> defs = server.getKeyspaces().get(TestUtil.DATA_KEYSPACE).getStores()
defs.add(td); //2

ColumnFamilyClient client = new ColumnFamilyClient( new Client(server.getConfiguration().getTransportHost(),

Session s = client.createBuilder().withKeyspace(TestUtil.DATA_KEYSPACE).withStore(TestUtil.PETS_COLUMN_FAMILY).build();
s.put("rover", "age", "5", 1L);
s.put("sandy", "age", "3", 1L);
s.put("spot", "age", "5", 1L); //3

Session s1 = client.createBuilder().withKeyspace(TestUtil.DATA_KEYSPACE).withStore(PET_AGE_CF).build();
SortedMap<String,Val> res = s1.slice("5", "a", "zzzzzzzzzzzzzzzzz");
Assert.assertEquals(2, res.size());
Assert.assertEquals("rover", res.firstKey());
Assert.assertEquals("spot", res.lastKey()); //4



Triggers handle tasks without building logic into the client application. They can also optimize processes that would potentially involve multiple client server exchanges.
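
If the reverse index idea is still fuzzy, here it is stripped of all the server plumbing. This is an in-memory sketch only; ReverseIndexSketch and namesWithAge are hypothetical names, and unlike a real implementation it does not clean up old index entries when an age is overwritten.

```java
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical in-memory sketch of the "pets by age" reverse index.
class ReverseIndexSketch {

    // rowkey -> (column -> value), standing in for the Pets column family
    private final Map<String, Map<String, String>> pets = new TreeMap<>();
    // age -> sorted pet names, standing in for the reverse index column family
    private final Map<String, SortedSet<String>> petsByAge = new TreeMap<>();

    void put(String rowkey, String column, String value) {
        pets.computeIfAbsent(rowkey, k -> new TreeMap<>()).put(column, value);
        // The "trigger": every age insert also writes value -> rowkey into the index.
        if ("age".equalsIgnoreCase(column)) {
            petsByAge.computeIfAbsent(value, k -> new TreeSet<>()).add(rowkey);
        }
    }

    SortedSet<String> namesWithAge(String age) {
        return petsByAge.getOrDefault(age, new TreeSet<>());
    }
}
```

Inserting rover and spot with age 5 and sandy with age 3 leaves two names under "5", which is the same shape of check the test above makes against the real store.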

Monday Apr 13, 2015

Great NoSQL-isms of the 34th and a half century

So someone just sent me this page on Elasticsearch.

One of the best NoSql-isms is when someone tells you about some elaborate feature, next they tell you NOT to use it. EVER!

Here is an example:

The second workaround is to add ?search_type=dfs_query_then_fetch to your search requests. The dfs stands for Distributed Frequency Search, and it tells Elasticsearch to first retrieve the local IDF from each shard in order to calculate the global IDF across the whole index

Sounds great! Until you read the next advice:

Don’t use dfs_query_then_fetch in production. It really isn’t required. Just having enough data will ensure that your term frequencies are well distributed. There is no reason to add this extra DFS step to every query that you run.



Wednesday Apr 08, 2015

Cleanup compaction - Building a NoSQL store - Part 13

In our last blog we showed how we can have nodes dynamically join our cluster to achieve a web scale, Internet Of Things, (Internet Of Things is now a buzzword I say once an hour) NoSql database. That blog had an incredible info graphic that demonstrated what happens when a node joins our cluster. Here it is again in techNoSQLcolor:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1|

If you add a second node at position 50:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 | node2 node2 node2 node2 node2   |

Now, remember our data files are write once so we can't change them after the fact. After the split, requests that get sent to node1 are cut in half, but the data files on node1 contain more data than they need to. What we need is a way to remove all the data that is on a node that is no longer needed. 

Cassandra has a command for this called 'cleanup' that needs to be run on each node. The theory is that, in the olden days, a node join could go bad in some way and the system could be "recovered" by manually adjusting the tokens on each node and running various repair processes. In practice not many people (including myself) know exactly what to do when node joins go wrong: adjust tokens, move files, run repairs? The system SHOULD be able to automatically remove the old data, but no one has gotten to this yet as far as I can tell.

To handle cleanup we need two things:

  1. A command that can iterate the data files (SsTables) and remove data that no longer belongs on the node.
  2. A variable that controls whether normal compaction processes clean up data automatically.

You may want to look back at our previous blog on compaction to get an idea of how we merge SsTables.

Let's get to it

We are going to enhance the compaction process to handle this special case. First, we have a boolean that controls cleanup. If the token does not belong on this node we do not write it during compaction.

      if (cleanOutOfRange){
        // only keep tokens that still route to this node
        if (coordinator.destinationsForToken(lowestToken, keyspace).contains(coordinator.getDestinationLocal())){
          newSsTable.write(lowestToken, allColumns);
        }
      } else {
        newSsTable.write(lowestToken, allColumns);
      }

Cleanup is simple: unlike normal compaction, we do not have to merge multiple tables together (we could, however). One table in makes one table out.

  public void cleanupCompaction(Keyspace keyspace, DefaultColumnFamily defaultColumnFamily){
    Set<SsTable> tables = new TreeSet<>(defaultColumnFamily.getSstable()); //duplicate because we will mutate the collection
    for (SsTable table : tables){
      String newName = getNewSsTableName();
      try {
        SsTable [] ssArray = {table};
        SsTable s = compact(ssArray, newName, server.getServerId(), server.getCoordinator(), true, keyspace);
        //todo delete old
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

Testing time

For our coordinator, we made more of an integration test by launching a second server and joining it to the first. I typically like to be about 20% integration tests, 80% unit tests, and 0% mock tests. Why do I take this approach?

First, I believe mock tests are cheating. That is not to say that mocking does not have its uses, but I feel it is used to cover up code smells. If you have a good design and a good API, not much mocking should be needed. Integration tests are good at proving the entire process can run end-to-end, but they are long and redundant.

Unit tests do two important things for me: they test things, working like a tripwire for bad code and bad assumptions, and they document things. They document things because they show what components should do. They tell a story.

The story for cleanup is simple: A system has some data on disk. After topology changes (like node join or leave or change of replication factor) some of that data is no longer required to be on a given node and can be removed.
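
That story fits in a few lines if we pretend a data file is just a sorted map. This sketch is not the Nibiru compaction code; CleanupSketch is a hypothetical name, and the predicate stands in for "does the router still send this row key to the local node".

```java
import java.util.Collections;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical sketch: one write-once table in, one smaller write-once table out.
class CleanupSketch {

    /** Copies only the rows this node still owns into a new table; the old table is left untouched. */
    static SortedMap<String, String> cleanup(SortedMap<String, String> oldTable,
                                             Predicate<String> ownedLocally) {
        SortedMap<String, String> newTable = new TreeMap<>();
        for (Map.Entry<String, String> row : oldTable.entrySet()) {
            if (ownedLocally.test(row.getKey())) {
                newTable.put(row.getKey(), row.getValue());
            }
        }
        return Collections.unmodifiableSortedMap(newTable);
    }
}
```

Swapping the predicate is all it takes to simulate a topology change, which is the same trick the test below plays with a custom router.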

I wrote this test by hiding code in methods with friendly names that say what they are doing. It is a little cute, I know, but why not? We insert 9 rows (row keys "0" through "8") directly to the server to start things off. When the test is done only 1 of those rows should still be on disk.

public void cleanupTest() throws IOException, InterruptedException, ClientException {
  for (int i = 0; i < 9; i++) {
    server.put(TestUtil.DATA_KEYSPACE, TestUtil.PETS_COLUMN_FAMILY, i+"", "age", "4", 1);
  }
  forceFlushAndConfirmFilesOnDisk(server); //flush memtables to disk
  changeTheRouter(server); // change the routing information to simulate topology changes
  assertSomeDatum(server); // assert data is on disk
  runCleanup(server); // run cleanup
  assertDatumAfterCompaction(server); //assert some data has been removed
}

Rather than writing a long involved integration test to move data off the node, we implement a router that routes token "1" locally and routes everything else nowhere! This way when we clean up the data everything else should go. (No need for mocking libraries, just good old Object Oriented Design)

  public static class OnlyTheBestRouter implements Router {
    public List<Destination> routesTo(ServerId local, Keyspace requestKeyspace,
            ClusterMembership clusterMembership, Token token) {
      if (token.getRowkey().equals("1")){
        Destination d = new Destination();
        return Arrays.asList(d);
      }
      return Arrays.asList();
    }
  }

 This method installs the router in place of the default router which writes everything locally.

  private void changeTheRouter(Server s) throws ClientException{
MetaDataClient metaDataClient = new MetaDataClient(s.getConfiguration().getTransportHost(), s
new Response().withProperty(KeyspaceMetaData.ROUTER_CLASS, OnlyTheBestRouter.class.getName()), true);

This method runs the cleanup.

  private void runCleanup(Server s){
CompactionManager cm = ((CompactionManager) s.getPlugins().get(CompactionManager.MY_NAME));
cm.cleanupCompaction(s.getKeyspaces().get(TestUtil.DATA_KEYSPACE), (DefaultColumnFamily)

After the cleanup only columns for the row key/token "1" should be present and all others should be missing.

  private void assertDatumAfterCompaction(Server s){
    Assert.assertEquals(null, s.get(TestUtil.DATA_KEYSPACE, TestUtil.PETS_COLUMN_FAMILY, "3", "age")); //gone!
    String res = ((ColumnValue) s.get(TestUtil.DATA_KEYSPACE, TestUtil.PETS_COLUMN_FAMILY, "1", "age")).getValue(); //still there!
    Assert.assertEquals("4", res);
  }

Wrap up

There you have it: cleanup, the name says it all. Sexy and automatic!

Thursday Apr 02, 2015

Elastic node scale up - Building a NoSQL store - Part 12

The last major feature I added to Nibiru was adding Hinted Handoff, an optimization used to re-deliver lost messages and reduce entropy from some natural endpoints missing writes. I sat a while trying to decide what to do next...

I thought to myself, "Nibiru has code to route requests, and a CLI to read and write data, but is it web scale? After all, anyone could just write a library that hashes data around!"

The answer was no, and I decided it was time to change that. Let's make this biznotch auto-scale.


Every NoSQL has a different wrinkle on how it does things. The Dynamo style databases like Cassandra and Riak create a Token(by a hash) for the data and use that information to route requests. A previous blog demonstrated how Nibiru implements this type of Consistent Hashing using  Request Routing.

Thus far we have implemented an Eventually Consistent, ColumnFamily store in Nibiru which does not use shared storage. This means that growing the cluster involves physically moving data between nodes in a way that no data is unavailable during the transition period.

How can we do this? If a token is calculated from user data with a hash, you might expect that changing the number of nodes results in almost all the data having to be moved around, like when a hash map is resized. But luckily we can avoid moving the majority of data by using something like consistent hashing. You can easily find literature on this and even cool interactive presentations, but I will explain it my way, with ascii art and crude examples.
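
To put a number on the "hash map resize" problem, here is a tiny sketch that counts how many keys land on a different node when a cluster using plain key MOD nodes grows by one. ModRehashSketch and movedKeys are hypothetical names, and integer keys stand in for hashed row keys.

```java
// Hypothetical sketch: how much data moves if placement is simply key % nodeCount.
class ModRehashSketch {

    /** Counts keys in [0, keyCount) whose node changes when going from oldNodes to newNodes. */
    static int movedKeys(int keyCount, int oldNodes, int newNodes) {
        int moved = 0;
        for (int key = 0; key < keyCount; key++) {
            if (key % oldNodes != key % newNodes) {
                moved++;
            }
        }
        return moved;
    }
}
```

With 560 keys, growing from 7 to 8 nodes moves 490 of them and only 70 stay put. That is the behavior we want to avoid.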

Things hash as I explained here:

10 MOD 7. Replication 3
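To see why plain MOD hashing hurts when the cluster grows, here is a minimal sketch. It is not Nibiru code; the class and method names are made up for illustration. It treats the integers 0 to 99 as already-hashed keys and counts how many land on a different node when the node count goes from 7 to 8.

```java
class ModuloRehash {
    /**
     * Counts the keys in [0, keyCount) whose owner changes when the
     * number of nodes goes from oldNodes to newNodes under key % nodes.
     */
    static int countMoved(int keyCount, int oldNodes, int newNodes) {
        int moved = 0;
        for (int key = 0; key < keyCount; key++) {
            if (key % oldNodes != key % newNodes) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        // prints "86 of 100 keys change owner"
        System.out.println(countMoved(100, 7, 8) + " of 100 keys change owner");
    }
}
```

Only the keys whose remainder happens to be the same under both divisors stay put, which is why nearly everything has to move.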

But let's forget about MOD 7. Instead we hash something into a fixed space. The size of this hash space will never change. To keep it simple, assume the space is 0 (inclusive) to 100 (exclusive).

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100

If you have one node:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1|

If you add a second node at position 50:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 | node2 node2 node2 node2 node2   |

Half of the data stays in place, and half needs to move to the new node. Let's add a third node at position 25:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node3 node3 node3 | node1 node1 node1 | node2 node2 node2 node2 node2  |

Node1 'gave away' some of its range, but node2 did not. The lesson is that each time we add a node we split the hash space; we do not rehash.

This is how we scale: each new node reduces the burden on the rest of the nodes.
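The splitting above can be sketched with a sorted map. This is a toy, not Nibiru's TokenRouter: the TokenRing class is hypothetical, and it assumes each node owns the range from its own token up to the next token, so which half of node1's range the newcomer ends up with depends on that convention. The point is the same either way: adding node3 only touches positions that used to belong to node1.

```java
import java.util.Map;
import java.util.TreeMap;

class TokenRing {
    // token -> node id; assumption: a node owns [its token, next token)
    private final TreeMap<Integer, String> tokens = new TreeMap<>();

    void addNode(int token, String nodeId) {
        tokens.put(token, nodeId);
    }

    /** Finds the node whose token is the closest one at or below the position. */
    String ownerOf(int position) {
        if (tokens.isEmpty()) {
            throw new IllegalStateException("no nodes in the ring");
        }
        Map.Entry<Integer, String> entry = tokens.floorEntry(position);
        if (entry == null) {
            entry = tokens.lastEntry(); // wrap around the ring
        }
        return entry.getValue();
    }

    /** Owner of every position in the fixed hash space [0, 100). */
    String[] snapshot() {
        String[] owners = new String[100];
        for (int p = 0; p < owners.length; p++) {
            owners[p] = ownerOf(p);
        }
        return owners;
    }

    public static void main(String[] args) {
        TokenRing ring = new TokenRing();
        ring.addNode(0, "node1");
        ring.addNode(50, "node2");
        String[] before = ring.snapshot();

        ring.addNode(25, "node3");
        String[] after = ring.snapshot();

        int moved = 0;
        for (int p = 0; p < before.length; p++) {
            if (!before[p].equals(after[p])) {
                moved++;
            }
        }
        // prints "25 of 100 positions changed owner"
        System.out.println(moved + " of 100 positions changed owner");
    }
}
```

Compare that with the MOD approach, where changing the node count reshuffles most keys.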


Let's call the new node that is joining a PROTEGE. These are things to consider for a PROTEGE.

  1. DO NOT want to send reads to that node (because new data may still be moving to it)
  2. DO want it to receive new writes/deletes/updates for the section of data it will be responsible for

The protege is taking data from another node. Let's call that node the SPONSOR. For a sponsor:

  1. DO want to write locally until the PROTEGE is fully joined because if the join fails we do not want to lose data
  2. DO want to send portions of local data to the protege

(Another way to implement this would be to track the JOINING node(s) separately. On the write path the system would be writing to (REPLICATION FACTOR) + 1 nodes but only reading from (REPLICATION FACTOR) nodes.)
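Here is a rough sketch of that alternative, just to make the read/write asymmetry concrete. Everything in it is hypothetical (the JoinAwareTargets class, the method names, plain strings as node ids); it is not how Nibiru picks destinations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class JoinAwareTargets {
    /** Reads only go to the natural endpoints; the joining node may not have all the data yet. */
    static List<String> readTargets(List<String> naturalEndpoints) {
        return new ArrayList<>(naturalEndpoints);
    }

    /** Writes go to the natural endpoints plus the joining node, if there is one. */
    static List<String> writeTargets(List<String> naturalEndpoints, String joiningNode) {
        List<String> targets = new ArrayList<>(naturalEndpoints);
        if (joiningNode != null && !targets.contains(joiningNode)) {
            targets.add(joiningNode);
        }
        return targets;
    }

    public static void main(String[] args) {
        List<String> natural = Arrays.asList("node1", "node2", "node3");
        System.out.println(readTargets(natural));           // prints "[node1, node2, node3]"
        System.out.println(writeTargets(natural, "node4")); // prints "[node1, node2, node3, node4]"
    }
}
```

A real version would also check that the write falls inside the range the joining node is taking over; the sketch skips that to stay short.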


First we build an internode client. The protege uses it to initiate the join request. Basically the request says, "Hey node [$sponsor]! I notice you have a lot of data and I would like you to divide your data at [$token] and I will take over one part of that."

  public void join(String keyspace, String sponsorHost, String wantedToken){
    InternodeClient internodeClient = new InternodeClient(sponsorHost, configuration.getTransportPort());
    internodeClient.join(keyspace, sponsorHost, serverId, wantedToken, configuration.getTransportHost());
  }

That message gets transmitted to the sponsor and makes its way to a handler class. The way I implemented this, we only handle a single protege at a time to keep the process sane. At a high level we need to:

  1. Atomically set the protege
  2. Start a thread that will
    1. replicate metadata (keyspaces and store definitions) to the new node
    2. replicate data (data inside keyspaces and store definitions) to the new node
    3. update the metadata so the cluster is aware of the new node
    4. remove the protege

  public Response handleSponsorRequest(final Message message){
    final String requestId = (String) message.getPayload().get("request_id");
    final String joinKeyspace = (String) message.getPayload().get("keyspace");
    final String wantedToken = (String) message.getPayload().get("wanted_token");
    final String protogeHost = (String) message.getPayload().get("transport_host");
    final Destination protegeDestination = new Destination();
    final MetaDataClient metaDataClient = getMetaClientForProtege(protegeDestination);

    // only one protege at a time: the swap fails if we are already sponsoring
    boolean res = protege.compareAndSet(null, protegeDestination);
    if (!res){
      return new Response().withProperty("status", "fail")
              .withProperty("reason", "already sponsoring");
    }

    Thread t = new Thread(){
      public void run(){
        InternodeClient protegeClient = new InternodeClient(protogeHost, server.getConfiguration().getTransportPort());
        Keyspace ks = server.getKeyspaces().get(joinKeyspace);
        replicateData(protegeClient, ks);
        updateTokenMap(ks, metaDataClient, wantedToken, requestId);
        try {
          Thread.sleep(10000); //wait here for propagations
        } catch (InterruptedException e) { }
        protege.compareAndSet(protegeDestination, null);
      }
    };
    t.start(); // the join work runs in the background
    return new Response().withProperty("status", "ok");
  }

So easy right? JK this process was a beast to write. It was only after I had it all done that I made it look purdy like this.
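The "atomically set the protege" step is worth looking at in isolation. Below is a minimal sketch of that single-slot guard using AtomicReference. The SingleProtegeSlot class and its methods are hypothetical, and unlike the handler above it runs the join work on the calling thread to keep the example small.

```java
import java.util.concurrent.atomic.AtomicReference;

class SingleProtegeSlot {
    private final AtomicReference<String> protege = new AtomicReference<>();

    /** Claims the slot; returns false if another join is already in progress. */
    boolean tryClaim(String protegeId) {
        return protege.compareAndSet(null, protegeId);
    }

    /**
     * Frees the slot. compareAndSet compares references (==), not equals(),
     * so this must be passed the same object that claimed the slot.
     */
    void release(String protegeId) {
        protege.compareAndSet(protegeId, null);
    }

    String current() {
        return protege.get();
    }

    /** Runs the join work and always frees the slot, even if the work throws. */
    boolean sponsor(String protegeId, Runnable joinWork) {
        if (!tryClaim(protegeId)) {
            return false;
        }
        try {
            joinWork.run();
        } finally {
            release(protegeId);
        }
        return true;
    }
}
```

The try/finally is a design choice of the sketch: without it, a join that blows up halfway would leave the slot taken and every later join request would be refused.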

Let's dive into the piece that replicates the data. We need to move data from the sponsor to the protege. To do this we:

  1. Flush memtables to sstables (memtables and sstables are described here)
  2. For each Store in keyspace
    1. For each SStable in store
      1. Copy table

  private void replicateData(InternodeClient protegeClient, Keyspace ks){
    for (Entry<String, Store> storeEntry : ks.getStores().entrySet()){
      if (storeEntry.getValue() instanceof DefaultColumnFamily){
        DefaultColumnFamily d = (DefaultColumnFamily) storeEntry.getValue();
        String bulkUid = UUID.randomUUID().toString();
        for (SsTable table : d.getSstable()){
          protegeClient.createSsTable(ks.getKeyspaceMetaData().getName(), d.getStoreMetadata().getName(), bulkUid);
          try {
            SsTableStreamReader stream = table.getStreamReader();
            Token token = null;
            while ((token = stream.getNextToken()) != null){
              SortedMap<AtomKey,AtomValue> columns = stream.readColumns();
              protegeClient.transmit(ks.getKeyspaceMetaData().getName(), storeEntry.getKey(), token, columns, bulkUid);
            }
          } catch (IOException e) {
            throw new RuntimeException (e);
          }
          protegeClient.closeSsTable(ks.getKeyspaceMetaData().getName(), d.getStoreMetadata().getName(), bulkUid);
        }
      }
    }
  }

The protegeClient is a low-level way to transmit SsTables. It is fairly interesting why we need this. In a log-structured merge system with write-once tables, deletes are actually a special write called a tombstone. Tombstones mask other columns. No user-facing API (like get or slice) can return a tombstone, so we needed a lower-level API to get at the data in the SsTable files.

We could have gone even lower and simply moved bytes (save that for another day) but this interface also made an attractive bulk load API. WIN! WIN!
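A tiny sketch of why tombstones have to travel with the data. The Cell shape and the merge/get helpers here are made up for illustration and are not Nibiru's AtomValue classes; the only assumption is the usual one that the version with the newest timestamp wins.

```java
class TombstoneSketch {
    /** One version of a column as it might sit in an sstable (hypothetical shape). */
    static final class Cell {
        final String value;
        final long time;
        final boolean tombstone;

        Cell(String value, long time, boolean tombstone) {
            this.value = value;
            this.time = time;
            this.tombstone = tombstone;
        }
    }

    /** Merges two versions of the same column: the newest one wins. */
    static Cell merge(Cell a, Cell b) {
        if (a == null) return b;
        if (b == null) return a;
        return a.time >= b.time ? a : b;
    }

    /** User-facing read: a tombstone looks exactly like missing data. */
    static String get(Cell cell) {
        return (cell == null || cell.tombstone) ? null : cell.value;
    }

    public static void main(String[] args) {
        Cell write = new Cell("bunny", 1L, false); // sits in an older sstable
        Cell delete = new Cell(null, 2L, true);    // tombstone in a newer sstable

        // Low-level copy ships both cells, so the delete still masks the write.
        System.out.println(get(merge(write, delete))); // prints "null"

        // If only the write reaches the protege (a user-facing read can never
        // hand over the tombstone), the deleted value comes back to life.
        System.out.println(get(merge(write, null)));   // prints "bunny"
    }
}
```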

The next piece is called after the join is complete. Once we have moved all the data to this new node, we add the node to the token map and send that update around. (Note: this assumes one node is added at a time and there are no racing metadata changes.)

  private void updateTokenMap(Keyspace ks, MetaDataClient metaDataClient, String wantedToken, String requestId){
    ObjectMapper om = new ObjectMapper();
    TreeMap<String,String> t = om.convertValue(ks.getKeyspaceMetaData().getProperties().get(TokenRouter.TOKEN_MAP_KEY),
    t.put(wantedToken, requestId);
    Map<String,Object> send = ks.getKeyspaceMetaData().getProperties();
    send.put(TokenRouter.TOKEN_MAP_KEY, t);
    try {
      metaDataClient.createOrUpdateKeyspace(ks.getKeyspaceMetaData().getName(), send, true);
    } catch (ClientException e) {
      throw new RuntimeException(e);
    }
  }

Great! So this covers getting the already existing data to the new node. But what about the new mutations (writes and deletes) that happen while the join is going on? This is handled by a small addition to our normal write path. If we have a protege and the write is going to us, we also send it to the protege!

    if (sponsorCoordinator.getProtege() != null && destinations.contains(destinationLocal)){
      String type = (String) message.getPayload().get("type");
      if (type.equals("put") || type.equals("delete") ){
        // this mutation is also sent to the protege (the send call is not shown in this listing)
      }
    }

This all seems bad ass. Are you sure this actually works?

I am sure there are some kinks to work out :), but yes it can be demonstrated in testing.

What we did here is:

  1. start a single node
  2. insert 10 rows
  3. join a second node
  4. insert some more rows
  5. check that data is being divided across nodes

  ColumnFamilyClient c = new ColumnFamilyClient(servers[0].getConfiguration().getTransportHost(), servers[0]
  Session session = c.createBuilder().withKeyspace("abc")
          .withWriteConsistency(ConsistencyLevel.ALL, new HashMap())
          .withReadConsistency(ConsistencyLevel.ALL, new HashMap())
  for (int k = 0; k < 10; k++) {
    session.put(k+"", k+"", k+"", 1);
  }

  Assert.assertEquals(servers[0].getClusterMembership().getLiveMembers().size(), 1);

  servers[1].join("abc", "", "5");



  private void insertDataOverClient(Session session) throws ClientException {
    session.put("1", "1", "after", 8);
    session.put("7", "7", "after", 8);
    session.put("11", "11", "after", 8);
  }

  private void assertDataIsDistributed(Server [] servers){
    Assert.assertEquals("after", ((ColumnValue) servers[1].get("abc", "def", "11" , "11")).getValue());
    Assert.assertEquals("after", ((ColumnValue) servers[0].get("abc", "def", "1" , "1")).getValue());

    Assert.assertEquals("after", ((ColumnValue) servers[1].get("abc", "def", "11" , "11")).getValue());
    Assert.assertEquals("after", ((ColumnValue) servers[1].get("abc", "def", "1" , "1")).getValue());
  }


Whew!  There you go! Web scale! Elastic! Auto Resharding NoSQL action!

Tuesday Mar 17, 2015

Nibiru has a CLI!

Ow yea!

connect connect 7070
createcolumnfamily data pets
describekeyspace data
use data pets
set jack type bunny
get jack type
Val [value=bunny, time=1426649553036000, createTime=1426649553120, ttl=0]

Ow yea!