Edward Capriolo

Friday Jan 22, 2016

My day

[edward@bjack event-horizon-app]$ git log
commit 9de21fbc97a7f573f6b0564daff20f5ce23c723e
Author: Edward Capriolo <edward.capriolo@.com>
Date:   Fri Jan 22 16:14:20 2016 -0500

    Ow yes yaml cares about spaces...beacause ansible

commit de07401a0087e86253cbf9c0369010e21d248eb9
Author: Edward Capriolo <edward.capriolo@.com>
Date:   Fri Jan 22 16:10:57 2016 -0500

    Why not

commit 0be598151962f647528406bad21b3b8c8e887ffd
Author: Edward Capriolo <edward.capriolo@com>
Date:   Fri Jan 22 16:05:06 2016 -0500

    This is soo much better than just writing a shell script

commit 4f4ea0b8b462a61e3ecde71ff656da9e1324095b
Author: Edward Capriolo <edward.capriolo@.com>
Date:   Fri Jan 22 16:01:53 2016 -0500

    Why dont we have a release engineer

commit b77264618f2fbe689ecc09e4575e10935ba20600
Author: Edward Capriolo <edward.capriolo@.com>
Date:   Fri Jan 22 15:57:56 2016 -0500

    bla

commit 912597f1ba4284a5312398ad770f6fd1d76301a1
Author: Edward Capriolo <edward.capriolo@.com>
Date:   Fri Jan 22 15:52:21 2016 -0500

    The real yaml apparently

commit ee64c5c4340202b95a0f05784f30b63abd755d2d
Author: Edward Capriolo <edward.capriolo@.com>
Date:   Fri Jan 22 15:32:28 2016 -0500

    Always asume kill worked. so we can start if nothing is running

Tuesday Jan 12, 2016

'No deal' is better than a 'Bad Deal'

After working for a few companies a few things have become clear to me. Some background, I have been at small companies with no code, large companies with little code, small companies with a lot of code, and large companies where we constantly re-write the same code. 

I was watching an episode of 'Shark Tank'. Contestant X had a product, call it 'Product X', and four of the five sharks offered nothing. The 5th shark, being very shark-like, used this opportunity to offer a 'bad' deal. The maker of 'Product X' thought it over, refused the deal, and left with no deal. The other sharks were more impressed with 'Contestant X' than with 'Product X'. They remarked that "No deal is better than a Bad Deal". This statement is profound, and software products should be managed the same way.

Think about the phrase tech-debt. People might say tech-debt kills your agility. But it is really not the tech-debt alone that kills your agility; it is the 'bad deals' that lead to tech-debt. As software gets larger it becomes harder to shape and harder to manage. At some point software becomes very big, and change causes a cascade of tech-debt. Few people want to remove a feature. Think about Monkeys on a Ladder, and compare this to your software. Does anyone ever ask you to remove a feature? Even if something is rarely or never used, someone might advocate keeping it, as it might be used later. Removing something is viewed as a loss, even if it really is addition by subtraction. Even if no one knows who asked for a rule, people might advocate keeping it anyway! Heck, even if you find the person who wanted the feature, and they are no longer at the company, and no one else uses it, people might advocate keeping it anyway!

The result of just-keep-it thinking is you end up keeping around code you won't use, which prevents you from easily adding new code. How many times have you heard someone say, 'Project X (scoff)!? That thing is a mess! I can re-write that in scala-on-rails in 3 days'? Four weeks later, when Project-X-on-scala-on-rails is released, a customer contacts you about how they were affected because some small business rule was not ported correctly due to an oversight.

The solution to these oversights is not test coverage or sprints dedicated to removing tech-debt. The solution is never to make a bad deal. Do not write software with niche cases. Do not write software with surprising rules. The way I do this is a mental litmus test: take the exit criteria of an issue and ask yourself, "Will I remember this rule in one year?" If someone asks you to implement something and you realize it was implemented a year ago and no one ever used it, push back and let them know the software has already gone in this direction and it led nowhere. If you're a business and you're struggling to close deals because the 'tech people' can not implement X in time, close a deal that does not involve X.

'No deal' is better than a 'Bad Deal'

'No code' is better than 'Bad Code'

'No feature' is better than 'Bad Feature'

Saturday Dec 26, 2015

Introducing TUnit

Some of my unit tests have annoying sleep statements in them. I open sourced TUnit to change this.

The old way:

Thread.sleep(10000);
Assert.assertEquals(2 , s[2].getClusterMembership().getLiveMembers().size());

The new way:

TUnit.assertThat(new Callable(){
  public Object call() throws Exception {
    return s[2].getClusterMembership().getLiveMembers().size();
  }}).afterWaitingAtMost(11, TimeUnit.SECONDS).isEqualTo(2);

You can see this in action here.
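
Under the hood the idea is just a polling loop: keep calling the Callable until it returns the expected value or the deadline passes. Here is a minimal sketch of that concept (my illustration, not TUnit's actual internals):

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

// Sketch of a poll-until-deadline assertion (illustrative, not TUnit's real code)
public static void assertEventually(Callable<?> poll, Object expected,
        long timeout, TimeUnit unit) throws Exception {
  long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
  Object last = null;
  while (System.currentTimeMillis() < deadline) {
    last = poll.call();
    if (expected.equals(last)) {
      return; // condition met early, no need to burn the whole timeout
    }
    Thread.sleep(50); // short poll instead of one giant sleep
  }
  throw new AssertionError("expected " + expected + " but was " + last);
}

The win over Thread.sleep(10000) is that the test finishes as soon as the condition holds, and only eats the full timeout in the failure case.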

Tuesday Dec 15, 2015

I am highly available

 

https://www.linkedin.com/in/edwardcapriolo


Monday Dec 14, 2015

Mounting a comeback!

Hey all! It has been a long time. Well if you don't know, my wife Stacey and I had a baby boy Ian! 



Well, besides that I am gearing up for the next teknek release. With some cleanups, I also replaced the crappy zookeeper lock recipe and added Curator.

Sunday Jul 12, 2015

Why Hive on Cloudera is like Python on Redhat

I used to be fairly anti-Cloudera. I was never really convinced you needed someone to package up Hadoop for you; your admins should just learn it. These days Hadoop is N degrees harder, and I don't really have as much give-a-crap left for learning to configure all the knobs that change names all the time. Thus I am more or less happy to let Cloudera handle installing the 9000 Hadoop components.

But really, Cloudera's testing is not that great. In my last version of CDH, decommissioning NodeManagers caused YARN to stop accepting jobs. ::Major fail:: Upgrade, and in the new version Hive can not support custom Hive SerDes because of an upstream Hive bug.

Filed this to CDH user:

https://groups.google.com/a/cloudera.org/forum/#!topic/cdh-user/tTHw8kfanqQ

Got the ::cricket:: response

Thinking of getting away from CDH hive at this point. Why?

  1. Waited a long time for this so I could easily build in Tez support
  2. Still no out of the box Tez support even though it's clearly the way forward (and would make everything umpteen times faster)
  3. Does not really look like Cloudera can or wants to keep up with Hive's release cycle
  4. Sabotaging features by adding check boxes and disabling things that work out of the box: "Check the box for Enable Hive on Spark (Unsupported)."
  5. Constant complaints in manager that you should have a metastore server or should have zookeeper, when the truth is most users won't need either (and I sure do not need this)
  6. N day wait to confirm bugs, "whenever we get to it" fixes
  7. 1 zillion unneeded jars in the classpath (hbase etc.) that I'm not actually using with Hive

I'm tired of dealing with back-revved revisions and Cloudera's "Why aren't you just using Impala" type stance.

I am going back to rolling my own. I will still use CDH to manage HDFS proper and YAWN, but this Hive situation is unmanageable. Hive on Cloudera is like Python on Redhat 5. You are painted into an annoying box and you have no direct way to make it better other than ignoring it entirely and rolling your own!

Friday Apr 17, 2015

Triggers and Coprocessors - Building a NoSQL store - Part 14

Hello again! The last blog in this series was about cleanup compaction. While cleanup and compaction are interesting, I do not think they have that web scale 'pop'. Definitely not sexy enough for Nibiru, the world's first Internet of Things NoSql. I decided to treat myself and do something fun, so I decided now would be a good time to build triggers/coprocessor support.

We might as well start by defining some terminology. Many databases have trigger support; typically a trigger is a type of insert or update query that happens inside the RDBMS as a result of another insert or update operation. I first saw the term co-processor used in Google's BigTable white paper. HBase, an open source implementation based on the BigTable spec, has different types of coprocessors.

HBase has a region server that serves the region (shard), and replication is provided by the file system. In the Cassandra/Dynamo style, a row key has multiple natural endpoints and no replicated file system, so the system needs to actively execute the operation across N replicas, as we showed here.

Triggers/CoProcessors were batted around in Cassandra for a while. The implementation can be debated; for example, should the trigger run closer to the storage layer or closer to the coordinator level? Unlike HBase, where we can be sure one region server is "in charge" of a key, we would need a distributed locking mechanism to be "in charge" of a key in Cassandra, and distributed locking is "heavy". Another potential implementation would be leveraging idempotent and retry-able operations like writes and deletes with timestamps. There are probably other ways to go about triggers as well.

Pick your poison

I decided to take the approach of coordinator triggers. In a previous blog we showed the coordinator is the piece that receives the request from the client and dispatches it to multiple servers. The good part of this implementation is that we can easily hook into the code before the result is returned to the client. The downside is that the trigger could time out after the initial user operation (and that operation can not easily be rolled back if we wanted to try that). Maybe in a later blog we can build triggers closer to the storage layer.

The next code samples are all part of this commit.

Let's create an enum to describe the trigger levels.

public enum TriggerLevel {
  /** Request will block while the trigger is executing; the trigger can time out **/
  BLOCKING,
  /** Request will not block while the trigger is executing.
   * Trigger operations may be dropped under back pressure **/
  NON_BLOCKING_VOLATILE,
  /** Request will not block while the trigger is executing.
   * Trigger operations retry, potentially later */
  NON_BLOCKING_RETRYABLE
}

public class TriggerDefinition {
  private TriggerLevel triggerLevel;
  private String triggerClass;
  //getters and setters omitted; the manager and test below use them
}

Next, the user needs an interface to plug the trigger logic into. We give the user access to the message, the response, and the server. In most cases we have avoided passing the Server, to keep interfaces very discrete, but here we are going for flexibility.


public interface CoordinatorTrigger {
  void exec(Message message, Response response, Server server);
}

Next, we can build a component to execute triggers. We are only going to build the blocking case for now, but implementing the non-blocking cases will not be hard since we are using a callable (a sketch follows the TriggerManager code below).

package io.teknek.nibiru.trigger;

public class TriggerManager {

  private ExecutorService executor;
  private final Server server;

  public TriggerManager(Server server){
    this.server = server;
    this.executor = Executors.newCachedThreadPool(); //not shown in the original excerpt, but the executor must be initialized somewhere
  }

  public Response executeTriggers(final Message message, final Response response, Keyspace keyspace,
          Store store, long timeoutInMs, long requestStart){
    long now = System.currentTimeMillis();
    for (TriggerDefinition d : store.getStoreMetadata().getCoordinatorTriggers()){
      if (d.getTriggerLevel() == TriggerLevel.BLOCKING){
        long remaining = (requestStart + timeoutInMs) - now;
        if (remaining > 0){
          final CoordinatorTrigger ct = getReusableTrigger(d);
          Callable<Boolean> c = new Callable<Boolean>(){
            public Boolean call() throws Exception {
              ct.exec(message, response, server);
              return Boolean.TRUE;
            }
          };

          Future<Boolean> f = null;
          try {
            f = executor.submit(c);
            Boolean b = f.get(remaining, TimeUnit.MILLISECONDS);
            if (!b.equals(Boolean.TRUE)){
              return new Response().withProperty("exception", "trigger returned false");
            }
          } catch (InterruptedException | ExecutionException | TimeoutException e) {
            f.cancel(true);
            return new Response().withProperty("exception", "trigger exception " + e.getMessage());
          }
        }
      }
    }
    return response;
  }

}
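
As promised above, here is a rough sketch of what the NON_BLOCKING_VOLATILE case could look like: submit the same kind of callable but never wait on the future, and let a bounded queue shed load under back pressure. This is my guess at a plausible shape, not code from the commit:

  // Sketch only (not from the commit): fire-and-forget with back pressure.
  // A bounded queue plus DiscardPolicy means trigger operations get dropped when we fall behind.
  private ExecutorService nonBlockingExecutor = new ThreadPoolExecutor(1, 4, 60, TimeUnit.SECONDS,
          new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.DiscardPolicy());

  private void executeNonBlocking(final CoordinatorTrigger ct, final Message message, final Response response){
    nonBlockingExecutor.submit(new Callable<Boolean>(){
      public Boolean call() throws Exception {
        ct.exec(message, response, server); // runs after the client already has its response
        return Boolean.TRUE;
      }
    });
  }

The RETRYABLE level would instead re-queue failed work, much like the hint queue we built back in part 11.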

The last piece is to call this code from the coordinator, only after a successful request.

    if (ColumnFamilyPersonality.PERSONALITY.equals(message.getPersonality())) {
      LocalAction action = new LocalColumnFamilyAction(message, keyspace, columnFamily);
      ResultMerger merger = new HighestTimestampResultMerger();
      Response response = eventualCoordinator.handleMessage(token, message, destinations,
              timeoutInMs, destinationLocal, action, merger, getHinterForMessage(message, columnFamily));
      if (!response.containsKey("exception")){
        response = triggerManager.executeTriggers(message, response, keyspace, columnFamily, timeoutInMs, requestStart);
      }
      return response;
    }

Let's get testing

A typical use case for triggers is building a reverse index during an insert. For each insert to a column family named Pets, we will check to see if the column name is "age". If the column name matches, we make another insert into another column family that organizes the data by age.

  public static class PetAgeReverseTrigger implements CoordinatorTrigger {
    @Override
    public void exec(Message message, Response response, Server server) {
      String column = (String) message.getPayload().get("column");
      String value = (String) message.getPayload().get("value");
      String rowkey = (String) message.getPayload().get("rowkey");
      if ("age".equalsIgnoreCase(column)){
        Message m = new Message();
        m.setKeyspace("data");
        m.setStore(PET_AGE_CF);
        m.setPersonality(ColumnFamilyPersonality.PERSONALITY);
        m.setPayload(new Response().withProperty("type", "put")
                .withProperty("rowkey", value)
                .withProperty("column", rowkey)
                .withProperty("value", "")
                .withProperty("time", System.currentTimeMillis()));
        server.getCoordinator().handle(m);
      }
    }
  }

To test:

  1. create a column family for the reverse index
  2. Add the trigger to the pets column family
  3. Insert some entries with age columns
  4. verify the reverse index is now populated

  public void reverseIndexTrigger() throws ClientException{
    MetaDataClient meta = new MetaDataClient(server.getConfiguration().getTransportHost(),
            server.getConfiguration().getTransportPort());
    meta.createOrUpdateStore(
            TestUtil.DATA_KEYSPACE,
            PET_AGE_CF,
            new Response().withProperty(StoreMetaData.IMPLEMENTING_CLASS,
                    DefaultColumnFamily.class.getName())); //1

    TriggerDefinition td = new TriggerDefinition();
    td.setTriggerClass(PetAgeReverseTrigger.class.getName());
    td.setTriggerLevel(TriggerLevel.BLOCKING);
    List<TriggerDefinition> defs = server.getKeyspaces().get(TestUtil.DATA_KEYSPACE).getStores()
            .get(TestUtil.PETS_COLUMN_FAMILY).getStoreMetadata().getCoordinatorTriggers();
    defs.add(td); //2

    ColumnFamilyClient client = new ColumnFamilyClient(new Client(server.getConfiguration().getTransportHost(),
            server.getConfiguration().getTransportPort()));

    Session s = client.createBuilder().withKeyspace(TestUtil.DATA_KEYSPACE).withStore(TestUtil.PETS_COLUMN_FAMILY).build();
    s.put("rover", "age", "5", 1L);
    s.put("sandy", "age", "3", 1L);
    s.put("spot", "age", "5", 1L); //3

    Session s1 = client.createBuilder().withKeyspace(TestUtil.DATA_KEYSPACE).withStore(PET_AGE_CF).build();
    SortedMap<String,Val> res = s1.slice("5", "a", "zzzzzzzzzzzzzzzzz");
    Assert.assertEquals(2, res.size());
    Assert.assertEquals("rover", res.firstKey());
    Assert.assertEquals("spot", res.lastKey()); //4
  }

Conclusion

Triggers handle tasks without building logic into the client application. They can also optimize processes that would otherwise involve multiple client-server exchanges.

Monday Apr 13, 2015

Great NoSQL-isms of the 34th and a half century

So someone just sent me this page on Elasticsearch.

One of the best NoSql-isms is when someone tells you about some elaborate feature, and then they tell you NOT to use it. EVER!

Here is an example:

The second workaround is to add ?search_type=dfs_query_then_fetch to your search requests. The dfs stands for Distributed Frequency Search, and it tells Elasticsearch to first retrieve the local IDF from each shard in order to calculate the global IDF across the whole index

Sounds great! Until you read the next piece of advice:

Don’t use dfs_query_then_fetch in production. It really isn’t required. Just having enough data will ensure that your term frequencies are well distributed. There is no reason to add this extra DFS step to every query that you run.

DONT USE IN PRODUCTION!

ROFLSCALE@! TIPS

Wednesday Apr 08, 2015

Cleanup compaction - Building a NoSQL store - Part 13

In our last blog we showed how we can have nodes dynamically join our cluster to achieve a web scale, Internet of Things (Internet of Things is now a buzzword I say once an hour) NoSql database. That blog had an incredible infographic that demonstrated what happens when a node joins our cluster. Here it is again in techNoSQLcolor:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1|

If you add a second node at position 50:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 | node2 node2 node2 node2 node2   |

Now, remember our data files are write-once, so we can't change them after the fact. After the split, requests that get sent to node1 are cut in half, but the data files on node1 contain more data than they need to. What we need is a way to remove all the data that is on a node that is no longer needed.

------------------------------------------------
Cassandra has a command for this called 'cleanup' that needs to be run on each node. The theory is that, in the olden days, a node join could go bad in some way and the system could be "recovered" by manually adjusting the tokens on each node and doing various repair processes. In practice, not many people (including myself) know exactly what to do when node joins go wrong: adjust tokens, move files, run repairs? The system SHOULD be able to automatically remove the old data, but no one has gotten to this yet as far as I can tell.
------------------------------------------------

To handle cleanup we need two things:

  1. A command that can iterate the data files (SsTables) and remove data that no longer belongs on the node.
  2. A variable that allows normal compaction processes to clean up data automatically.

You may want to look back at our previous blog on compaction to get an idea of how we merge SsTables.

Let's get to it

We are going to enhance the compaction process to handle this special case. First, we have a boolean that controls cleanup. If the token does not belong on this node, we do not write it during compaction.

      if (cleanOutOfRange){
        if (coordinator.destinationsForToken(lowestToken, keyspace).contains(coordinator.getDestinationLocal())){
          newSsTable.write(lowestToken, allColumns);
        }
      } else {
        newSsTable.write(lowestToken, allColumns);
      }

Cleanup is simple: unlike normal compaction, we do not have to merge multiple tables together (we could, however). One table in makes one table out.

  public void cleanupCompaction(Keyspace keyspace, DefaultColumnFamily defaultColumnFamily){
    Set<SsTable> tables = new TreeSet<>(defaultColumnFamily.getSstable()); //duplicate because we will mutate the collection
    for (SsTable table : tables){
      String newName = getNewSsTableName();
      try {
        SsTable [] ssArray = {table};
        SsTable s = compact(ssArray, newName, server.getServerId(), server.getCoordinator(), true, keyspace);
        defaultColumnFamily.getSstable().add(s);
        defaultColumnFamily.getSstable().remove(table);
        //todo delete old
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

Testing time

For our coordinator, we made more of an integration test by launching a second server and joining it to the first. I typically like to be about 20% integration tests, 80% unit tests, and 0% mock tests. Why do I take this approach?

First, I believe mock tests are cheating. That is not to say that mocking does not have its uses, but I feel it is often used to cover up code smells. If you have a good design and good APIs, not much mocking should be needed. Integration tests are good at proving the entire process can run end-to-end, but they are long and redundant.

Unit tests do two important things for me: they test things, working like a tripwire for bad code and bad assumptions, and they document things. They document things because they show what components should do. They tell a story.

The story for cleanup is simple: A system has some data on disk. After topology changes (like node join or leave or change of replication factor) some of that data is no longer required to be on a given node and can be removed.

I wrote this test by hiding code in methods with friendly names that say what they are doing. It is a little cute, I know, but why not? We insert 10 rows directly to the server to start things off. When the test is done, only 1 of the 10 rows should still be on disk.

  @Test
  public void cleanupTest() throws IOException, InterruptedException, ClientException {
    for (int i = 0; i < 10; i++) {
      server.put(TestUtil.DATA_KEYSPACE, TestUtil.PETS_COLUMN_FAMILY, i+"", "age", "4", 1);
    }

    forceFlushAndConfirmFilesOnDisk(server); //flush memtables to disk
    changeTheRouter(server); //change the routing information to simulate topology changes
    assertSomeDatum(server); //assert data is on disk
    runCleanup(server); //run cleanup
    assertDatumAfterCompaction(server); //assert some data has been removed
  }

Rather than writing a long involved integration test to move data off the node, we implement a router that routes token "1" locally and routes everything else nowhere! This way, when we run cleanup, everything else should go. (No need for mocking libraries, just good old Object Oriented Design.)

  public static class OnlyTheBestRouter implements Router {
    @Override
    public List<Destination> routesTo(ServerId local, Keyspace requestKeyspace,
            ClusterMembership clusterMembership, Token token) {
      if (token.getRowkey().equals("1")){
        Destination d = new Destination();
        d.setDestinationId(local.getU().toString());
        return Arrays.asList(d);
      }
      return Arrays.asList();
    }
  }

This method installs the router in place of the default router, which writes everything locally.

  private void changeTheRouter(Server s) throws ClientException{
    MetaDataClient metaDataClient = new MetaDataClient(s.getConfiguration().getTransportHost(),
            s.getConfiguration().getTransportPort());
    metaDataClient.createOrUpdateKeyspace(TestUtil.DATA_KEYSPACE,
            new Response().withProperty(KeyspaceMetaData.ROUTER_CLASS, OnlyTheBestRouter.class.getName()), true);
    metaDataClient.shutdown();
  }

This method runs the cleanup.

  private void runCleanup(Server s){
    CompactionManager cm = ((CompactionManager) s.getPlugins().get(CompactionManager.MY_NAME));
    cm.cleanupCompaction(s.getKeyspaces().get(TestUtil.DATA_KEYSPACE), (DefaultColumnFamily)
            s.getKeyspaces().get(TestUtil.DATA_KEYSPACE).getStores().get(TestUtil.PETS_COLUMN_FAMILY));
  }

After the cleanup only columns for the row key/token "1" should be present and all others should be missing.

  private void assertDatumAfterCompaction(Server s){
    Assert.assertEquals(null, s.get(TestUtil.DATA_KEYSPACE, TestUtil.PETS_COLUMN_FAMILY, "3", "age")); //gone!
    String res = ((ColumnValue) s.get(TestUtil.DATA_KEYSPACE, TestUtil.PETS_COLUMN_FAMILY, "1", "age")).getValue(); //still there!
    Assert.assertEquals("4", res);
  }

Wrap up

There you have it: cleanup, the name says it all. Sexy and automatic!

Thursday Apr 02, 2015

Elastic node scale up - Building a NoSQL store - Part 12

The last major feature I added to Nibiru was Hinted Handoff, an optimization used to re-deliver lost messages and reduce the entropy that comes from some natural endpoints missing writes. I sat a while trying to decide what to do next...

I thought to myself, "Nibiru has code to route requests, and a CLI to read and write data, but is it web scale? After all, anyone could just write a library that hashes data around!"

The answer was no, and I decided it was time to change that. Let's make this biznotch auto-scale.

Preamble

Every NoSQL has a different wrinkle on how it does things. The Dynamo-style databases like Cassandra and Riak create a Token (by a hash) for the data and use that information to route requests. A previous blog demonstrated how Nibiru implements this type of consistent hashing using Request Routing.

Thus far we have implemented an eventually consistent ColumnFamily store in Nibiru which does not use shared storage. This means that growing the cluster involves physically moving data between nodes in a way that no data is unavailable during the transition period.

How can we do this? If a token is calculated from user data with a hash, you might expect that changing the number of nodes results in almost all the data having to be moved around, like when a hash map is resized. But luckily we can avoid moving the majority of data by using something like consistent hashing. You can easily find literature on this and even cool interactive presentations, but I will explain it my way, with ascii art and crude examples.

Things hash as I explained here:

10 MOD 7. Replication 3
[][][][10][10][10][]

But let's forget about MOD 7. Instead we hash something into a fixed space. The size of this hash space will never change. To keep it simple, assume the space is 0 (inclusive) to 100 (exclusive).

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100

If you have one node :

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1|


If you add a second node at position 50:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 | node2 node2 node2 node2 node2   |

Half of the data stays in place, and half needs to move to the new node. Let's add a third node at position 25:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node3 node3 node3 | node1 node1 node1 | node2 node2 node2 node2 node2  |

Node 1 'gave away' some of its range, but node2 did not. The lesson is that each time we add nodes we split the hash space, not rehash.

This is how we scale: each new node reduces the burden on the rest of the nodes.
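
If you want to play with the idea, a toy ring is just a sorted map from position to node: a key belongs to the first node at or past its token, wrapping around at the end. This is illustrative only, not Nibiru's actual router:

import java.util.Map;
import java.util.TreeMap;

// Toy ring over the fixed space 0..99 (illustrative, not Nibiru's router)
TreeMap<Integer, String> ring = new TreeMap<>();
ring.put(99, "node1");                 // one node owns the whole space

// lookup: first node at or past the token, wrapping to the lowest entry
int token = 42;
Map.Entry<Integer, String> e = ring.ceilingEntry(token);
String owner = (e != null) ? e.getValue() : ring.firstEntry().getValue(); // node1

// adding node2 at position 50 splits node1's range: tokens 0..50 now land on
// node2, while tokens 51..99 stay exactly where they were -- a split, not a rehash
ring.put(50, "node2");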

Terminology

Let's call the new node that is joining a PROTEGE. These are things to consider for a PROTEGE:

  1. We DO NOT want to send reads to that node (because new data may still be moving to it)
  2. We DO want it to receive new writes/deletes/updates for the section of data it will be responsible for

The protege is taking data from another node. Let's call that node the SPONSOR. For a SPONSOR:

  1. We DO want to write locally until the PROTEGE is fully joined, because if the join fails we do not want to lose data
  2. We DO want to send portions of local data to the protege

(Another way to implement this would be to separately track the JOINING node(s). On the write path the system would be writing to (Replication Factor + 1) nodes but only reading from (Replication Factor) nodes.)
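
In code terms that alternative looks something like this (my illustration of the idea, not what Nibiru actually does; the names replicas and joiningNode are made up for the example):

// Sketch: during a join, write to the joining node too, but never read from it
List<Destination> writeTargets = new ArrayList<>(replicas); // the RF natural endpoints
if (joiningNode != null) {
  writeTargets.add(joiningNode); // RF + 1 writes while the join is in flight
}
List<Destination> readTargets = replicas; // still only RF reads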

Implementation

First we build an internode client. The protege uses it to initiate the join request. Basically the request says, "Hey node [$Sponsor]! I notice you have a lot of data and I would like you to divide your data at $[token] and I will take over one part of that"

  public void join(String keyspace, String sponsorHost, String wantedToken){
    InternodeClient internodeClient = new InternodeClient(sponsorHost, configuration.getTransportPort());
    internodeClient.join(keyspace, sponsorHost, serverId, wantedToken, configuration.getTransportHost());
  }


That message gets transmitted to the sponsor and makes its way to a handler class. The way I implemented this, we only handle a single protege at a time to keep the process sane. At a high level we need to:

  1. Atomically set the protege
  2. Start a thread that will
    1. replicate meta data (keyspaces and store definitions) to the new node
    2. replicate data (data inside keyspaces and store definitions) to the new node
    3. update the metadata so the cluster is aware of the new node
    4. remove the protege

  public Response handleSponsorRequest(final Message message){
    final String requestId = (String) message.getPayload().get("request_id");
    final String joinKeyspace = (String) message.getPayload().get("keyspace");
    final String wantedToken = (String) message.getPayload().get("wanted_token");
    final String protogeHost = (String) message.getPayload().get("transport_host");
    final Destination protegeDestination = new Destination();
    protegeDestination.setDestinationId(requestId);
    final MetaDataClient metaDataClient = getMetaClientForProtege(protegeDestination);

    boolean res = protege.compareAndSet(null, protegeDestination);
    if (!res){
      return new Response().withProperty("status", "fail")
              .withProperty("reason", "already sponsoring");
    }
    protogeToken.set(wantedToken);

    Thread t = new Thread(){
      public void run(){
        InternodeClient protegeClient = new InternodeClient(protogeHost, server.getConfiguration().getTransportPort());
        Keyspace ks = server.getKeyspaces().get(joinKeyspace);
        replicateMetaData(metaDataClient);
        replicateData(protegeClient, ks);
        updateTokenMap(ks, metaDataClient, wantedToken, requestId);
        try {
          Thread.sleep(10000); //wait here for propagations
        } catch (InterruptedException e) { }
        protege.compareAndSet(protegeDestination, null);
        protogeToken.set(null);
      }
    };
    t.start();
    return new Response().withProperty("status", "ok");
  }

So easy right? JK this process was a beast to write. It was only after I had it all done that I made it look purdy like this.

Let's dive into the piece that replicates the data. We need to move data from the sponsor to the protege. To do this we:

  1. Flush memtables to sstables (memtables and sstable are described here)
  2. For each Store in keyspace
    1. For each SStable in store
      1. Copy table

  private void replicateData(InternodeClient protegeClient, Keyspace ks){
    for (Entry<String, Store> storeEntry : ks.getStores().entrySet()){
      if (storeEntry.getValue() instanceof DefaultColumnFamily){
        DefaultColumnFamily d = (DefaultColumnFamily) storeEntry.getValue();
        d.doFlush();
        d.getMemtableFlusher().doBlockingFlush();
        String bulkUid = UUID.randomUUID().toString();
        for (SsTable table : d.getSstable()){
          protegeClient.createSsTable(ks.getKeyspaceMetaData().getName(), d.getStoreMetadata().getName(), bulkUid);
          try {
            SsTableStreamReader stream = table.getStreamReader();
            Token token = null;
            while ((token = stream.getNextToken()) != null){
              SortedMap<AtomKey,AtomValue> columns = stream.readColumns();
              protegeClient.transmit(ks.getKeyspaceMetaData().getName(), storeEntry.getKey(), token, columns, bulkUid);
            }
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
          protegeClient.closeSsTable(ks.getKeyspaceMetaData().getName(), d.getStoreMetadata().getName(), bulkUid);
        }
      }
    }
  }

The protegeClient is a low level way to transmit SsTables. It is fairly interesting why we need this. In a log-structured merge system with write-once tables, deletes are actually a special write called a tombstone. Tombstones mask other columns. No user facing API (like get or slice) can return a tombstone, so we needed a lower-level API to get at the SsTable files.

We could have gone even lower and simply moved bytes (save that for another day), but this interface also made an attractive bulk load API. WIN! WIN!
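
To make the tombstone point concrete, a delete is just a write that masks older values under the same last-write-wins rule as everything else. A toy illustration (assumed semantics sketched here, not Nibiru's actual classes):

// Toy illustration of tombstone masking (assumptions, not Nibiru's classes)
long putTime = 1000L;      // a column written at time 1000
long deleteTime = 2000L;   // a tombstone written later at time 2000
boolean visible = putTime > deleteTime; // false: the tombstone masks the column
// A get/slice API hides the tombstone entirely, but a bulk/streaming API must
// still ship it, or a replica that missed the delete would resurrect the value.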

The next piece is called after the join is complete. Once we have moved all the data to this new node, we add this node into the token map and send that update around. (*Note: this assumes one node is added at a time and there are no racing meta-data changes.)

  private void updateTokenMap(Keyspace ks, MetaDataClient metaDataClient, String wantedToken, String requestId){
    ObjectMapper om = new ObjectMapper();
    @SuppressWarnings("unchecked")
    TreeMap<String,String> t = om.convertValue(ks.getKeyspaceMetaData().getProperties().get(TokenRouter.TOKEN_MAP_KEY),
            TreeMap.class);
    t.put(wantedToken, requestId);
    Map<String,Object> send = ks.getKeyspaceMetaData().getProperties();
    send.put(TokenRouter.TOKEN_MAP_KEY, t);
    try {
      metaDataClient.createOrUpdateKeyspace(ks.getKeyspaceMetaData().getName(), send, true);
    } catch (ClientException e) {
      throw new RuntimeException(e);
    }
  }

Great! So this covers getting the already existing data to the new node. But what about the new mutations (writes and deletes) that happen while the join is going on? This is handled by a small addition to our normal write path. If we have a protege and the write is going to us, we also send it to the protege!

    if (sponsorCoordinator.getProtege() != null && destinations.contains(destinationLocal)){
      String type = (String) message.getPayload().get("type");
      if (type.equals("put") || type.equals("delete")){
        destinations.add(sponsorCoordinator.getProtege());
      }
    }

This all seems badass, but are you sure this actually works?

I am sure there are some kinks to work out :), but yes it can be demonstrated in testing.

What we did here is:

  1. Start a single node
  2. Insert 10 rows
  3. Join a second node
  4. Insert some more rows
  5. Check that data is being divided across nodes
...
    ColumnFamilyClient c = new ColumnFamilyClient(servers[0].getConfiguration().getTransportHost(),
            servers[0].getConfiguration().getTransportPort());
    Session session = c.createBuilder().withKeyspace("abc")
            .withWriteConsistency(ConsistencyLevel.ALL, new HashMap())
            .withReadConsistency(ConsistencyLevel.ALL, new HashMap())
            .withStore("def").build();
    for (int k = 0; k < 10; k++) {
      session.put(k+"", k+"", k+"", 1);
    }

    servers[1].init();
    Thread.sleep(10000);
    Assert.assertEquals(servers[0].getClusterMembership().getLiveMembers().size(), 1);

    servers[1].join("abc", "127.0.0.1", "5");
    Thread.sleep(1000);
    Assert.assertEquals(servers[1].getServerId().getU().toString(),
            servers[0].getCoordinator().getSponsorCoordinator().getProtege().getDestinationId());

    insertDataOverClient(session);
    assertDataIsDistributed(servers);
...
  }

  private void insertDataOverClient(Session session) throws ClientException {
    session.put("1", "1", "after", 8);
    session.put("7", "7", "after", 8);
    session.put("11", "11", "after", 8);
  }

  private void assertDataIsDistributed(Server [] servers){
    Assert.assertEquals("after", ((ColumnValue) servers[1].get("abc", "def", "11", "11")).getValue());
    Assert.assertEquals("after", ((ColumnValue) servers[0].get("abc", "def", "1", "1")).getValue());

    Assert.assertEquals("after", ((ColumnValue) servers[1].get("abc", "def", "11", "11")).getValue());
    Assert.assertEquals("after", ((ColumnValue) servers[1].get("abc", "def", "1", "1")).getValue());
  }

Conclusion

Whew! There you go! Web scale! Elastic! Auto-resharding NoSQL action!

Tuesday Mar 17, 2015

Nibiru has a CLI!

Ow yea!

connect 127.0.0.1 7070
ok>
createcolumnfamily data pets
{}
ok>
describekeyspace data
[pets]
ok>
use data pets
ok>
set jack type bunny
ok>
get jack type
Val [value=bunny, time=1426649553036000, createTime=1426649553120, ttl=0]
ok>

Ow yea!

Saturday Mar 14, 2015

On my fling with the ASF and Cassandra

Nothing good lasts forever, you know. At one point I may have been the #1 Apache Cassandra fanboy, but like many ASF projects it basically reached the point where it stopped being a meritocracy and started becoming a bureaucracy.

A while back I tried to implement this idea in Cassandra, and it was not even a super complicated one: do server side code. Like many times in Apache Software open source, even if you are 100% willing to do ALL the work and TESTING to see something get done, a crew of people manages to block your ideas. You can read the ticket, read the responses, and form your own opinion.

Well, as usual, it takes people a while to come around to an idea. For the ASF the concept is basically: once they think it is an idea they came up with... they love it.

Well there you go. Here is another classic Cassandra-ism for you:

In memory column families...https://issues.apache.org/jira/browse/CASSANDRA-1657

Closed: Later

http://www.datastax.com/documentation/datastax_enterprise/4.0/datastax_enterprise/inMemory.html

Great idea! So good we will make everyone pay for it as a secret sauce closed source feature!

Thursday Mar 12, 2015

Battling entropy with Hinted Handoff - Building a NoSQL store - Part 11

It has been nearly a month since my last NoSQL blog. :( Sorry about that. I actually spent some time working on my stream processing platform teknek. And yes, in my free time I work on my own stream processing platform and my own NoSQL, and no, there is no support group I know of that can help me.

My last blog covered eventual consistency and CAP, so it makes sense to move on to entropy resolution. CAP is typically described as a tradeoff between three properties: Consistency, Availability, and Partition Tolerance. Eventually consistent systems favor A. If you remember, we demonstrated a case where we wrote data at different Consistency Levels and observed what is returned to a client. Let's describe a situation where entropy happens:

Imagine we are writing at Consistency Level ONE. The coordinator needs only wait for 1 ACK before returning success to the client:

client ->  serverX --->|
server3 |
server4 <---|
server5 <---|
server6 <---|

This is the normal case:


client -> serverX <--(merge results) <-|
server3 |
server4 -------------------->|
server5 -------------------->|
server6 -------------------->|

But imagine this happens:


client -> serverX <--(merge results) <-|
server3 |
server4 -------------------->|
server5 (brain fart) ?|
server6 -------------------->|

The system now has entropy, because we do not know if server5 died permanently, or if someone tripped over the Ethernet cable in the data center and is rushing to plug it back in. The write could have completed and the server died before the reply, or the write may never have happened at all.

One way to combat entropy is to read from multiple places: reading from 2 nodes, a quorum of nodes, or ALL the nodes that have the data. Reads tend to be expensive, and reading from multiple places multiplies the expense.

Hinting

Another way to deal with failed write/delete operations is to store a Hint (commonly called Hinted Handoff). Remember the column with the highest timestamp wins, so if writes are delivered late or out of order the final result is still the one with the highest timestamp. To implement Hints we need to track which writes fail or time out and put them in a place where they can be redelivered later. You can almost think of Hints as a queue of delayed writes.
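
The timestamp rule is what makes late redelivery safe. A toy illustration (assuming the highest-timestamp-wins merge described in the last blog):

// Last-write-wins: delivery order does not matter, only timestamps do
long hintedWrite = 1000L; // the write we failed to deliver, replayed much later
long newerWrite = 2000L;  // a write the node accepted while the hint sat queued
// whichever arrives first, the merge picks the highest timestamp, so replaying
// the stale hint can never clobber the newer value
long winner = Math.max(hintedWrite, newerWrite); // 2000L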


I have refactored the EventualCoordinator slightly since the last blog. The key aspects are the same. We have an ExecutorCompletionService completionService that has futures handling writes to networked nodes.

    else if (c.getLevel() == ConsistencyLevel.N) {
      Response response = handleN(start, deadline, completionService, destinations, merger, message, c); //1
      if (hinter != null) {
        maybeSendHints(remote, remoteFutures, start, deadline, hinter); //2
      }
      return response;
    }  

This part is going to bend your mind a bit. For replication THREE we would have THREE futures, each one handling the write operation on a different server in the cluster. Let's assume the user has specified Consistency N as ONE. This means that the method handleN will return as soon as 1 future is complete. The other futures may also have returned, or they may still be in progress. We should not consider them failed and create a hint yet, because we should give them up to the normal timeout window to complete. We do not want to block this coordinator thread anymore because the consistency level is met.

Since we do not want to block the client request anymore, we need to handle the events still pending. We made an executor service 'lastChance' for this. RemoteMessageCallable is an abstraction I made that tracks the original message and which destination it was sent to, and provides a field so we can tell if the future logically finished all its work.

Again, we do not want maybeSendHints to block, because we have to return to the client.

private ExecutorService lastChance;

private void maybeSendHints(List<RemoteMessageCallable> remotes,
        List<Future<Response>> remoteFutures, long start, long deadline, final Hinter hinter) {
  for (int i = 0; i < remotes.size(); i++) {
    final RemoteMessageCallable remote = remotes.get(i);
    final Future<Response> future = remoteFutures.get(i);
    //the future is done but the method must have thrown an exception: we need a hint here
    if (!remote.isComplete() && future.isDone()) {
      hinter.hint(remote.getMessage(), remote.getDestination());
    }

    if (!remote.isComplete() && !future.isDone()){
      final long remaining = deadline - System.currentTimeMillis();
      //not complete, no exception, but time is up: we need a hint here
      if (remaining <= 0){
        hinter.hint(remote.getMessage(), remote.getDestination());
      } else {
        //not complete, no exception, but not past the deadline either:
        //put the request on the lastChance executor so we do not block
        lastChance.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            //here we block, then hint on failure :)
            try {
              future.get(remaining, TimeUnit.MILLISECONDS);
            } catch (ExecutionException | TimeoutException e) {
              //fall through; the completeness check below decides if we hint
            }
            if (!remote.isComplete()){
              hinter.hint(remote.getMessage(), remote.getDestination());
            }
            return null;
          }
        });
      }
    }
  }
}

Whew! When I write something like that I never know if I am the smartest guy in the world or if it is a fork bomb. I think it is l33t hacking... The next pieces of the puzzle are a little easier to grok.

Hinter

The first thing we will do is automatically create a column family to store our hints.

private void addSystemKeyspace(){
    Keyspace system = new Keyspace(configuration);
    KeyspaceMetaData ksmd = new KeyspaceMetaData();
    ksmd.setName("system");
    ksmd.setPartitioner( new NaturalPartitioner());
    ksmd.setRouter(new LocalRouter());
    system.setKeyspaceMetadata(ksmd);
    {
      Map<String,Object> properties = new HashMap<>();
      properties.put("implementing_class", DefaultColumnFamily.class.getName());
      system.createColumnFamily("hints", properties);
    }
    server.getKeyspaces().put("system", system);
  }

We had writes that did not complete, and what we want to do is put the message on a queue to be delivered later. We use the destination as the row key, make the column a uuid, and store the message as JSON text in the value.

public class Hinter {

  private final AtomicLong hintsAdded = new AtomicLong();
  private final ColumnFamilyPersonality hintsColumnFamily;
  private ObjectMapper OM = new ObjectMapper();

  public Hinter(ColumnFamilyPersonality person){
    this.hintsColumnFamily = person;
  }

  public void hint(Message m, Destination destination) {
    try {
      hintsColumnFamily.put(destination.getDestinationId(), UUID.randomUUID().toString(),
              OM.writeValueAsString(m), System.currentTimeMillis() * 1000L);
      hintsAdded.getAndIncrement();
    } catch (IOException e) {}
  }

}

This allows us to ask the data store, "give me all the hints for server X". (This can be modeled other ways.) This shows you how simple, flexible, and elegant the ColumnFamily model is.

Hint Replayer

I just happened to read a post that suggested that class names ending in 'er' are bad ideas. Well fuck that noise! jk. You may remember that we have built pluggable cluster membership into Nibiru. ClusterMembership allows us to know if the other nodes in the cluster are up or down. We will implement the HintReplayer as a thread that:

  1. Scans the list of cluster members that are ALIVE
  2. If the host is ALIVE, sees if we have any hints for that node
  3. If there are hints, delivers them and then deletes the hint

public class HintReplayer extends AbstractPlugin implements Runnable {

  ObjectMapper om = new ObjectMapper();

  public static final String MY_NAME = "hint_replayer";
  private volatile boolean goOn = true;
  private ColumnFamilyPersonality hintCf;
  //not shown in the original excerpt: the thread and counters used below
  private Thread runThread;
  private final AtomicLong hintsDelivered = new AtomicLong();
  private final AtomicLong hintsFailed = new AtomicLong();

  public HintReplayer(Server server) {
    super(server);
  }

  @Override
  public String getName() {
    return MY_NAME;
  }

  @Override
  public void init() {
    hintCf = Coordinator.getHintCf(server);
    runThread = new Thread(this);
    runThread.start();
  }

  @Override
  public void run() {
    while (goOn){
      List<ClusterMember> live = server.getClusterMembership().getLiveMembers(); //1
      for (ClusterMember member : live){
        SortedMap<AtomKey,AtomValue> results = hintCf.slice(member.getId(), "", "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"); //2
        if (results.size() > 0){
          for (Entry<AtomKey, AtomValue> i : results.entrySet()){
            if (i.getValue() instanceof ColumnValue){
              ColumnValue v = (ColumnValue) i.getValue();
              ColumnKey c = (ColumnKey) i.getKey();
              writeAndDelete(member, c, v); //3
            }
          }
        }
      }
      try {
        Thread.sleep(1);
      } catch (InterruptedException e) { }
    }
  }

  private void writeAndDelete(ClusterMember m, ColumnKey c, ColumnValue v){
    Destination d = new Destination();
    d.setDestinationId(m.getId());
    Client client = this.clientForDestination(d);
    try {
      client.post(om.readValue(v.getValue(), Message.class));
      hintCf.delete(m.getId(), c.getColumn(), System.currentTimeMillis() * 1000);
      hintsDelivered.getAndIncrement();
    } catch (IOException | RuntimeException e) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e1) {
      }
      hintsFailed.getAndIncrement();
    }
  }

  public long getHintsDelivered(){
    return hintsDelivered.get();
  }
}

Testing

We already have a test that demonstrated how the coordinator works. Testing this code is basic. We shut down some servers and do a write. The node that receives the write should store hints for the down nodes; when those nodes come back alive, the hints should be re-delivered.

    //insert when some nodes are down
    failIfSomeAreDown(s, clAll);
    for (int i = 0; i < 2; i++){
      s[i].shutdown();
    }
    //start up all nodes
    for (int i = 0; i < s.length; i++) {
      s[i] = new Server(cs[i]);
      s[i].init();
    }
    //get the hint replayer from the first node
    HintReplayer h = (HintReplayer) s[0].getPlugins().get(HintReplayer.MY_NAME);
    for (int tries = 0; tries < 10; tries++) {
      //wait for the delivered counter to get to 3
      long x = h.getHintsDelivered();
      if (x == 3) {
        break;
      }
      Thread.sleep(1000);
    }
    Assert.assertEquals(3, h.getHintsDelivered());
    //now go to the servers and ensure the new value has been delivered
    int found = 0;
    for (int i = 0; i < s.length; i++) {
      ColumnValue cv = (ColumnValue) s[i].get("abc", "def", "a", "b");
      if (cv != null && cv.getValue().equals("d")){
        found++;
      }
    }
    Assert.assertEquals(3, found);

Conclusion

Hints help us ensure writes and deletes do not get lost during random outages. They make the system more self-healing, and in CAP terms they can help you have a 'little more C', even though hinted handoff does not provide any additional guarantees beyond what the Consistency Level does.

What should Nibiru do next? Hit me up on twitter and let me know what NoSQL feature I should try to build next!

Saturday Jan 31, 2015

Request coordination, eventual consistency, and CAP - Building a NoSQL store - Part 10

In the last blog we mentioned the CAP theorem, but now it is time to dive in. Distributed systems face challenges that non-distributed systems do not. If data is replicated, the system needs some notion of where data is, how it is replicated for redundancy, how the data is kept in sync, and what operations are available to the user.

"Eventual consistency" was the "Affordable Care Act" of the Dynamo Style (Cassandra) vs Strongly Consistent (HBase) debate. So embattled was the term "Eventual consistency" that "Tunable consistency" was coined. Fast forward some time and Cassandra now supports "atomic compare and swap" and HBase has support for "sometimes relaxed consistency". ROFL. That, as they say, is politics ... I mean... vendor driven open source...

By the way, in case you think I am a drama queen/conspiracy theorist and surely there is no quasi-political system attempting to defame eventual consistency: I did some searches for reference links to put in this article, and as a result I have fallen into an anti-eventual-consistency ad targeting campaign...



In any case, a point I will constantly beat to death: Nibiru aims to be pluggable-everything. A lofty goal is to support all (possible) consistency models. Can it be done? I don't know, but it is worth trying I guess. I do not have all the answers. That being said, a system can not be "half-atomic" or "kinda-consistent", but per Column Family/Table we should be able to choose what consistency model fits what we want. For example, one store "pets" could be an eventually consistent ColumnFamily store while another store "credit_cards" is configured as a strongly consistent key value store.

Cassandra's implementation of CAP is to provide per-request Consistency Level. The consistency level allows the user to say what aspects they desire. For example:

WRITE.ALL: The write is acknowledged (durable) by ALL Natural Endpoints (all replicas of the data) before a result is returned to the client.
READ.ALL: The read request is sent to ALL Natural Endpoints, the results are compared, and the highest timestamp wins. (Other logic handles timestamp ties.)

WRITE.ONE: The request is sent to all Natural Endpoints; after one acknowledged (durable) write completes, the result is returned, while the other writes still continue on the server.
READ.ONE: The request is sent to one (or more) Natural Endpoints, and the first result is returned to the client.

There are other Consistency Levels, including QUORUM, LOCAL_ONE, SERIAL (for compare and swap), etc., but let's save those for another day.
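
For reference, QUORUM is just a majority of the replicas, which is why a quorum write plus a quorum read must overlap on at least one replica. A quick sketch of the arithmetic (my note, not Nibiru code):

// QUORUM = majority of replicas; with replication factor 3 that is 2,
// so W=2 and R=2 guarantee overlap because W + R > RF
int replicationFactor = 3;
int quorum = (replicationFactor / 2) + 1; // 2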

As our last blog showed, a router can return one or more Destination(s)/Natural Endpoints for a given row key. Given the destinations and the consistency level, we can make different strategies (code paths) for the desired results.

Let's take a case that is easy to explain, WRITE.ALL. A client sends a write operation for Column Family pets:
{ rowkey: rover, column: type, value: poodle }

The node that receives this request is called the coordinator. The coordinator computes the Natural Endpoints (one of which could be local) where the data lives. For replication factor N, N futures will be created, and each future will forward the request. Since the request is at CONSISTENCY ALL, we want to wait for all the futures to finish before a result is returned to the client. We also have an upper limit on how long we wish to wait for the futures before we consider it a timeout.

This process looks something like this:

client ->  serverX --->|
server3 |
server4 <---|
server5 <---|
server6 <---|

client -> serverX <--(merge results) <-|
server3 |
server4 -------------------->|
server5 -------------------->|
server6 -------------------->|

I took the approach of separating the EventualCoordinator (the thing managing the futures) from the Merger (the logic merging the results). I did this because HighestTimestampWins does not make sense for a key value store, which has no timestamp.

Let's do this

The coordinator does a few things. First, the request could have a local component. The LocalAction class contains the logic to physically carry out the operation locally. The merger is a class that produces one result from multiple responses, because N nodes will return N results but only a single result is returned to the client. Again, we define the LocalAction and the Merger here because the EventualCoordinator is message/response agnostic.

  public Response handle(Message message) {
    ...
    Token token = keyspace.getKeyspaceMetadata().getPartitioner().partition((String) message.getPayload().get("rowkey"));
    List<Destination> destinations = keyspace.getKeyspaceMetadata().getRouter()
            .routesTo(message, server.getServerId(), keyspace, server.getClusterMembership(), token);
    long timeoutInMs = determineTimeout(columnFamily, message);

    if (ColumnFamilyPersonality.COLUMN_FAMILY_PERSONALITY.equals(message.getRequestPersonality())) {
      LocalAction action = new LocalColumnFamilyAction(message, keyspace, columnFamily);
      ResultMerger merger = new HighestTimestampResultMerger();
      return eventualCoordinator.handleMessage(token, message, destinations,
              timeoutInMs, destinationLocal, action, merger);
    }
    ...
  }

So let's dive into the EventualCoordinator. If there is only one destination and that destination is local, we do the action and return the result.

  public Response handleMessage(Token token, final Message message, List<Destination> destinations,
          long timeoutInMs, Destination destinationLocal, final LocalAction action, ResultMerger merger) {
    ...
    if (destinations.size() == 1 && destinations.contains(destinationLocal)) {
      return action.handleReqest();
    }

This next part prevents 1 request from turning into 3, turning into 9... turning into doom. Basically we tag messages with "reroute" so we know if it is a client or a server that sent the message.

    if (message.getPayload().containsKey("reroute")){
      return action.handleReqest();
    }
    if (!message.getPayload().containsKey("reroute")){
      message.getPayload().put("reroute", "");
    }

I'm back Doc. I'm back from the future!

Java's ExecutorCompletionService is pretty nifty. It reminds me of what ThreadGroup does for threads. For each Destination the data is either local or remote, so we have a callable to deal with both cases. LocalActionCallable wraps the LocalAction, and RemoteMessageCallable forwards the request over the network.

    ExecutorCompletionService<Response> completionService = new ExecutorCompletionService<>(executor);
    final ArrayBlockingQueue<Response> results = new ArrayBlockingQueue<Response>(destinations.size());
    ArrayBlockingQueue<Future<Response>> futures = new ArrayBlockingQueue<Future<Response>>(destinations.size());
    for (final Destination destination : destinations) {
      Future<Response> f = null;
      if (destination.equals(destinationLocal)) {
        f = completionService.submit(new LocalActionCallable(results, action));
      } else {
        f = completionService.submit(new RemoteMessageCallable(results, clientForDestination(destination), message));
      }
      futures.add(f);
    }

After the futures have been submitted, we want to wait some amount of time for all of them to return. If we hit the deadline we fail, if we do not get all results before the deadline we fail, and if one future fails we fail.

    long start = System.currentTimeMillis();
    long deadline = start + timeoutInMs;
    List<Response> responses = new ArrayList<>();
    if (c.getLevel() == ConsistencyLevel.ALL) {
      while (start <= deadline) {
        Response r = null;
        try {
          Future<Response> future = completionService.poll(deadline - start, TimeUnit.MILLISECONDS);
          if (future != null) { //poll returns null when we hit the deadline
            r = future.get();
          }
          if (r != null){
            responses.add(r);
          }
        } catch (InterruptedException | ExecutionException e) {
          break;
        }
        if (r == null){
          return new Response().withProperty("exception", "coordinator timeout");
        }
        if (r.containsKey("exception")){
          return r;
        }
        if (responses.size() == destinations.size()){
          //success condition
          break;
        }
        start = System.currentTimeMillis();
      }
      if (responses.size() == destinations.size()){
        return merger.merge(responses, message);
      } else {
        return new Response().withProperty("exception", "coordinator timeout");
      }
    }

Note at level ALL, two successes and one failure is still a failure. Write operations do not have anything fancy to merge, but reads do. Here is the highest-timestamp-wins merge logic. Every time I write the "find the highest" algorithm I am reminded of my first CS class in Fortran. Thanks Professor D! (But we ain't fortran-in no more)

 
private static ObjectMapper OM = new ObjectMapper();

@Override
public Response merge(List<Response> responses, Message message) {
  if ("put".equals(message.getPayload().get("type"))
          || "delete".equals(message.getPayload().get("type"))) {
    return new Response();
  } else if ("get".equals(message.getPayload().get("type"))) {
    return highestTimestampResponse(responses);
  } else {
    return new Response().withProperty("exception", "unsupported operation " + message);
  }
}

private Response highestTimestampResponse(List<Response> responses){
  long highestTimestamp = Long.MIN_VALUE;
  int highestIndex = Integer.MIN_VALUE;
  for (int i = 0; i < responses.size(); i++) {
    Val v = OM.convertValue(responses.get(i).get("payload"), Val.class);
    if (v.getTime() > highestTimestamp) {
      highestTimestamp = v.getTime();
      highestIndex = i;
    }
  }
  return responses.get(highestIndex);
}

}

Testing

This code is taken from the unit test here. (I should be doing perma-links, I am so lazy.) We should be able to test two things:

  1. If all nodes are up, operations at ALL should pass
  2. If at least one node is down, operations at ALL should fail

    ColumnFamilyClient cf = new ColumnFamilyClient(s[0].getConfiguration().getTransportHost(),
            s[0].getConfiguration().getTransportPort());
    Session clAll = cf.createBuilder().withKeyspace("abc").withColumnFamily("def")
            .withWriteConsistency(ConsistencyLevel.ALL, new HashMap())
            .withReadConsistency(ConsistencyLevel.ALL, new HashMap()).build();
    passIfAllAreUp(s, clAll);
    failIfSomeAreDown(s, clAll);

  public void passIfAllAreUp(Server [] s, Session sb) throws ClientException {
    Response r = sb.put("a", "b", "c", 1);
    Assert.assertFalse(r.containsKey("exception"));
    int found = 0;
    for (int i = 0; i < s.length; i++) {
      ColumnFamilyPersonality c = (ColumnFamilyPersonality) s[i].getKeyspaces().get("abc").getColumnFamilies().get("def");
      Val v = c.get("a", "b");
      if (v != null){
        //assert data is in three places
        Assert.assertEquals("c", c.get("a", "b").getValue());
        found++;
      }
    }
    Assert.assertEquals(3, found);
    Assert.assertEquals("c", sb.get("a", "b").getValue());
  }

  public void failIfSomeAreDown(Server [] s, Session sb) throws ClientException {
    for (int i = 2; i < s.length; i++) {
      s[i].shutdown();
      try {
        sb.put("a", "b", "c", 1);
      } catch (ClientException ex){
        Assert.assertTrue(ex.getMessage().equals("coordinator timeout"));
      }
    }
  }

Sweet! Dude! There you have it! Eventual Consistency, I mean Tunable Consistency, I mean Sometimes Relaxed Consistency :) Basically from here implementing other consistency levels is adding different case logic and pulling results from the ExecutorCompletionService. What is coming next? Hinted Handoff I think :) Stay tuned!

Wednesday Jan 28, 2015

The curious case of the never joining node.

[ecapriolo@production-cassandra-54-233 ~]$ /opt/cassandra/bin/nodetool -h  10.9.54.233 status
Note: Ownership information does not include topology; for complete information, specify a keyspace
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address      Load       Tokens  Owns   Host ID                               Rack
UN  10.9.55.181  47.47 GB   256     10.4%  5a635595-1cc9-4b43-8e6b-12ef4639c1db  rack1
UN  10.9.58.53   42.12 GB   256     8.9%   bca09da7-85a9-476b-9a60-1b6a0973f375  rack1
UN  10.9.55.58   44.71 GB   256     10.0%  06b6a87e-7c3e-42ae-be28-20efe8920615  rack1
UN  10.9.52.15   48.01 GB   256     9.9%   d5746748-457e-4821-8f3d-bdac850cfcd3  rack1
UN  10.9.57.215  45.47 GB   256     10.3%  1d3bdec7-6f0b-4bc9-9329-72cd41cf5934  rack1
UJ  10.9.54.233  29.35 GB   256     ?      e973ef19-bd06-4a2a-bff1-661742b5ca14  rack1
UN  10.9.59.47   43.39 GB   256     11.3%  ea95e65d-8365-402c-aab9-3658fe5cc69f  rack1
UN  10.9.48.198  46.3 GB    256     10.7%  007c3169-dde4-4d81-acb9-3bbeda76723a  rack1
UN  10.9.49.224  48.15 GB   256     9.6%   0ed47a3d-cf70-4e08-99a3-3fdbe5ea4c72  rack1
UN  10.9.58.78   48.39 GB   256     9.4%   ca0144ac-948e-49ce-a3b9-857827c3cf5b  rack1
UN  10.9.49.13   43.54 GB   256     9.6%   59d23ac8-50ab-45dc-9ddc-d7f2825e1169  rack1


[ecapriolo@production-cassandra-54-233 ~]$ /opt/cassandra/bin/nodetool -h  10.9.54.233 netstats
Mode: JOINING
Bootstrap d4ab87c0-a6ff-11e4-82bf-6f05f2e5b99d
    /10.9.55.181
    /10.9.48.198
    /10.9.59.47
    /10.9.58.78
    /10.9.57.215
    /10.9.52.15
    /10.9.49.13
    /10.9.58.53
    /10.9.55.58
Read Repair Statistics:
Attempted: 0
Mismatch (Blocking): 0
Mismatch (Background): 0
Pool Name                    Active   Pending      Completed
Commands                        n/a         0              0
Responses                       n/a         0        4248573

Can this be explained?

[ecapriolo@production-cassandra-54-233 ~]$ /opt/cassandra/bin/nodetool -h  10.9.54.233 join
This node has already joined the ring.

Yea you could have fucking fooled me.

[ecapriolo@production-cassandra-54-233 ~]$ /opt/cassandra/bin/nodetool -h  10.9.54.233 gossipinfo
production-cassandra-54-233.use1.huffpo.net/10.9.54.233
  generation:1422457813
  heartbeat:12652
  LOAD:3.1492231438E10
  RELEASE_VERSION:2.0.10
  DC:datacenter1
  HOST_ID:e973ef19-bd06-4a2a-bff1-661742b5ca14
  NET_VERSION:7
  RPC_ADDRESS:10.9.54.233
  SCHEMA:a948f8b3-dbc3-37f7-b121-c8eacfe9a772
  SEVERITY:0.0
  STATUS:BOOT,-2466632499206578162
  RACK:rack1
