Edward Capriolo

Wednesday Dec 31, 2014

Commitlog - Building a Column Family store - Part 6

In previous entries we have built Memtables and SSTables. For a quick recap, Memtables make the write path fast by writing to memory and periodically flushing to on-disk SSTables. Fast is great, but durability is important as well. Imagine a memtable holding 1000 writes and delete tombstones when the server process is killed: those entries exist only in memory and would be lost.

The answer to this (in a single-node scenario) is a Commitlog. The Commitlog is a flat, append-only log of all mutation (write/delete) operations.

If the server crashes we can replay the commitlog and recover those operations. In fact, the only time we need to read the commitlog files is at start up. Once a Memtable is safely flushed to an SStable, its Commitlog can be deleted.

Because a Commitlog is only ever written sequentially, it does not de-duplicate the way a Memtable does. We also do not need to memory map or index the Commitlog, because we only ever read it sequentially.

Let's talk about implementing a Commitlog.


Server -> Memtable  -> SStable
       -> Commitlog

I will stub out the important methods in the Commitlog class.

public class CommitLog {

  private final ColumnFamily columnFamily;
  private CountingBufferedOutputStream ssOutputStream;

  public CommitLog(ColumnFamily cf);

  //Open a commit log before calling write
  public void open() throws FileNotFoundException;

  //write a value to the commit log
  public synchronized void write(Token rowkey, String column, String value, long stamp, long ttl);

  //delete the commit log from disk when it is no longer needed
  public void delete() throws IOException;

  //close the log for writing but do not delete it
  public void close() throws IOException;
}
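To make the stub less abstract, here is a minimal sketch of what an append-only write path could look like, assuming a simple length-prefixed record layout and plain java.io streams. Nibiru's real implementation uses its own CountingBufferedOutputStream and the SsTable format, so treat the names and layout below as illustrative only.

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

//Illustrative sketch only: appends one record per mutation and never seeks
public class SketchCommitLog {

  private final DataOutputStream out;

  public SketchCommitLog(String path) throws IOException {
    //open in append mode; a buffered stream is enough for sequential writes
    out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path, true)));
  }

  //synchronized so concurrent writers cannot interleave the bytes of a record
  public synchronized void write(String rowkey, String column, String value, long stamp, long ttl) throws IOException {
    out.writeUTF(rowkey);
    out.writeUTF(column);
    out.writeUTF(value);
    out.writeLong(stamp);
    out.writeLong(ttl);
    out.flush(); //flush-per-write is the simplest (and slowest) durability policy
  }

  public synchronized void close() throws IOException {
    out.close();
  }
}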

After pondering the design, I decided to make the Commitlog a member of the Memtable. The relationship does not have to be a has-a relation like this, but it makes it fairly easy to track the commit logs and delete them only after we are sure the memtable has been flushed.


public DefaultColumnFamily(Keyspace keyspace, ColumnFamilyMetadata cfmd){
  super(keyspace, cfmd);
  CommitLog commitLog = new CommitLog(this);
  try {
    commitLog.open();
  } catch (FileNotFoundException e) {
    throw new RuntimeException(e);
  }
  memtable = new AtomicReference<Memtable>(new Memtable(this, commitLog));
  memtableFlusher = new MemtableFlusher(this);
  memtableFlusher.start();
}

Next, we make sure every mutation is applied to the commitlog as well as to the memtable.

public void put(String rowkey, String column, String value, long time, long ttl){
  try {
    memtable.get().getCommitLog().write(
        keyspace.getKeyspaceMetadata().getPartitioner().partition(rowkey), column, value, time, ttl);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  memtable.get().put(keyspace.getKeyspaceMetadata().getPartitioner().partition(rowkey), column, value, time, ttl);
  considerFlush();
}
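Note the ordering in put(): the mutation goes to the commitlog before it is applied to the memtable. If the process dies between the two steps, replay at start up will at worst re-apply a write the memtable never saw; reversing the order would leave a window where a mutation lives only in memory and a crash would silently drop it.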

Great! Next there are two cases to deal with:

Case 1: The memtable is flushed to an sstable and we can safely delete the associated commitlog.

Because of our decision to attach the commit log to the memtable, this was a trivial change to the MemtableFlusher.

for (Memtable memtable : memtables){
  SSTableWriter ssTableWriter = new SSTableWriter();
  try {
    //TODO: a timeuuid would be better here
    String tableId = String.valueOf(System.nanoTime());
    ssTableWriter.flushToDisk(tableId, columnFamily.getKeyspace().getConfiguration(), memtable);
    SsTable table = new SsTable(columnFamily);
    table.open(tableId, columnFamily.getKeyspace().getConfiguration());
    columnFamily.getSstable().add(table);
    memtables.remove(memtable);
    memtable.getCommitLog().delete(); //Delete the commit log because we know the data is now durable
    flushes.incrementAndGet();
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

Case 2: On start up, replay any commit logs by reading them and applying the mutations to a memtable.

Start up is a fairly simple process. Open and load up all SStables, replay all commit logs, and done.


public void init() throws IOException {
  for (File ssTable : keyspace.getConfiguration().getSstableDirectory().listFiles()){
    String[] parts = ssTable.getName().split("\\.");
    if (parts.length == 2){
      if ("ss".equalsIgnoreCase(parts[1])){
        String id = parts[0];
        SsTable toOpen = new SsTable(this);
        toOpen.open(id, keyspace.getConfiguration());
        sstable.add(toOpen);
      }
    }
  }

  for (File commitlog : CommitLog.getCommitLogDirectoryForColumnFamily(this).listFiles()){
    String[] parts = commitlog.getName().split("\\.");
    if (parts.length == 2){
      if (CommitLog.EXTENSION.equalsIgnoreCase(parts[1])){
        processCommitLog(parts[0]);
      }
    }
  }
}

For the on-disk format of the commitlog I used the same format as the SsTable. That made it simple to borrow the code compaction already uses to read an SStable linearly.


void processCommitLog(String id) throws IOException {
  CommitLogReader r = new CommitLogReader(id, this);
  r.open();
  Token t;
  while ((t = r.getNextToken()) != null){
    SortedMap<String,Val> x = r.readColumns();
    for (Map.Entry<String,Val> col : x.entrySet()){
      memtable.get().put(t, col.getKey(), col.getValue().getValue(), col.getValue().getTime(), col.getValue().getTtl());
    }
  }
}
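For symmetry with the earlier write sketch, a replay loop over that illustrative length-prefixed layout could look like the following. Again, this is a sketch under my own assumed layout, not Nibiru's CommitLogReader, which reads the SsTable format instead.

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;

//Illustrative sketch only: reads records sequentially until end of file
public class SketchCommitLogReplay {

  public interface Handler {
    void apply(String rowkey, String column, String value, long stamp, long ttl);
  }

  public static void replay(String path, Handler handler) throws IOException {
    try (DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(path)))) {
      while (true) {
        String rowkey;
        try {
          rowkey = in.readUTF(); //EOF here means we have replayed every record
        } catch (EOFException done) {
          return;
        }
        String column = in.readUTF();
        String value = in.readUTF();
        long stamp = in.readLong();
        long ttl = in.readLong();
        handler.apply(rowkey, column, value, stamp, ttl);
      }
    }
  }
}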

Cool, so how do we test this beast? How about this:

  1. Start up a server
  2. Set the Memtable flush at 2 rowkeys.
  3. Insert 3 records.
  4. Shutdown the server.

This should leave a commitlog with one row unflushed. Next:

  1. Startup the server (which will read in commit logs)
  2. Read the unflushed row and assert it exists

@Test
public void commitLogTests() throws IOException, InterruptedException {
  String ks = "data";
  String cf = "pets";
  Configuration configuration = aBasicConfiguration(testFolder);
  Server s = new Server(configuration);
  s.init(); //1
  s.createKeyspace(ks);
  s.createColumnFamily(ks, cf);
  s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getColumnFamilyMetadata().setFlushNumberOfRowKeys(2); //2
  s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getColumnFamilyMetadata().setCommitlogFlushBytes(1);
  for (int i = 0; i < 3; i++) {
    s.put(ks, cf, i + "", "age", "4", 1); //3
    Thread.sleep(1);
  }
  Val x = s.get(ks, cf, "0", "age");
  Assert.assertEquals("4", x.getValue());
  {
    Val y = s.get(ks, cf, "2", "age");
    Assert.assertEquals("4", y.getValue());
  }
  Thread.sleep(1000);
  s.shutdown(); //4
  Thread.sleep(1000);
  {
    Server j = new Server(configuration);
    j.init(); //1
    Assert.assertNotNull(j.getKeyspaces().get(ks).getColumnFamilies().get(cf));
    Val y = j.get(ks, cf, "2", "age");
    Assert.assertEquals("4", y.getValue()); //2
    j.shutdown();
  }
}

Some notes about commitlogs and durability: disk file systems offer both synchronous and asynchronous operations. Durability is particularly hard to guarantee under virtualization, because the disk is abstracted away and even operations like flush or sync may themselves be virtualized. In this example we went with a synchronized write to a buffered stream plus periodic flushing. In a multi-node scenario there is added durability because write operations can be sent to multiple nodes. This solution is "probably ok". Durability is always a fun problem, and there are certainly other ways to construct this commitlog.
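As a sketch of the "periodic flushing" half of that trade-off (the class and method names here are mine, not Nibiru's), a scheduled task could flush the buffered stream and sync the underlying file descriptor on an interval:

import java.io.BufferedOutputStream;
import java.io.FileDescriptor;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

//Illustrative sketch only: periodically pushes buffered bytes toward the disk
public class PeriodicSync {

  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

  public void start(BufferedOutputStream buffered, FileDescriptor fd, long intervalMillis) {
    scheduler.scheduleAtFixedRate(() -> {
      try {
        buffered.flush(); //push the user-space buffer to the OS
        fd.sync();        //ask the OS to push its cache to the device; this is the step virtualization may not honor
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
  }

  public void stop() {
    scheduler.shutdown();
  }
}

Usage would be along the lines of wrapping the log's FileOutputStream in a BufferedOutputStream and passing that stream plus FileOutputStream.getFD() to start().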

Awesomesauce! We now have a commitlog, memtables, sstables, a key cache, index files, and compaction. We have a very, very functional Column Family store implementation! All that is left is bloom filters and tombstone removal during compaction.

Nibiru is getting very fun at this point. It may be turning from example code in a blog into the next NoSQL. You may want to fork it and play.
