Edward Capriolo

Sunday Dec 07, 2014

Index Files - Building a column store - Part 2

In a previous blog post I demonstrated how to build an SSTable. For reference the sstable data on disk looks like this:

^@00000^A00000^Bcolumn2^C1417966551490^C1^C0^Cc^Dcolumn3^C1417966551490^C1^C0^Cc
^@00001^A00001^Bcolumn2^C1417966551490^C1^C0^Cc^Dcolumn3^C1417966551490^C1^C0^Cc
^@00002^A00002^Bcolumn2^C1417966551490^C1^C0^Cc^Dcolumn3^C1417966551490^C1^C0^Cc
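To decode the separators (they correspond to the delimiter constants used by the writers shown below): ^@ starts a record, ^A ends the token, ^B ends the rowkey, ^C delimits the fields within a column, and ^D separates columns.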

Another way to visualize the data:

Token(rowkey)  rowkey  columns
00001          00001   name     value  time
                       column2  c      1417966551490
                       column3  c      1417966551490

Side note: In this case the token and rowkey are the same because we are using a natural partitioner that does not hash the key.
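As a minimal sketch (my own illustration, not the engine's actual interface), a natural partitioner simply passes the key through, which is why tokens sort the same way rowkeys do:

public class NaturalPartitioner {
  /* The token is the rowkey itself, so on-disk order is key order.
     A hash partitioner would instead return e.g. md5(rowkey). */
  public String tokenFor(String rowkey) {
    return rowkey;
  }
}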

An astute reader of the last blog may have noticed that even though our tables are sorted by token, and our get (key, column) operation searches by token, we were using an inefficient linear search. One way to optimize this type of search is a binary search. However this is not the primary way Cassandra optimizes the problem.

Enter IndexInterval

The concept of IndexInterval is that for every Nth rowkey we record that rowkey and the byte offset at which it exists in the file. In this way we build a relatively small, fast lookup table of rowkeys. Even if a rowkey is not found in the index, the database gets close to it and only has to scan past at most index-interval - 1 keys.

Depending on the use case (small rows vs. larger rows), a larger or smaller index interval might be right for you.
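To make the idea concrete, here is a minimal sketch of a sparse index (my own illustration, not code from the engine): sample every Nth rowkey into a sorted map, then start scanning from the greatest sampled key at or before the search key.

import java.util.Map;
import java.util.TreeMap;

public class SparseIndexSketch {
  private final TreeMap<String, Long> samples = new TreeMap<>();
  private final int indexInterval;
  private long rowCount;

  public SparseIndexSketch(int indexInterval) {
    this.indexInterval = indexInterval;
  }

  /* Called for every row in sorted order; samples every Nth rowkey. */
  public void maybeSample(String rowkey, long byteOffset) {
    if (rowCount++ % indexInterval == 0) {
      samples.put(rowkey, byteOffset);
    }
  }

  /* Offset to begin a linear scan; the target row is at most
     indexInterval - 1 rows past this position. */
  public long findStartOffset(String rowkey) {
    Map.Entry<String, Long> floor = samples.floorEntry(rowkey);
    return floor == null ? 0L : floor.getValue();
  }
}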

Writing our own Index

Since Cassandra's data files are write-once, this makes the process straightforward: as we flush our memtable to an sstable, we also record the byte offset of every Nth row in a separate file.

I was not aware of any output stream that could count its position, and I did not want to track it out of band, so I whipped up an output stream that counts as it writes.

package io.teknek.nibiru.engine;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class CountingBufferedOutputStream extends BufferedOutputStream {
  private long writtenOffset;

  public CountingBufferedOutputStream(OutputStream out) {
    super(out);
  }

  public synchronized void writeAndCount(int b) throws IOException {
    super.write(b);
    writtenOffset++;
  }

  public synchronized void writeAndCount(byte[] b) throws IOException {
    super.write(b);
    writtenOffset += b.length;
  }

  public long getWrittenOffset() {
    return writtenOffset;
  }
}
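A quick usage sketch (hypothetical file name; note the counter only reflects bytes passed through writeAndCount, not plain write calls):

CountingBufferedOutputStream out =
    new CountingBufferedOutputStream(new FileOutputStream("demo.ss"));
out.writeAndCount("hello".getBytes());
System.out.println(out.getWrittenOffset()); // prints 5
out.close();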

I could have buried the index-creation logic inside the sstable-writing logic, but I decided to use object-oriented design and build a separate class to write index files rather than intermingle the two.

package io.teknek.nibiru.engine;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

public class IndexWriter {

  private final String id;
  private final Configuration conf;
  private BufferedOutputStream indexStream;
  private long rowkeyCount;

  public IndexWriter(String id, Configuration conf){
    this.id = id;
    this.conf = conf;
  }

  public void open() throws FileNotFoundException {
    File indexFile = new File(conf.getSstableDirectory(), id + ".index");
    indexStream = new CountingBufferedOutputStream(new FileOutputStream(indexFile));
  }

  /* Record an index entry for every Nth row: the token, then the byte
     offset at which that row starts in the sstable. */
  public void handleRow(long startOfRecord, String token) throws IOException {
    if (rowkeyCount++ % conf.getIndexInterval() == 0){
      indexStream.write(SSTable.START_RECORD);
      indexStream.write(token.getBytes());
      indexStream.write(SSTable.END_TOKEN);
      indexStream.write(String.valueOf(startOfRecord).getBytes());
      indexStream.write(SSTable.END_ROW);
    }
  }

  public void close() throws IOException {
    indexStream.close();
  }
}
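Driving it looks roughly like this (a hypothetical sketch; the offsets and id are made up, and conf.getIndexInterval() is assumed to be 1000 here):

IndexWriter writer = new IndexWriter("sstable-0001", conf);
writer.open();
writer.handleRow(0L, "00000");   // sampled: the first row is always indexed
writer.handleRow(69L, "00001");  // skipped until the interval rolls over
writer.close();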

Now we make some minor changes to the sstable writer to also write the index as we write the sstable.

 public void flushToDisk(String id, Configuration conf, Memtable m) throws IOException{
    File sstableFile = new File(conf.getSstableDirectory(), id + ".ss");
    CountingBufferedOutputStream ssOutputStream = null;
    IndexWriter indexWriter = new IndexWriter(id, conf);

    try {
      ssOutputStream = new CountingBufferedOutputStream(new FileOutputStream(sstableFile));
      indexWriter.open();
      for (Entry<Token, ConcurrentSkipListMap<String, Val>> i : m.getData().entrySet()){
        long startOfRecord = ssOutputStream.getWrittenOffset();
        ssOutputStream.writeAndCount(START_RECORD);
        ssOutputStream.writeAndCount(i.getKey().getToken().getBytes());
        ssOutputStream.writeAndCount(END_TOKEN);
        ssOutputStream.writeAndCount(i.getKey().getRowkey().getBytes());
        ssOutputStream.writeAndCount(END_ROWKEY);
        indexWriter.handleRow(startOfRecord, i.getKey().getToken());
        boolean writeJoin = false;
        // ... the rest of the row/column serialization is unchanged ...

When we write an sstable with rows 00000 to 10000, the following index is produced (each row here serializes to 69 bytes, so successive index entries are 69000 bytes apart):

^@00000^A0
^@01000^A69000
^@02000^A138000
^@03000^A207000
^@04000^A276000
^@05000^A345000
^@06000^A414000
^@07000^A483000
^@08000^A552000
^@09000^A621000
 

Sweet! Now let's think about the use case of this index. It needs to provide one function: given a rowkey, return the byte offset at which the sstable should begin searching.

 public long findStartOffset(String token)

Here is most of the implementation:

  public long findStartOffset(String token) throws IOException {
    long offset = 0;
    do {
      // Each index entry ends with END_ROW; skip past it to the next entry.
      if (bgIndex.dst[bgIndex.currentIndex] == SSTable.END_ROW) {
        bgIndex.advanceIndex();
      }
      readHeader(bgIndex);
      StringBuilder readToken = readToken(bgIndex);
      long thisOffset = readIndexSize(bgIndex);
      if (readToken.toString().equals(token)){
        // Exact hit in the index: seek directly to the row.
        return thisOffset;
      } else if (readToken.toString().compareTo(token) > 0) {
        // We passed the target: scan from the last entry that was smaller.
        return offset;
      } else {
        offset = thisOffset;
      }
    } while (bgIndex.currentIndex < bgIndex.dst.length - 1 || bgIndex.mbb.position() < bgIndex.channel.size());
    return offset;
  }
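To make the walk concrete, here is a trace against the index shown above, searching for the hypothetical token 01500:

findStartOffset("01500")
  00000 -> 0       (00000 < 01500, remember offset 0)
  01000 -> 69000   (01000 < 01500, remember offset 69000)
  02000 -> 138000  (02000 > 01500, return 69000)

From offset 69000 the sstable scan passes at most 999 rows before reaching 01500.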

Our sstable read only needs a single change:

  public Val get (String row, String column) throws IOException{
    ...
    BufferGroup bg = new BufferGroup();
    bg.channel = ssChannel;
    bg.mbb = (MappedByteBuffer) ssBuffer.duplicate();
    // bg.setStartOffset(0);  <- previously we always scanned from the top
    bg.setStartOffset((int) index.findStartOffset(row));

With this change we start reading from a nearby offset rather than the beginning of the file. And if we get lucky (1 time in 1000, with an index interval of 1000), the indexed offset seeks directly to the row!

What is next? I do not know. Maybe we can make the index and sstable searching do a binary search. Maybe bloom filters, or even a key cache!
