Edward Capriolo

Sunday Dec 14, 2014

Memtable, Flushing, and SSTables - Building a column family store - Part 4

Up until this point, the past blog posts have built and tested memtables, SSTables, and the key cache as discrete components. To build a column family store they need to work in concert. This means we need to talk about the write path.

Our SSTables and associated IndexFiles are write-once. This has some nice characteristics. First, the data store does not do any random writes. This is great for both rotational disks and SSDs. We want our write path to look like this:

set-request -> server -> memtable 

Eventually the Memtable will grow, and if it grows unbounded the system will run out of memory. We want to flush the Memtable to disk as an SSTable before we hit a catastrophic OutOfMemoryError, but we also want the Memtable to be as large as possible so we do fewer small writes (and require less compaction to merge the resulting tables).

It would just be nice to do something like:
memoryInUse += size_of(column)

Unfortunately that is not an easy task. In the Java world, counting how much memory a structure uses is a hard problem. There are several ways to go about counting used memory, but the approach I am going to take is avoiding the hard problem and solving an easier one:

Flush when the number of row keys is > X.

In the end we likely want to build numerous criteria:

  1. Flush if memtable is very old
  2. Flush if memtable > certain size
  3. Flush if the memtable has had more than X operations

All of these things should be easy to add incrementally.
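To show how these criteria could compose incrementally, here is a small policy sketch. It is illustrative only: the class and field names (FlushPolicy, maxAgeMillis, maxOperations) are hypothetical and not part of the project, and a zero threshold disables that check.

```java
// Hypothetical sketch of combining several flush criteria.
// None of these names come from the project itself.
public class FlushPolicy {
    private final long maxAgeMillis;        // criterion 1: memtable age
    private final int flushNumberOfRowKeys; // criterion 2: memtable size
    private final long maxOperations;       // criterion 3: operation count

    public FlushPolicy(long maxAgeMillis, int flushNumberOfRowKeys, long maxOperations) {
        this.maxAgeMillis = maxAgeMillis;
        this.flushNumberOfRowKeys = flushNumberOfRowKeys;
        this.maxOperations = maxOperations;
    }

    // Flush if any single criterion trips; a threshold of zero disables that check.
    public boolean shouldFlush(long ageMillis, int rowKeyCount, long operationCount) {
        if (maxAgeMillis != 0 && ageMillis > maxAgeMillis) return true;
        if (flushNumberOfRowKeys != 0 && rowKeyCount > flushNumberOfRowKeys) return true;
        if (maxOperations != 0 && operationCount > maxOperations) return true;
        return false;
    }
}
```

Each new criterion is then just one more line in shouldFlush(), which is what makes adding them incrementally cheap.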

public class ColumnFamilyMetadata {
  private String name;
  private long tombstoneGraceMillis;
  private int flushNumberOfRowKeys = 10000;
  // getters and setters omitted
}

Next, stub out a method for flushing:

public class ColumnFamily {
  void considerFlush(){
    // flush memtable to sstable
  }
}

Any method that modifies or creates data should consider flushing

  public void delete(String rowkey, String column, long time){
    memtable.get().delete(keyspace.getKeyspaceMetadata().getPartitioner().partition(rowkey), column, time);
    considerFlush();
  }
Flushing a memory table to disk turns out to be a tricky concurrency problem. We do not want to block writes. Even though writing to disk linearly is fast, it is not as fast as writing to the memtable. We also do not want to be forced to acquire a lock on every operation just to check if a flush is in progress. Finally, after the sstable is flushed we cannot clear the memtable until the sstable is loaded back up; otherwise data could temporarily vanish during the window between when the sstable is written and when it is loaded back into memory.

Here is my solution. Maybe aphyr will "call me maybe" me out on it, but for a zero-follower github project this is fine :). The good news is that the data here is idempotent, so having the same data in a memtable and an sstable at the same time will not produce an incorrect result.
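To make the idempotence claim concrete: every column value carries a timestamp, and reads resolve duplicates with last-write-wins, so merging a value with a copy of itself yields the same value. A minimal sketch (Col is a hypothetical stand-in for the project's column class, not its real code):

```java
// Hypothetical column value carrying a timestamp; last-write-wins
// resolution means a duplicate copy of the same (value, time) pair
// is harmless: merge(x, x) == x.
public class Col {
    final String value;
    final long time;

    Col(String value, long time) {
        this.value = value;
        this.time = time;
    }

    // Resolve two copies of a column: the newer timestamp wins.
    static Col merge(Col a, Col b) {
        return a.time >= b.time ? a : b;
    }
}
```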

First, we create MemtableFlusher a Runnable per column family to flush memtable instances.

public class MemtableFlusher implements Runnable {
  // Memtable must implement Comparable to live in a skip-list set
  private ConcurrentSkipListSet<Memtable> memtables = new ConcurrentSkipListSet<>();
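Two members of this class are referenced elsewhere in the post but not shown: add(), called by considerFlush(), and getFlushCount(), read by the test at the end. A minimal version might look like the sketch below; the bodies are assumptions (Memtable is modeled here as a plain String to keep the sketch self-contained), only the signatures appear in the post.

```java
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the unshown parts of MemtableFlusher.
// Memtables are modeled as Strings for self-containment.
public class MemtableFlusherSketch {
    private final ConcurrentSkipListSet<String> memtables = new ConcurrentSkipListSet<>();
    private final AtomicLong flushCount = new AtomicLong();

    // Queue a memtable for flushing; returns false if already queued.
    public boolean add(String memtable) {
        return memtables.add(memtable);
    }

    // Called by the flush loop once a memtable has been written to disk.
    void recordFlush(String memtable) {
        memtables.remove(memtable);
        flushCount.incrementAndGet();
    }

    // Read by tests to confirm a flush actually happened.
    public long getFlushCount() {
        return flushCount.get();
    }
}
```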

This piece is tightly coupled with the ColumnFamily instance. Our read operations need to be able to access the current memtable, all the memtables pending flush to disk, and the already flushed sstables.

public class ColumnFamily {
  private AtomicReference<Memtable> memtable;
  private MemtableFlusher memtableFlusher;
  private Set<SSTable> sstable = new ConcurrentSkipListSet<>();
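The ordering these three fields imply for reads can be sketched as follows: consult the live memtable first, then memtables queued for flush, then sstables, taking the first hit since newer data always lives earlier in the chain. This is a simplified illustration, not the project's read path; tables are modeled as plain maps, whereas the real code uses Memtable and SSTable instances and merges values by timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of the read ordering: live memtable, then
// pending-flush memtables, then sstables. First hit wins because
// newer data is always earlier in this chain.
public class ReadPath {
    final Map<String, String> currentMemtable = new HashMap<>();
    final List<Map<String, String>> pendingMemtables = new ArrayList<>();
    final List<Map<String, String>> ssTables = new ArrayList<>();

    public String get(String key) {
        String v = currentMemtable.get(key);
        if (v != null) return v;
        for (Map<String, String> m : pendingMemtables) {
            v = m.get(key);
            if (v != null) return v;
        }
        for (Map<String, String> t : ssTables) {
            v = t.get(key);
            if (v != null) return v;
        }
        return null;
    }
}
```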

Here is the implementation of the considerFlush() method we talked about above. Namely, if the size of the memtable is larger than the threshold, atomically move the current memtable onto the flush queue and put a new memtable in its place.

  void considerFlush(){
    Memtable now = memtable.get();
    if (columnFamilyMetadata.getFlushNumberOfRowKeys() != 0
        && now.size() > columnFamilyMetadata.getFlushNumberOfRowKeys()){
      Memtable aNewTable = new Memtable(this);
      boolean success = memtableFlusher.add(now);
      if (success){
        boolean swap = memtable.compareAndSet(now, aNewTable);
        if (!swap){
          throw new RuntimeException("race detected");
        }
      }
    }
  }

Now that the memtable is placed onto the flush queue, the MemtableFlusher should:

  1. flush memtable to sstable
  2. load the sstable (memory map it)
  3. remove any reference to the memtable
  public void run() {
    while (true){
      for (Memtable memtable : memtables){
        SSTableWriter ssTableWriter = new SSTableWriter();
        try {
          //TODO: a timeuuid would be better here
          String tableId = String.valueOf(System.nanoTime());
          ssTableWriter.flushToDisk(tableId, columnFamily.getKeyspace().getConfiguration(), memtable); //1
          SSTable table = new SSTable();
          table.open(tableId, columnFamily.getKeyspace().getConfiguration());
          columnFamily.getSstable().add(table); //2
          memtables.remove(memtable); //3
        } catch (IOException e) {
          //TODO: catch this and terminate server?
          throw new RuntimeException(e);
        }
      }
      try {
        Thread.sleep(1);
      } catch (InterruptedException e) { }
    }
  }

I have written a small test. Obviously this will not find concurrency bugs, but at least it certifies that the components are working together as designed in a simple case. Here we set the flush size to 2 and insert 3 row keys. The third row key inserted should cause a flush.

  public void aTest() throws IOException, InterruptedException{
    File tempFolder = testFolder.newFolder("sstable");
    Configuration configuration = new Configuration();
    Server s = new Server();
    s.createColumnFamily(ks, cf);
    s.put(ks, cf, "jack", "name", "bunnyjack", 1);
    s.put(ks, cf, "jack", "age", "6", 1);
    Val x = s.get(ks, cf, "jack", "age");
    Assert.assertEquals("6", x.getValue());
    s.put(ks, cf, "ziggy", "name", "ziggyrabbit", 1);
    s.put(ks, cf, "ziggy", "age", "8", 1);
    s.put(ks, cf, "dotty", "age", "4", 1);
    Assert.assertEquals(1, s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getMemtableFlusher().getFlushCount());
    x = s.get(ks, cf, "jack", "age");
    Assert.assertEquals("6", x.getValue());
  }

We are getting closer and closer to being fully functional. There are two big pieces left. First, we keep writing new files to disk, so to run continuously we will need to merge these tables together; this process is known as compaction. After that, a commit log will be needed to add durability.

This is getting exciting!

