Edward Capriolo

Wednesday Apr 08, 2015

Cleanup compaction - Building a NoSQL store - Part 13

In our last blog we showed how we can have nodes dynamically join our cluster to achieve a web scale, Internet Of Things, (Internet Of Things is now a buzzword I say once an hour) NoSql database. That blog had an incredible info graphic that demonstrated what happens when a node joins our cluster. Here it is again in techNoSQLcolor:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1 node1|

If you add a second node at position 50:

0 === 10 === 20 === 30 === 40 === 50 === 60 === 70 === 80 === 90 === 100
|node1 node1 node1 node1 node1 node1 | node2 node2 node2 node2 node2   |

Now, remember our data files are write once so we can't change them after the fact. After the split, requests that get sent to node1 are cut in half, but the data files on node1 contain more data than they need to. What we need is a way to remove all the data that is on a node that is no longer needed. 

Cassandra has a command for this called 'cleanup' that needs to be run on each node. The theory, in the olden days, a node join could go bad in some way and the system could be "recovered" by manually adjusting the tokens on each nodes and doing various repair process. In practice not many people (including myself) know exactly what to do when node joins go wrong, adjust tokens, move files, run repairs? The system SHOULD be able to automatically remove the old data, but no one has gotten to this yet as far as I can tell.

To handle cleanup we need two things:

  1. A command that can iterate the data files (SsTables) and remove data that no longer belongs on the node.
  2. A variable that can control allow normal compaction processes to cleanup data automatically.

You may want to look back at our previous blog on compaction to get an idea of how we merge SsTables.

Lets get to it

We are going to enhance the compaction process to handle this special case. First, we have a boolean that controls cleanup. If the token does not belong on this node we do not write it during compaction.

      if (cleanOutOfRange){
if (coordinator.destinationsForToken(lowestToken, keyspace).contains(coordinator.getDestinationLocal())){
newSsTable.write(lowestToken, allColumns);
} else {
newSsTable.write(lowestToken, allColumns);

Cleanup is simple, unlike normal compaction we do not have to merge multiple tables together (we could however). One table in makes one table out.

  public void cleanupCompaction(Keyspace keyspace, DefaultColumnFamily defaultColumnFamily){
Set<SsTable> tables = new TreeSet<>(defaultColumnFamily.getSstable());//duplicate because we will mutate the collection
for (SsTable table : tables){
String newName = getNewSsTableName();
try {
SsTable [] ssArray = {table};
SsTable s = compact(ssArray, newName, server.getServerId(), server.getCoordinator(), true, keyspace);
//todo delete old
} catch (IOException e) {
throw new RuntimeException(e);

Testing time

For our coordinator, we made more of an integration test by launching a second server and joining it to the first. I typically like to be about 20% integration tests, 80% unit tests, and 0% mock tests. Why do I take this approach?

First, I believe mock tests are cheating. That is not say that mocking does not have it's uses, but I feel it is used to cover up code smells. If you have a good design and good API, not much mocking should not be needed. Integration tests are good at proving the entire process can run end-to-end, but they are long and redundant.

Unit tests do two some imporant for me: they test things, and work like tripwire for bad code and bad assumptions, they document things. They document things because they show what components should do. They tell a story.

The story for cleanup is simple: A system has some data on disk. After topology changes (like node join or leave or change of replication factor) some of that data is no longer required to be on a given node and can be removed.

I wrote this test by hiding code in methods with friendly names that say what they are doing. It is a little cute I know but why not? We insert 10 rows directly to the server to start things off. When the test is done only 1 of the 10 rows should still be on disk.

public void cleanupTest() throws IOException, InterruptedException, ClientException {
for (int i = 0; i < 9; i++) {
server.put(TestUtil.DATA_KEYSPACE, TestUtil.PETS_COLUMN_FAMILY, i+"", "age", "4", 1);

forceFlushAndConfirmFilesOnDisk(server); //flush memtables to disk
changeTheRouter(server); // change the routing information to simulate topology changes
assertSomeDatum(server); // assert data is on disk
runCleanup(server); // run cleanup
assertDatumAfterCompaction(server); //assert some data has been removed

Rather than writing a long involved integration test to move data off the node, we implement a router that routes token "1" locally and routes everything else nowhere! This way when we Cleanup the data everything else should go. (No need for mocking libraries, just good old Object Oriented Design)

  public static class OnlyTheBestRouter implements Router {
public List<Destination> routesTo(ServerId local, Keyspace requestKeyspace,
ClusterMembership clusterMembership, Token token) {
if (token.getRowkey().equals("1")){
Destination d = new Destination();
return Arrays.asList(d);
return Arrays.asList();

 This method installs the router in place of the default router which writes everything locally.

  private void changeTheRouter(Server s) throws ClientException{
MetaDataClient metaDataClient = new MetaDataClient(s.getConfiguration().getTransportHost(), s
new Response().withProperty(KeyspaceMetaData.ROUTER_CLASS, OnlyTheBestRouter.class.getName()), true);

This method runs the cleanup.

  private void runCleanup(Server s){
CompactionManager cm = ((CompactionManager) s.getPlugins().get(CompactionManager.MY_NAME));
cm.cleanupCompaction(s.getKeyspaces().get(TestUtil.DATA_KEYSPACE), (DefaultColumnFamily)

After the cleanup only columns for the row key/token "1" should be present and all others should be missing.

  private void assertDatumAfterCompaction(Server s){
Assert.assertEquals(null, s.get(TestUtil.DATA_KEYSPACE, TestUtil.PETS_COLUMN_FAMILY, "3", "age")); //gone!
String res = ((ColumnValue) s.get(TestUtil.DATA_KEYSPACE, TestUtil.PETS_COLUMN_FAMILY, "1", "age")).getValue(); //still there!
Assert.assertEquals("4", res);

Wrap up

There you have it, cleanup, the name say it all. Sexy and automatic!


Post a Comment:
Comments are closed for this entry.