Edward Capriolo

Monday Dec 22, 2014

SSTables and Compaction - Building a column family store - Part 5

In the last blog we introduced a threshold-based system that flushed Memtables to SSTables. In a Log-Structured Merge storage system, having more SSTables makes the read path slower. The system also needs a way to merge writes and updates so we do not run out of disk space.

Compaction is a process that merges multiple SSTables together. Compaction removes older columns and can delete entire rows. It also optimizes the data by placing all the columns of a row close together on disk.

Imagine that each of the three tables here is a separate SSTable.

 
 rowkey  column  value
 myrow   a       5

 rowkey  column  value
 myrow   a       2
 myrow   b       b
 myrow   c       c

 rowkey  column  value
 row1    a       b
 row1    b       c

After a compaction the resulting SSTable would look like this:

 rowkey  column  value
 myrow   a       2
 myrow   b       b
 myrow   c       c
 row1    a       b
 row1    b       c

Because the SSTables are sorted by rowkey and then by column, merging multiple tables together is straightforward. We iterate over each SSTable at the same time and merge columns into a new table.

Note: If a row has a large number of columns, merging in memory would likely cause an out-of-memory error. We would need a strategy to detect when the merge grows too large and switch to an on-disk merge. For now we will pretend this is not an issue.
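The merge described above can be sketched with plain sorted maps. Everything here is a simplification of my own: rows map column names to string values, tables are assumed to be ordered oldest-to-newest, and a later table's value simply overwrites an earlier one (the real code resolves conflicts per column using timestamps).

```java
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class MergeSketch {

    // Merge several in-memory tables (rowkey -> (column -> value)).
    // Tables are assumed ordered oldest-to-newest, so a newer table's
    // column overwrites an older one, as "myrow"/"a" goes 5 -> 2 in
    // the example above.
    public static SortedMap<String, SortedMap<String, String>> merge(
            List<SortedMap<String, SortedMap<String, String>>> tables) {
        SortedMap<String, SortedMap<String, String>> result = new TreeMap<>();
        for (SortedMap<String, SortedMap<String, String>> table : tables) {
            for (Map.Entry<String, SortedMap<String, String>> row : table.entrySet()) {
                SortedMap<String, String> columns = result.get(row.getKey());
                if (columns == null) {
                    columns = new TreeMap<>();
                    result.put(row.getKey(), columns);
                }
                columns.putAll(row.getValue());
            }
        }
        return result;
    }
}
```

Because TreeMaps iterate in sorted order, the merged result comes out sorted by rowkey and then by column, matching the compacted table above.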

I assumed I was going to struggle with this algorithm a bit more than I did. Either I am getting good or, more than likely, there are some glaring holes in the implementation I do not see yet.

The approach I took was to create a method that can merge multiple SSTables into a single new one. I refactored the SSTableWriter to write SSTables incrementally.

Then the algorithm I came up with was:

  1. Read a token from each SSTable being compacted
  2. Find the lowest token
  3. Merge the columns of only the lowest token
  4. Advance only the readers that had the lowest token

public static SsTable compact(SsTable[] ssTables, String newName) throws IOException {
  ColumnFamily columnFamily = ssTables[0].getColumnFamily();
  SsTableStreamReader[] readers = new SsTableStreamReader[ssTables.length];
  SsTableStreamWriter newSsTable = new SsTableStreamWriter(newName,
          columnFamily.getKeyspace().getConfiguration());

  newSsTable.open();
  Token[] currentTokens = new Token[ssTables.length];
  for (int i = 0; i < ssTables.length; i++) {
    readers[i] = ssTables[i].getStreamReader();
  }
  for (int i = 0; i < currentTokens.length; i++) {
    currentTokens[i] = readers[i].getNextToken(); //1
  }
  while (!allNull(currentTokens)) {
    Token lowestToken = lowestToken(currentTokens); //2
    SortedMap<String, Val> allColumns = new TreeMap<>();
    for (int i = 0; i < currentTokens.length; i++) {
      if (currentTokens[i] != null && currentTokens[i].equals(lowestToken)) {
        SortedMap<String, Val> columns = readers[i].readColumns();
        merge(allColumns, columns); //3
      }
    }
    newSsTable.write(lowestToken, allColumns);
    advance(lowestToken, readers, currentTokens); //4
  }
  newSsTable.close();
  SsTable s = new SsTable(columnFamily);
  s.open(newName, columnFamily.getKeyspace().getConfiguration());
  return s;
}
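The compact method above leans on a few helpers (allNull, lowestToken, merge, advance) that are not shown. Here is a hedged sketch of what the first three might look like, with Token simplified to String: the names match the calls above, but the bodies are my guesses, and a real merge would compare column timestamps rather than letting a later reader blindly overwrite an earlier one. (advance would simply call getNextToken on each reader whose current token equaled the lowest one.)

```java
import java.util.SortedMap;

public class CompactionHelpers {

    // All readers are exhausted when every current token is null.
    public static boolean allNull(String[] tokens) {
        for (String t : tokens) {
            if (t != null) {
                return false;
            }
        }
        return true;
    }

    // Smallest non-null token among the readers' current positions.
    public static String lowestToken(String[] tokens) {
        String lowest = null;
        for (String t : tokens) {
            if (t != null && (lowest == null || t.compareTo(lowest) < 0)) {
                lowest = t;
            }
        }
        return lowest;
    }

    // Fold one reader's columns into the accumulated row. Here a later
    // reader overwrites an earlier one; the real code would keep the
    // column version with the newest timestamp.
    public static void merge(SortedMap<String, String> all,
                             SortedMap<String, String> columns) {
        all.putAll(columns);
    }
}
```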

I always try to write functional code. By functional I do not mean super-sexy, complicated Scala and function pointers; I mean methods that do not modify any shared state. When you write code like this it makes testing easier. I wrote a quick test that merges two SSTables:

@Test
public void test() throws IOException {
  ...
  Keyspace ks1 = MemtableTest.keyspaceWithNaturalPartitioner();
  SsTable s = new SsTable(cf);
  {
    Memtable m = new Memtable(cf);
    m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column2", "c", 1, 0L);
    m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column2", "d", 2, 0L);
    m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column3", "e", 2, 0L);
    m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row2"), "column1", "e", 2, 0L);
    SSTableWriter w = new SSTableWriter();
    w.flushToDisk("1", configuration, m);
    s.open("1", configuration);
  }
  SsTable s2 = new SsTable(cf);
  {
    Memtable m2 = new Memtable(cf);
    m2.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column1", "c", 1, 0L);
    m2.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row2"), "column1", "f", 3, 0L);
    SSTableWriter w2 = new SSTableWriter();
    w2.flushToDisk("2", configuration, m2);
    s2.open("2", configuration);
  }
  CompactionManager.compact(new SsTable[] { s, s2 }, "3");
  SsTable ss = new SsTable(cf);
  ss.open("3", configuration);
  Assert.assertEquals("e", ss.get("row1", "column3").getValue());
}

Great! Now we can merge multiple SSTables into a single one, but we still have to wire the process into the server correctly. In the last blog we flushed memtables to disk and loaded the resulting SSTables before removing the memtable. We need to do something similar here.

What we want to do is detect a trigger for compaction. In this case, once we have 4 or more SSTables we want to compact them into a single table. To do this, a background thread iterates over all keyspaces. In each keyspace we iterate over all the ColumnFamily instances, and in each column family, if there are 4 or more SSTables, we compact them.

@Override
public void run() {
  while (true) {
    for (Entry<String, Keyspace> keyspaces : server.getKeyspaces().entrySet()) {
      Keyspace keyspace = keyspaces.getValue();
      for (Map.Entry<String, ColumnFamily> columnFamilies : keyspace.getColumnFamilies().entrySet()) {
        Set<SsTable> tables = columnFamilies.getValue().getSstable();
        if (tables.size() >= 4) {
          // ...do compaction here

Like the memtable flushing, we want to insert the new SSTable first and only later remove the older SSTables, so that concurrent readers never miss data. Again, we can leverage the fact that having the same data in multiple files will not change the end result.

Set<SsTable> tables = columnFamilies.getValue().getSstable();
if (tables.size() >= 4) {
  SsTable[] ssArray = tables.toArray(new SsTable[] {});
  try {
    String newName = getNewSsTableName();
    SsTable s = compact(ssArray, newName);
    tables.add(s);
    for (SsTable table : ssArray) {
      tables.remove(table);
    }
    numberOfCompactions.incrementAndGet();
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
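The add-then-remove ordering above matters because readers may iterate the table set while compaction runs. The swap can be sketched in isolation like this; the concurrent set type and String stand-in for SsTable are my choices, not the post's:

```java
import java.util.Set;

public class SwapSketch {

    // Replace the old tables with the compacted one without ever leaving
    // the set in a state where some data belongs to no table: the
    // compacted table is added first, so a concurrent reader sees the
    // data twice at worst, never zero times. The set is assumed to be a
    // concurrent implementation such as CopyOnWriteArraySet.
    public static void swap(Set<String> tables, String compacted, String[] olds) {
        tables.add(compacted);
        for (String old : olds) {
            tables.remove(old);
        }
    }
}
```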

To show the entire process in action I cooked up this end-to-end test. What we are doing here:

  1. Set the memtable to flush at 2 rows
  2. Insert 9 rows (this should cause 4 flushes)
  3. Assert the flushes are as expected
  4. Assert the compaction thread had 1 event
  5. Check that all the rows still exist
@Test
public void compactionTest() throws IOException, InterruptedException {
  ...
  Configuration configuration = new Configuration();
  configuration.setSstableDirectory(tempFolder);
  Server s = new Server();
  s.setConfiguration(configuration);
  s.init();
  s.createKeyspace(ks);
  s.createColumnFamily(ks, cf);
  s.getKeyspaces().get(ks).getColumnFamilies().get(cf)
          .getColumnFamilyMetadata().setFlushNumberOfRowKeys(2); //1
  for (int i = 0; i < 9; i++) {
    s.put(ks, cf, i + "", "age", "4", 1); //2
    Thread.sleep(1);
  }
  Val x = s.get(ks, cf, "8", "age");
  Thread.sleep(1000);
  Assert.assertEquals(4, s.getKeyspaces().get(ks).getColumnFamilies().get(cf)
          .getMemtableFlusher().getFlushCount()); //3
  Assert.assertEquals(1, s.getCompactionManager().getNumberOfCompactions()); //4
  Assert.assertEquals("4", x.getValue());
  for (int i = 0; i < 9; i++) {
    Val y = s.get(ks, cf, i + "", "age");
    Assert.assertEquals("4", y.getValue()); //5
  }
}

Sweet! Now we have a system where Memtables flush to SSTables, and a background thread optimizes the SSTables on disk through compaction! We are getting really close to having a fully functional single-node ColumnFamily store. We only need two more critical components: Bloom Filters and CommitLogs. (There are some small things left to do, like having compaction remove deleted rows, which will be easier once we have bloom filters.) Awesome sauce!
