Edward Capriolo

Sunday Dec 14, 2014

Memtable, Flushing, and SSTables - Building a column family store - Part 4

Up until this point the pieces from the past blogs, memtables, SSTables, and the key cache, have been built and tested as discrete components. To build a Column Family store they need to work in concert. This means we need to talk about the write path.

Our SSTables and associated IndexFiles are write once. This has some nice characteristics. First, the data store does not do any random writes. This is great for rotational disks and for SSDs. We want our write path to look like this:

set-request -> server -> memtable 

Eventually the Memtable will grow, and if it grows unbounded the system will run out of memory. We want to flush the Memtable to disk as an SSTable before we have a catastrophic OutOfMemory error, but we also want the Memtable to be as large as possible so we do fewer small writes (and require less compaction to merge these tables later).

It would just be nice to do something like:
memoryInUse += size_of(column)


Unfortunately that is not an easy task. In the Java world counting how much memory a structure uses is a hard problem. There are several ways to go about counting used memory, but the approach I am going to take is to avoid the hard problem and solve an easier one:

Flush when the number of row keys is > X.

In the end we likely want to build numerous criteria:

  1. Flush if memtable is very old
  2. Flush if memtable > certain size
  3. Flush if the memtable has had more than X operations

All of these things should be easy to add incrementally.

public class ColumnFamilyMetadata {
  private String name;
  private long tombstoneGraceMillis;
  private int flushNumberOfRowKeys = 10000;

Next, stub out a method for flushing:

public class ColumnFamily {
  void considerFlush(){
    // flush memtable to sstable
  }
}

Any method that modifies or creates data should consider flushing:

  public void delete(String rowkey, String column, long time){
    memtable.get().delete(keyspace.getKeyspaceMetadata().getPartitioner().partition(rowkey), column, time);
    considerFlush();
  }
Flushing a memory table to disk turns out to be a tricky concurrency problem. We do not want to block writes. Even though writing to disk linearly is fast, it is not as fast as writing to the memtable. We do not want to be forced to acquire a synchronization lock on every operation just to check if a flush is in progress. After the sstable is flushed we can not clear the memtable until the sstable is loaded back up; otherwise data could temporarily vanish in the window after the sstable is written but before it is loaded back into memory.

Here is my solution. Maybe aphyr will, call me maybe, out on it, but for a 0 follower github project this is fine :). The good news is that the data here is idempotent, thus having the data in a memtable and an sstable at the same time will not produce an incorrect result.

First, we create MemtableFlusher, a Runnable per column family that flushes memtable instances.

public class MemtableFlusher implements Runnable {
  private ConcurrentSkipListSet<Memtable> memtables = new ConcurrentSkipListSet<>();
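
The rest of the class is thin. Here is a minimal sketch of the two members referenced later, add() from considerFlush() and getFlushCount() from the test; the exact bodies and the counter type are my assumption:

  // Memtable has to be Comparable to live in a ConcurrentSkipListSet
  private AtomicLong flushes = new AtomicLong();

  public boolean add(Memtable memtable){
    // ConcurrentSkipListSet.add() returns false if this memtable is already queued
    return memtables.add(memtable);
  }

  public long getFlushCount(){
    return flushes.get();
  }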

This piece is tightly coupled with the ColumnFamily instance. Our read operations need to be able to access the current memtable, all the memtables pending flush to disk, and the already flushed sstables.

public class ColumnFamily {

  private AtomicReference<Memtable> memtable;
  private MemtableFlusher memtableFlusher;
  private Set<SSTable> sstable = new ConcurrentSkipListSet<>();
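
To keep reads correct during a flush, a get has to consult all three in order. This is a hedged sketch of what that looks like; the getMemtables() accessor on MemtableFlusher and the "first non-null wins" convention are my assumptions, and a real implementation would also reconcile timestamps and tombstones:

  public Val get(String rowkey, String column) throws IOException {
    Token token = keyspace.getKeyspaceMetadata().getPartitioner().partition(rowkey);
    // newest data first: the live memtable
    Val v = memtable.get().get(token, column);
    if (v != null){
      return v;
    }
    // then any memtables queued for flush but not yet loaded as sstables
    for (Memtable pending : memtableFlusher.getMemtables()){
      v = pending.get(token, column);
      if (v != null){
        return v;
      }
    }
    // finally the flushed sstables on disk
    for (SSTable table : sstable){
      v = table.get(rowkey, column);
      if (v != null){
        return v;
      }
    }
    return null;
  }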


Here is the implementation of the considerFlush() method we talked about above. Namely, if the size of the memtable is larger than the threshold, atomically move the current memtable onto the flush queue and put a new memtable in its place.

  void considerFlush(){
    Memtable now = memtable.get();
    if (columnFamilyMetadata.getFlushNumberOfRowKeys() != 0
            && now.size() > columnFamilyMetadata.getFlushNumberOfRowKeys()){
      Memtable aNewTable = new Memtable(this);
      boolean success = memtableFlusher.add(now);
      if (success){
        boolean swap = memtable.compareAndSet(now, aNewTable);
        if (!swap){
          throw new RuntimeException("race detected");
        }
      }
    }
  }
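
The other criteria from the list earlier could be folded into the same check later. A hedged sketch, where the age and operation-count accessors are hypothetical additions to ColumnFamilyMetadata and Memtable:

  // hypothetical: assumes getFlushAfterMillis()/getFlushAfterOperations() on the metadata
  // and getCreateTime()/getOperationCount() on Memtable
  private boolean shouldFlush(Memtable now){
    ColumnFamilyMetadata meta = columnFamilyMetadata;
    boolean tooManyKeys = meta.getFlushNumberOfRowKeys() != 0
            && now.size() > meta.getFlushNumberOfRowKeys();
    boolean tooOld = meta.getFlushAfterMillis() != 0
            && System.currentTimeMillis() - now.getCreateTime() > meta.getFlushAfterMillis();
    boolean tooManyOps = meta.getFlushAfterOperations() != 0
            && now.getOperationCount() > meta.getFlushAfterOperations();
    return tooManyKeys || tooOld || tooManyOps;
  }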

Now that the memtable is placed onto the flush queue, the MemtableFlusher should:

  1. flush memtable to sstable
  2. load the sstable (memory map it)
  3. remove any reference to the memtable

  @Override
  public void run() {
    while (true){
      for (Memtable memtable : memtables){
        SSTableWriter ssTableWriter = new SSTableWriter();
        try {
          //TODO: a timeuuid would be better here
          String tableId = String.valueOf(System.nanoTime());
          ssTableWriter.flushToDisk(tableId, columnFamily.getKeyspace().getConfiguration(), memtable); //1
          SSTable table = new SSTable();
          table.open(tableId, columnFamily.getKeyspace().getConfiguration());
          columnFamily.getSstable().add(table); //2
          memtables.remove(memtable); //3
          flushes.incrementAndGet();
        } catch (IOException e) {
          //TODO: catch this and terminate server?
          throw new RuntimeException(e);
        }
      }
      try {
        Thread.sleep(1);
      } catch (InterruptedException e) { }
    }
  }

I have written a small test. Obviously this will not find concurrency bugs, but at least it certifies that the components are working together as designed in a simple case. Here we set the flush size to 2 and insert 3 row keys. The third row key inserted should cause a flush.

  @Test
  public void aTest() throws IOException, InterruptedException{
    File tempFolder = testFolder.newFolder("sstable");
    Configuration configuration = new Configuration();
    configuration.setSstableDirectory(tempFolder);
    Server s = new Server();
    s.setConfiguration(configuration);
    s.createKeyspace(ks);
    s.createColumnFamily(ks, cf);
    s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getColumnFamilyMetadata().setFlushNumberOfRowKeys(2);
    s.put(ks, cf, "jack", "name", "bunnyjack", 1);
    s.put(ks, cf, "jack", "age", "6", 1);
    Val x = s.get(ks, cf, "jack", "age");
    Assert.assertEquals("6", x.getValue());
    s.put(ks, cf, "ziggy", "name", "ziggyrabbit", 1);
    s.put(ks, cf, "ziggy", "age", "8", 1);
    s.put(ks, cf, "dotty", "age", "4", 1);
    Thread.sleep(2000);
    Assert.assertEquals(1, s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getMemtableFlusher().getFlushCount());
    x = s.get(ks, cf, "jack", "age");
    Assert.assertEquals("6", x.getValue());
  } 

We are getting closer and closer to being fully functional. There are two big pieces left. First, we are now writing files to disk, but to run continuously we will need to merge these tables together; this process is known as compaction. After that a commit log will be needed to add durability.

This is getting exciting!

Wednesday Dec 10, 2014

Key Cache - Building a column family store - Part 3

In my last entry we discussed how to build a data structure similar to Cassandra's Index File. If you remember, the Index Interval setting builds a structure that indexes every Nth key. In the best case a lookup may hit this index exactly, but in the worst case our linear search may have to page through (Index Interval - 1) rows to locate data.

In one of my unit tests I captured the effect that this has:

    {
      long x = System.currentTimeMillis();
      for (int i = 0 ; i < 50000 ; i++) {
        Assert.assertEquals("c", s.get("00001", "column2").getValue());
      }
      System.out.println("index match " + (System.currentTimeMillis() - x));
    }
    {
      long x = System.currentTimeMillis();
      for (int i = 0 ; i < 50000 ; i++) {
        Assert.assertEquals("c", s.get("08999", "column2").getValue());
      }
      System.out.println("far from index " +(System.currentTimeMillis() - x));
    }
We can see the performance difference between a perfect index hit and a key far from the index:
Test folder: /tmp/junit4018208685678427527
index match 1088
far from index 11166

Note: Our SSTable format and the decision to use String (and not ByteBuffer) mean there are several micro-optimizations that could still be done.

If the requests have a normal distribution the difference in access speed may not be noticeable. Still, it seems unfair that some key requests will always be faster than others.

Key Cache to the rescue!

The Key Cache is like the Index Interval in that it helps avoid seeking for keys. It works by storing the offset of frequently used keys in memory. The less random the key access is the more benefit this type of caching delivers. Also like the Index format we can store a large cache in a small amount of storage.

Let's build our own Key Cache and add it to our increasingly sophisticated Column Family store. There is nothing spectacular being done here. We are wrapping a guava cache, which handles eviction, with a small facade.

package io.teknek.nibiru.engine;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class KeyCache {

  private Cache<String,Long> lc;

  public KeyCache(int cacheSize){
    lc = CacheBuilder.newBuilder().maximumSize(cacheSize).recordStats().build();
  }

  public void put(String key, long value){
    lc.put(key, value);
  }

  public long get(String key) {
    long res = -1;
    Long l = lc.getIfPresent(key);
    if (l != null) {
      res = l.longValue();
    }
    return res;
  }
}
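
Usage is as plain as the facade; a quick example (the offset value here is made up):

KeyCache keyCache = new KeyCache(1000);
keyCache.put("08999", 620931L);     // byte offset of the row, whatever it happens to be
long hit = keyCache.get("08999");   // 620931
long miss = keyCache.get("00042");  // -1 means "not cached, fall back to the index"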

Note: We would be better off using something like this library that does not have to box and unbox primitives, but again our goal is to show the relative performance speedup from the Key Cache.

Now we only need to make a few small changes to our sstable reader.

-    bg.setStartOffset((int)index.findStartOffset(row));
+    long startOffset = keyCache.get(row);
+    if (startOffset == -1){
+      startOffset = index.findStartOffset(row);
+    }
+    bg.setStartOffset((int)startOffset);
     String searchToken = row; //this is not correct
     do {
       if (bg.dst[bg.currentIndex] == END_ROW){
         bg.advanceIndex();
       }
+      long startOfRow = bg.mbb.position() - bg.getBlockSize() + bg.currentIndex;
       readHeader(bg);
       StringBuilder token = readToken(bg);
       if (token.toString().equals(searchToken)){
         StringBuilder rowkey = readRowkey(bg);
         if (rowkey.toString().equals(row)){
+          keyCache.put(row, startOfRow);

We can see the difference if we re-run our tests:

Test folder: /tmp/junit6910931010171014079
index match 840
far from index 171
index match 219
far from index 137

Notice here that I ran the tests multiple times. This is because the Java virtual machine is much like an old car. You do not drive it fast right away; you let it sit in your driveway and warm up. :) Just kidding, the reality is that with Just In Time compilation and multiple garbage collection threads, timing code exactly can be difficult.

Even though the numbers fluctuate, we see the Key Cache has given us much better performance by helping us get data without having to seek on disk (memory mapped disk) as much. We went from 11 seconds for 50,000 look ups to around 200ms!

Awesome! What is next? Bloom filters, or Row Cache? Maybe start building a system to compact multiple SSTables together? Stay tuned.


Sunday Dec 07, 2014

Index Files - Building a column store - Part 2

In a previous blog post I demonstrated how to build an SSTable. For reference the sstable data on disk looks like this:

^@00000^A00000^Bcolumn2^C1417966551490^C1^C0^Cc^Dcolumn3^C1417966551490^C1^C0^Cc
^@00001^A00001^Bcolumn2^C1417966551490^C1^C0^Cc^Dcolumn3^C1417966551490^C1^C0^Cc
^@00002^A00002^Bcolumn2^C1417966551490^C1^C0^Cc^Dcolumn3^C1417966551490^C1^C0^Cc

Another way to visualize the data:

token: 00001
rowkey: 00001
columns:
  name     value  time
  column2  c      1417966551490
  column3  c      1417966551490

Side note: In this case the token and rowkey are the same because we are using a natural partitioner that does not hash the key.

An astute reader of the last blog may have noticed that even though our tables are sorted by token and our get(key, column) operation searches by token, we were using an inefficient linear search. One way to optimize this type of search is a binary search. However this is not the primary way Cassandra optimizes the problem.

Enter IndexInterval

The concept of IndexInterval is that for every Nth rowkey we record that row key and the byte offset it exists at in the file. In this way we build a relatively small, fast lookup table of rowkeys. Even if a row key is not found in the index, the index gets us close to it, and we only have to page through at most (index interval - 1) keys.

Depending on the use case (small rows vs. larger rows) a larger or smaller index interval might be right for you.

Writing our own Index

Since Cassandra's data files are write-once, this makes the process straightforward: as we flush our memtable to an sstable we also record the byte offset of every Nth row in a separate file.

I was not aware of any output stream that could count its position, and I did not want to track it out of band, so I whipped up an output stream that counts as it writes.

package io.teknek.nibiru.engine;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class CountingBufferedOutputStream extends BufferedOutputStream {
  private long writtenOffset;
  public CountingBufferedOutputStream(OutputStream out) {
    super(out);
  }
  public synchronized void writeAndCount(int b) throws IOException {
    super.write(b);
    writtenOffset++;
  }
  public void writeAndCount(byte[] b) throws IOException {
    super.write(b);
    writtenOffset += b.length;
  }
  public long getWrittenOffset() {
    return writtenOffset;
  }
}

I could have buried the index-creation logic inside the sstable-writing logic, but I decided to design a separate class for writing index files rather than intermingle the two.

public class IndexWriter {

  private final String id;
  private final Configuration conf;
  private BufferedOutputStream indexStream;
  private long rowkeyCount;
 
  public IndexWriter(String id, Configuration conf){
    this.id = id;
    this.conf = conf;
  }
 
  public void open() throws FileNotFoundException {
    File indexFile = new File(conf.getSstableDirectory(), id + ".index");
    indexStream = new CountingBufferedOutputStream(new FileOutputStream(indexFile));
  }
 
  public void handleRow(long startOfRecord, String token) throws IOException {
    if (rowkeyCount++ % conf.getIndexInterval() == 0){
      indexStream.write(SSTable.START_RECORD);
      indexStream.write(token.getBytes());
      indexStream.write(SSTable.END_TOKEN);
      indexStream.write(String.valueOf(startOfRecord).getBytes());
      indexStream.write(SSTable.END_ROW);
    }
  }
 
  public void close () throws IOException {
    indexStream.close();
  }
}

Now we make some minor changes to the sstable writer to also write the index as we write the sstable.

 public void flushToDisk(String id, Configuration conf, Memtable m) throws IOException{
    File sstableFile = new File(conf.getSstableDirectory(), id + ".ss");
    CountingBufferedOutputStream ssOutputStream = null;
    IndexWriter indexWriter = new IndexWriter(id, conf);

    try {
      ssOutputStream = new CountingBufferedOutputStream(new FileOutputStream(sstableFile));
      indexWriter.open();
      for (Entry<Token, ConcurrentSkipListMap<String, Val>> i : m.getData().entrySet()){
        long startOfRecord = ssOutputStream.getWrittenOffset();
        ssOutputStream.writeAndCount(START_RECORD);
        ssOutputStream.writeAndCount(i.getKey().getToken().getBytes());
        ssOutputStream.writeAndCount(END_TOKEN);
        ssOutputStream.writeAndCount(i.getKey().getRowkey().getBytes());
        ssOutputStream.writeAndCount(END_ROWKEY);
        indexWriter.handleRow(startOfRecord, i.getKey().getToken());
        boolean writeJoin = false;

When we write an sstable with rows 00000 to 10000 the following index is produced:

^@00000^A0
^@01000^A69000
^@02000^A138000
^@03000^A207000
^@04000^A276000
^@05000^A345000
^@06000^A414000
^@07000^A483000
^@08000^A552000
^@09000^A621000
 

Sweet! Now let's think about the use case of this index. It needs to provide one function: given a rowkey, return the byte offset where the sstable should begin searching.

 public long findStartOffset(String token)

Here is most of the implementation:

  public long findStartOffset(String token) throws IOException {
    long offset = 0;
    do {
      if (bgIndex.dst[bgIndex.currentIndex] == SSTable.END_ROW) {
        bgIndex.advanceIndex();
      }
      readHeader(bgIndex);
      StringBuilder readToken = readToken(bgIndex);
      long thisOffset = readIndexSize(bgIndex);
      if(readToken.toString().equals(token)){
        return thisOffset;
      } else if (readToken.toString().compareTo(token) > 0) {
        return offset;
      } else {
        offset = thisOffset;
      }
    } while (bgIndex.currentIndex < bgIndex.dst.length - 1 || bgIndex.mbb.position()  < bgIndex.channel.size());
    return offset;
  }
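
To make the behavior concrete, here is what lookups against the sample index above would return (index here is the reader instance the sstable uses below):

// 01500 is not in the index; 01000 is the last indexed token <= 01500,
// so the sstable scan starts at byte 69000 instead of byte 0
long nearMiss = index.findStartOffset("01500"); // 69000
// an exact hit returns that entry's own offset
long exactHit = index.findStartOffset("02000"); // 138000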

Our sstable reader only needs a single change:

  public Val get (String row, String column) throws IOException{
    ...
    BufferGroup bg = new BufferGroup();
    bg.channel = ssChannel;
    bg.mbb = (MappedByteBuffer) ssBuffer.duplicate();
//bg.setStartOffset(0);
    bg.setStartOffset((int)index.findStartOffset(row));

With this change we start reading from a nearby offset rather than the beginning of the file. If we get lucky (about 1 in 1000 with this interval) the index might even let us seek directly to the row!

What is next? I do not know; maybe we can make the index and sstable searching do a binary search. Maybe build bloom filters or even a key cache!

Thursday Dec 04, 2014

Building a Column Family store for fun and enjoyment

Recently I thought I would have some fun. What is my definition of fun? I decided to take on building my own Column Family data store. I decided to start with making a small server and add features one by one. I started with memtables, but I decided it was time to up my game.

Cassandra writes go to memtables; eventually these memtables flush to disk as sstables. SS stands for Sorted String. Besides the sorted nature of the SSTables, Cassandra uses Bloom Filters and Indexes (every N rows) to make searches for keys efficient.

To build an SSTable I have been playing around with memory mapped byte buffers and file channels. The first step is turning a memtable (a Map<String,Map<String,Column>>) into an sstable.

This is not the most efficient design, in some cases numbers get stored as strings, but I can always evolve it later.

public void flushToDisk(String id, Configuration conf, Memtable m) throws IOException{
    File f = new File(conf.getSstableDirectory(), id + ".ss");
    OutputStream output = null;
    try {
      output = new BufferedOutputStream(new FileOutputStream(f));
      for (Entry<Token, ConcurrentSkipListMap<String, Val>> i : m.getData().entrySet()){
        output.write(START_RECORD);
        output.write(i.getKey().getToken().getBytes());
        output.write(END_TOKEN);
        output.write(i.getKey().getRowkey().getBytes());
        output.write(END_ROWKEY);
        boolean first = true;
        for (Entry<String, Val> j : i.getValue().entrySet()){
          if (!first){
            output.write(END_COLUMN);
          }
          // flip the flag outside the if so the column separator is actually written
          first = false;
          output.write(j.getKey().getBytes());
          output.write(END_COLUMN_PART);
          output.write(String.valueOf(j.getValue().getCreateTime()).getBytes());
          output.write(END_COLUMN_PART);
          output.write(String.valueOf(j.getValue().getTime()).getBytes());
          output.write(END_COLUMN_PART);
          output.write(String.valueOf(j.getValue().getTtl()).getBytes());
          output.write(END_COLUMN_PART);
          output.write(String.valueOf(j.getValue().getValue()).getBytes());
        }
        output.write('\n');
      }
    }
    finally {
      if (output != null) {
        output.close();
      }
    }
  }

Cool, we can write a memtable to disk! The next step is reading it.

  private RandomAccessFile raf;
  private FileChannel channel;
 
  public void open(String id, Configuration conf) throws IOException {
    File sstable = new File(conf.getSstableDirectory(), id + ".ss");
    raf = new RandomAccessFile(sstable, "r");
    channel = raf.getChannel();
  }

I am not going to share all the code that reads through the data. It is somewhat tricky; I do not write stuff like this often, so I am sure I could do a better job by looking at some other data stores and following what they do. This piece can always be evolved later.

I am proud of this class in particular. This BufferGroup makes it easier to navigate the file channel and buffer and read in blocks while iterating byte by byte.

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class BufferGroup {
  private int blockSize = 1024;
  byte [] dst = new byte [blockSize];  // the block currently read out of the buffer
  int startOffset = 0;                 // where in the file this group starts reading
  int currentIndex = 0;                // cursor within dst
  FileChannel channel;
  MappedByteBuffer mbb;
 
  public BufferGroup(){}
 
  void read() throws IOException{
    if (channel.size() - startOffset < blockSize){
      blockSize = (int) (channel.size() - startOffset);
      dst = new byte[blockSize];
    }
    mbb.get(dst, startOffset, blockSize);
    currentIndex = 0;
  }
 
  void advanceIndex() throws IOException{
    currentIndex++;
    if (currentIndex == blockSize){
      read();
    }
  }
}

Write to a memtable, flush it to disk, load the disk back into memory as a memory mapped buffer, and use that buffer for reads. Here is what it looks like when you put it all together.

   public void aTest() throws IOException{
    File tempFolder = testFolder.newFolder("sstable");
    System.out.println("Test folder: " + testFolder.getRoot());
    Configuration configuration = new Configuration();
    configuration.setSstableDirectory(tempFolder);
    Memtable m = new Memtable();
    Keyspace ks1 = MemtableTest.keyspaceWithNaturalPartitioner();
    m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column2", "c", 1, 0L);
    Assert.assertEquals("c", m.get(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column2").getValue());
    m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column2", "d", 2, 0L);
    SSTable s = new SSTable();
    s.flushToDisk("1", configuration, m);
    s.open("1", configuration);
    long x = System.currentTimeMillis();
    for (int i = 0 ; i < 50000 ; i++) {
      Assert.assertEquals("d", s.get("row1", "column2").getValue());
    }
    System.out.println((System.currentTimeMillis() - x));
  }

I am pretty excited. I am not sure what I will do next: indexes, bloom filters, a commit log?




Tuesday Oct 21, 2014

Cassandra....bootstrapping

Would you expect apache-cassandra-2.0.10.jar  to work out of the box....


 INFO 19:39:44,690 Compacted 4 sstables to [/media/ephemeral0/cassandra/data/system/schema_columnfamilies/system-schema_columnfamilies-jb-9,].  9,410 bytes to 5,949 (~63% of original) in 41ms = 0.138376MB/s.  5 total partitions merged to 2.  Partition merge counts were {1:1, 4:1, }
 INFO 19:39:44,728 Compacted 4 sstables to [/media/ephemeral0/cassandra/data/system/schema_columns/system-schema_columns-jb-9,].  14,324 bytes to 10,549 (~73% of original) in 48ms = 0.209590MB/s.  5 total partitions merged to 2.  Partition merge counts were {1:1, 4:1, }
 INFO 19:40:14,566 JOINING: Starting to bootstrap...
 INFO 19:40:14,743 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Executing streaming plan for Bootstrap
 INFO 19:40:14,744 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Beginning stream session with /10.9.49.159
 INFO 19:40:14,744 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Beginning stream session with /10.9.52.15
 INFO 19:40:14,745 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Beginning stream session with /10.9.55.58
 INFO 19:40:14,745 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Beginning stream session with /10.9.57.215
 INFO 19:40:14,746 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Beginning stream session with /10.9.49.13
 INFO 19:40:14,821 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Session with /10.9.55.58 is complete
 INFO 19:40:14,820 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Session with /10.9.49.13 is complete
 INFO 19:40:14,821 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Session with /10.9.52.15 is complete
 INFO 19:40:14,854 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Session with /10.9.49.159 is complete
 INFO 19:40:14,860 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Session with /10.9.57.215 is complete
 WARN 19:40:14,862 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Stream failed
ERROR 19:40:14,864 Exception encountered during startup
java.lang.RuntimeException: Error during boostrap: Stream failed
    at org.apache.cassandra.dht.BootStrapper.bootstrap(BootStrapper.java:86)
    at org.apache.cassandra.service.StorageService.bootstrap(StorageService.java:994)
    at org.apache.cassandra.service.StorageService.joinTokenRing(StorageService.java:797)
    at org.apache.cassandra.service.StorageService.initServer(StorageService.java:612)
    at org.apache.cassandra.service.StorageService.initServer(StorageService.java:502)
    at org.apache.cassandra.service.CassandraDaemon.setup(CassandraDaemon.java:378)
    at org.apache.cassandra.service.CassandraDaemon.activate(CassandraDaemon.java:496)
    at org.apache.cassandra.service.CassandraDaemon.main(CassandraDaemon.java:585)
Caused by: org.apache.cassandra.streaming.StreamException: Stream failed
    at org.apache.cassandra.streaming.management.StreamEventJMXNotifier.onFailure(StreamEventJMXNotifier.java:85)
    at com.google.common.util.concurrent.Futures$4.run(Futures.java:1160)
    at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
    at com.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
    at com.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:202)
    at org.apache.cassandra.streaming.StreamResultFuture.maybeComplete(StreamResultFuture.java:216)
    at org.apache.cassandra.streaming.StreamResultFuture.handleSessionComplete(StreamResultFuture.java:191)
    at org.apache.cassandra.streaming.StreamSession.closeSession(StreamSession.java:364)
    at org.apache.cassandra.streaming.StreamSession.sessionFailed(StreamSession.java:569)
    at org.apache.cassandra.streaming.StreamSession.messageReceived(StreamSession.java:424)
    at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245)
    at java.lang.Thread.run(Thread.java:744)
java.lang.RuntimeException: Error during boostrap: Stream failed
    at org.apache.cassandra.dht.BootStrapper.bootstrap(BootStrapper.java:86)
    at org.apache.cassandra.service.StorageService.bootstrap(StorageService.java:994)
    at org.apache.cassandra.service.StorageService.joinTokenRing(StorageService.java:797)
    at org.apache.cassandra.service.StorageService.initServer(StorageService.java:612)
    at org.apache.cassandra.service.StorageService.initServer(StorageService.java:502)
    at org.apache.cassandra.service.CassandraDaemon.setup(CassandraDaemon.java:378)
    at org.apache.cassandra.service.CassandraDaemon.activate(CassandraDaemon.java:496)
    at org.apache.cassandra.service.CassandraDaemon.main(CassandraDaemon.java:585)
Caused by: org.apache.cassandra.streaming.StreamException: Stream failed
    at org.apache.cassandra.streaming.management.StreamEventJMXNotifier.onFailure(StreamEventJMXNotifier.java:85)
    at com.google.common.util.concurrent.Futures$4.run(Futures.java:1160)
    at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
    at com.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
    at com.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:202)
    at org.apache.cassandra.streaming.StreamResultFuture.maybeComplete(StreamResultFuture.java:216)
    at org.apache.cassandra.streaming.StreamResultFuture.handleSessionComplete(StreamResultFuture.java:191)
    at org.apache.cassandra.streaming.StreamSession.closeSession(StreamSession.java:364)
    at org.apache.cassandra.streaming.StreamSession.sessionFailed(StreamSession.java:569)
    at org.apache.cassandra.streaming.StreamSession.messageReceived(StreamSession.java:424)
    at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245)
    at java.lang.Thread.run(Thread.java:744)
Exception encountered during startup: Error during boostrap: Stream failed
 INFO 19:40:14,867 Announcing shutdown
 INFO 19:40:16,868 Waiting for messaging service to quiesce
 INFO 19:40:16,869 MessagingService has terminated the accept() thread

Of course not.

Sunday Aug 31, 2014

Stream Processing, Counting the things that are counting things

One of the qualities of good software is visibility. For me to provide users statistics on how the site is performing, data moves through a series of systems. Browser events fire from Javascript and are received by web servers, which place them on a message queue. A process reads data from the message queue and writes it to a Hadoop file system. Another process consumes this data and applies some streaming transformation before writing to a NoSQL database.

Imagine a system without enough visibility. A user might send you an email stating "It looks like the count of hits to this page for today is off. There should be more hits." If you have no visibility into the system a request like this can turn out to be a nightmare. The data could be getting lost anywhere: in your custom software, in open source software, even in commercial software! The other possibility is that the person reporting the issue is just wrong. That happens too! Without visibility it is hard to say what is wrong; maybe the NoSQL database is dropping messages, or maybe it is your code? You just have to take a shot in the dark and start somewhere. Maybe you pick the right component and find a bug, maybe you spend two weeks and find nothing wrong.

Now, imagine a system with enough visibility.  You would look at some graphs your software is maintaining and determine that "The number of messages sent to our NoSQL system is close (hopefully exact :) to the number of raw messages we received into our message queue". You could even go a step further and attempt to create pre-emptive alerts based on what is normal message flow for this time of day and day of week, so if there is an issue you can hopefully notice it and fix it before a user becomes aware of a problem.

Some rules:

  1. Count things in and out of each system. Even if the correlation is not 1 to 1, some relationship should exist that will become apparent over time.
  2. Record things that are dropped or cause exceptions; actively monitor so this number stays close to 0.
  3. Go for low hanging fruit; do not try to build an overarching system in round one. If a sprint builds or adds a feature, find a way to monitor this new feature.
  4. Time things whose latency could vary by orders of magnitude. Use histograms to time DB requests that involve reading from disk, things that can have high variance as load increases.

Getting it done

With our stream processing platform, teknek, I had been doing counters and timers on a case by case basis in user code. I decided to extend this into the framework itself so that users would get a set of metrics for free. Users also have the ability to add their own metrics easily. (We will show the code to add your own counters later in this article.)

The de-facto standard metrics package for Java is the coda-hale library. Originally called "yammer-metrics", it provides counters, meters, histograms and other types. It has a clever way to do sampling, similar to streaming quantiles, so that it can efficiently keep a 95th percentile measurement without having to save a large number of samples in memory. Really really cool.
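
To give a feel for it, here is a hedged sketch of timing a disk-bound call with the library (assuming the 3.x com.codahale.metrics packages; the metric name "db.read" is made up):

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

public class TimerExample {
  public static void main(String[] args) throws Exception {
    MetricRegistry registry = new MetricRegistry();
    Timer dbTimer = registry.timer("db.read");   // hypothetical metric name
    Timer.Context ctx = dbTimer.time();
    try {
      Thread.sleep(5);                           // stand-in for the disk-bound read
    } finally {
      ctx.stop();
    }
    // the timer now tracks rates plus sampled percentile latencies (75th/95th/99th...)
  }
}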

For each "plan" in teknek we have a series of counters that record events inbounds, operator retries, time to process the event, and more. In the image below "shutup" :) is the name of the plan. Metrics are captured both globally and on a per thread basis.

(Image: sample of the metrics provided)

Every teknek "operator" in the plan has it's own set of metrics. For example, if the plan has three steps such as "read from kafka", "lowercase", "write to cassandra", "write to hbase" metrics are kept on these for free with no extra effort for the user.

The same metrics library is available to each of the operators you are implementing, so custom metrics can be added with ease.

package io.teknek.metric;
import io.teknek.model.ITuple;
import io.teknek.model.Operator;

public class OperatorWithMetrics extends Operator {
  @Override
  public void handleTuple(ITuple tuple) {
    getMetricRegistry().meter("a.b").mark();
    getMetricRegistry().meter(getPath() + ".processed").mark();
    getMetricRegistry().meter(getPath() + "." + this.getPartitionId() + ".processed").mark();
  }
}

The theme of this post is visibility, and having counters in JMX is one form of visibility.

"But come on playa! You gotta up your big data game!"

No problem! It turns out that there is already great support in coda-hale metrics to send those metrics directly to graphite. Thus all the counters that you have in teknek are available in graphite with no extra effort. Graphite offers a number of ways to search, group, and make custom dashboards with this information.
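
For reference, wiring a metric registry to Graphite with the stock coda-hale reporter looks roughly like this; a sketch assuming the 3.x packages, with the host, port, and prefix made up:

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;

public class GraphiteWiring {
  public static GraphiteReporter wire(MetricRegistry registry) {
    // hypothetical graphite endpoint; 2003 is the usual plaintext carbon port
    Graphite graphite = new Graphite(new InetSocketAddress("graphite.example.com", 2003));
    GraphiteReporter reporter = GraphiteReporter.forRegistry(registry)
        .prefixedWith("teknek.node1")              // hypothetical prefix
        .convertRatesTo(TimeUnit.SECONDS)
        .convertDurationsTo(TimeUnit.MILLISECONDS)
        .build(graphite);
    reporter.start(1, TimeUnit.MINUTES);           // push once a minute
    return reporter;
  }
}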

Quick note: the coda-hale graphite reporter tends to send too many metrics to graphite. For example it sends the 50th, 95th, 99th, 999th percentiles, etc., which is generally more information than you need. Take a look at my graphite package, which does a lot to trim down the metrics sent, adds host name and cluster name, and overall streamlines the process and configuration.

Conclusion

Build monitoring up front; make it a part of your definition of done. Good monitoring makes it easier to troubleshoot. It also makes it easier to be confident in beta testing or after releasing a new version of your software. With a new release old metrics should stay near their pre-release values, and you can use the new metrics to reason that new features are working correctly in production.

The new features to teknek discussed in this post were incorporated in this pull request, and should appear in the 0.0.7 release.


Thursday Aug 21, 2014

CQL, did you just tell me to fck myself?

Last night I decided to give CQL another chance. After about 20 minutes of hacking at a one-row table I pretty much hit every caveat and error message possible in my quest to get some result that was not SELECT *. The query language is a minefield of things you CAN'T do!

cqlsh:test>  select * from stats where year='54'  and tags='bla' order by ycount ALLOW filtering;
Bad Request: ORDER BY with 2ndary indexes is not supported.

cqlsh:test> select * from stats where year='54' and ycount >  10  order by ycount ALLOW filtering;
Bad Request: No indexed columns present in by-columns clause with Equal operator

cqlsh:test> select * from stats where year='54' and ycount >  10 and tags='bla';
Bad Request: Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING

cqlsh:test> select * from stats where year='54' and ycount >  0  allow filtering;
Bad Request: No indexed columns present in by-columns clause with Equal operator
 

http://b.z19r.com/post/did-you-just-tell-me-to-go-fuck-myself

Sunday Aug 10, 2014

Why I am reverting from Java to C++

For a long time Java has been good to me. I know these days everyone is looking to move on, maybe to Scala or Clojure or whatever, but I am actually looking the other way. Java has quite a large number of problems bubbling below the surface. Where to start...

Ok go back to 1995 when java was like an upstart. The going logic was that c++ pointers were "TOO COMPLICATED FOR PROGRAMMERS"... Fast forward to 2014 and even cell phone languages have pointers. So how is it that a lowly cell phone programmer can understand pointers but a server side coder thinks they are "too complicated"?

Ok, but let's talk about the big ugly problem... GC and memory. I have been supporting a number of Java projects for a long time and the major performance bottleneck is always Garbage Collection. Now listen, regardless of what anyone tells you, there is NO way to tune away all the garbage collection issues. At some point, if your application moves enough data, the JVM will pause.

JVM heaps don't get much bigger than 10 GB before the best performing GC algorithm, CMS, falls apart. I have seen it in batch processing systems like Hadoop and Hive, I have seen it in HBase, I have seen it in Cassandra. Want to run a database with a really fast Fusion-io SSD under high load? The CPU bottlenecks on GC before the disk runs out of IO. G1, the garbage collector that was supposed to be the answer for these large heaps, seems to be a large failure.

Many projects that require decent performance are doing some combination of Java and off-heap memory. This I do not get. At the point where you start doing things off-heap you basically start giving up everything Java provides for you (thread safety, debugging). Besides the fact that it makes debugging harder, it still has more overhead than native code.

In many cases it causes random corruptions due to library developers not actually writing these systems correctly, followed by embarrassing statements like "Sorry, our really smart, really efficient off-heap thing X was fucking up for three versions and we just figured it out."

Let's talk more about memory. Java is just plain pig-ish with memory. Java objects carry a number of bytes of overhead, and an object is just way bigger in Java than in C++. "Enterprise" libraries are so big and bloated that I find myself having to tweak my Eclipse JVM settings just so that I CAN DEVELOP Java apps.

You can not even allocate a damn object on the stack; Java forces you to put it on the heap. WTF?

Every enterprise java application seems to need about 512 MB of ram just to start up and about 2GB of overhead to run under light load...Hello cloud...No micro instances for me! Every hello world needs 4GB heap.

Back in 2007 a server machine had maybe 4GB of memory... so it was no big deal that the Java VM gets pause-ish with a 13 GB heap... But now in 2014 I can get a server from Amazon with 222GB of RAM. Machines with 1TB are around the corner. When I have a big-data application, am I going to have to run 100-1000 sharded copies of a program on a single machine just so I can address the memory?

So I have started going backwards. Writing programs in C++. Falling in love with template functions and finding them more powerful than Java's generics. Using lambdas in C++11 and saying, "what is the big deal with Scala?". Using Boost smart pointers when I need to, freeing memory by hand when I do not.

Feels good, feels great. Feels great to run a program that only uses 4K of memory and starts up in .0000001 seconds. "Did that run? Yes it did run and it's already finished!"

Saturday Jul 19, 2014

Travis CI is awesome!

https://travis-ci.org/edwardcapriolo/teknek-core

Travis CI is awesome... That is all.

Thursday Jul 03, 2014

MapReduce on Cassandra's sstable2json backups

I was talking to a buddy about having nothing to do today. He said to me, "You know what would be awesome? We have all these sstable2json files in s3 and it would be cool if we could map reduce them."

For those not familiar sstable2json makes files like this:

[
{"key": "62736d697468","columns": [["6c6173746e616d65","736d697468",1404396845806000]]},
{"key": "6563617072696f6c6f","columns": [["66697273746e616d65","656477617264",1404396708566000], ["6c6173746e616d65","63617072696f6c6f",1404396801537000]]}
]

Now, there already exists a JSON Hive serde, https://github.com/rcongiu/Hive-JSON-Serde, however there is a small problem.

That serde expects data to look like this:

{}
{}

Not like this:

[
{},
{}
]

What is a player to do? Make a custom input format, that is what.

The magic is in a little custom record reader that skips everything except what the json serde wants and trims trailing commas.

  @Override
  public synchronized boolean next(LongWritable arg0, Text line) throws IOException {
    boolean res = super.next(arg0, line);
    // skip the opening and closing brackets of the json array
    if (res && line.getLength() > 0 && line.charAt(0) == '['){
      res = super.next(arg0, line);
    }
    if (res && line.getLength() > 0 && line.charAt(0) == ']'){
      res = super.next(arg0, line);
    }
    // trim the trailing comma so each line is a standalone json object
    if (res && line.getLength() > 0 && line.getBytes()[line.getLength()-1] == ','){
      line.set(line.getBytes(), 0, line.getLength() - 1);
    }
    return res;
  }
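
For context, the input format named in the table DDL below is just a thin wrapper that hands Hive this record reader. A hedged sketch using the old mapred API; the input format name comes from the DDL, while SSTable2JsonRecordReader is my made-up name for the LineRecordReader subclass shown above:

package io.teknek.arizona.ssjsoninputformat;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class SSTable2JsonInputFormat extends TextInputFormat {
  @Override
  public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf job,
      Reporter reporter) throws IOException {
    reporter.setStatus(split.toString());
    // hand back the bracket-skipping, comma-trimming reader from above
    return new SSTable2JsonRecordReader(job, (FileSplit) split);
  }
}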

Next, we create a table using the JSON serde and the input format from above.

hive> show create table json_test1;                                                         
OK
CREATE  TABLE json_test1(
  key string COMMENT 'from deserializer',
  columns array<array<string>> COMMENT 'from deserializer')

ROW FORMAT SERDE
  'org.openx.data.jsonserde.JsonSerDe'
STORED AS INPUTFORMAT
  'io.teknek.arizona.ssjsoninputformat.SSTable2JsonInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'file:/user/hive/warehouse/json_test1'
TBLPROPERTIES (
  'numPartitions'='0',
  'numFiles'='1',
  'transient_lastDdlTime'='1404408280',
  'numRows'='0',
  'totalSize'='249',
  'rawDataSize'='0')

When we use these together we get:

hive> SELECT key , col FROM json_test1 LATERAL VIEW explode (columns) colTable as col;
62736d697468    ["6c6173746e616d65","736d697468","1404396845806000"]
6563617072696f6c6f    ["66697273746e616d65","656477617264","1404396708566000"]
6563617072696f6c6f    ["6c6173746e616d65","63617072696f6c6f","1404396801537000"]
Time taken: 4.704 seconds, Fetched: 3 row(s)

Winning! Now there are some things to point out here:

  1. sstable2json with replication N is going to get N duplicates that you will have to filter yourself. (Maybe it would be nice to build a feature in sstable2json that only dumps the primary range of each node?)
  2. You're probably going to need a group and a window function to remove all but the last entry (dealing with overwrites and tombstones).

But whatever, I just started playing with this, this morning. I do not have time to sort out all the details. (Maybe you don't have updates and this is not a big deal for you.)

Tuesday Jul 01, 2014

Next hadoop enterprise pissing match beginning

http://hortonworks.com/blog/cloudera-just-shoot-impala/

"I hold out hope that their interests in enabling Hive on Spark are genuine and not part of some broader aspirational marketing campaign laced with bombastic FUD."

I really think Horton is being FUDLY here. Cloudera has had 1-2 people involved with the Hive project for a while now, maybe like 6+ years. Carl is the Hive lead; previously he worked for Cloudera. Cloudera has 2 people now adding features, one of whom is Brock Noland, who is doing an awesome job.

Hortonworks is relatively new to the hive project. 2-3 years tops? (not counting people who did work for hive before joining horton)

So, even though Cloudera did build Impala (and made some noise about it being better than Hive), they have kept steady support on the Hive project for a very long time.

Spark is just very buzzy now. Everyone wants to have it, or be involved with it, like "cloud", but spark is actually 3-4 years old right?

But it is really great to see spark. Everyone wants to have it, and the enterprise pissing matches are starting! Sit back and watch the fun! Low blows coming soon!

Previous pissing matches: 

  1. Who has the best hadoop distro?
  2. Who "leads" the community?
  3. Parquet vs ORC?
  4. Who got the "credit" for hadoop security and who did "all the work"



Monday Jun 23, 2014

Server Side Cassandra

http://www.xaprb.com/blog/2014/06/08/time-series-database-requirements/

"Ideally, the language and database should support server-side processing of at least the following, and probably much more"

A co-worker found this. I love it. Sounds JUST like what I am trying to implement in:

https://issues.apache.org/jira/browse/CASSANDRA-6704

and what we did implement in https://github.com/zznate/intravert-ug .

How does that saying go? First they ignore you...

 


Tuesday Jun 17, 2014

Cloudera desktop manager forces you to disable SELINUX

This is a very curious thing. When trying to install CDH I found that it forces me to disable SELinux completely. I can understand why an installer would have problems, but why won't it allow me to do the install in 'permissive' mode? Then I would be able to see the warnings.

This is kinda ##SHADEY##. Normally I do not give a shit about selinux, but being forced to have it completely disabled?

Monday Jun 09, 2014

Wait...Say what.

http://www.datastax.com/dev/blog/4-simple-rules-when-using-the-datastax-drivers-for-cassandra

Cassandra’s storage engine is optimized to avoid storing unnecessary empty columns, but when using prepared statements those parameters that are not provided result in null values being passed to Cassandra (and thus tombstones being stored). Currently the only workaround for this scenario is to have a predefined set of prepared statement for the most common insert combinations and using normal statements for the more rare cases.

So what you're saying is... if I don't specify a column when I insert, I delete it?

Saturday May 17, 2014

The important lesson of functional programming

I wanted to point something out: many times I hear people going on and on about functional programming, how Java can't be good without function passing (functors), how lambda features are massively important, or ivory tower talk about how terrible the "kingdom of nouns" is.

Let us look at Wikipedia's definition of functional programming.
----

In computer science, functional programming is a programming paradigm, a style of building the structure and elements of computer programs, that treats computation as the evaluation of mathematical functions and avoids state and mutable data. 

---

Though hipsters and 'kingdom of verbs' fanboys will go on and on about lambdas, anonymous inner functions, and programs with so many callbacks you need an API to unroll them into something readable, the important part of functional programming (to me) is avoiding state and mutable data, and you can leverage that concept from any language that has a method (function)!

Removing state has big benefits. One is repeatability, which brings testability. I enjoy writing code that is easily testable without mocking or writing a large test harness.

Here is an example. I am currently working on a teknek feature to coordinate how many instances of a process run on a cluster of nodes. At first you may think this is not a functional problem, because it depends on the state of local threads, as well as cluster state that is stored in ZooKeeper. Let's look at an implementation:

---

  private boolean alreadyAtMaxWorkersPerNode(Plan plan){
    List<String> workerUuids = null;
    try {
      workerUuids = WorkerDao.findWorkersWorkingOnPlan(zk, plan);
    } catch (WorkerDaoException ex) {
      return true;
    }
    if (plan.getMaxWorkersPerNode() == 0){
      return false;
    }
    int numberOfWorkersRunningInDaemon = 0;
    List<Worker> workingOnPlan = workerThreads.get(plan);
    if (workingOnPlan == null){
      return false;
    }
    for (Worker worker: workingOnPlan){
      if (worker.getMyId().toString().equals(workerUuids)){
        numberOfWorkersRunningInDaemon++;
      }
    }
    if (numberOfWorkersRunningInDaemon >= plan.getMaxWorkersPerNode()){
      return true;
    } else {
      return false;
    }
  }

---

workerThreads is a member variable, another method uses a data access object, and the method is called from 'deep' inside a stateful application.

There is a simple way to develop this feature and still have great test coverage. Eliminate state! Functional Programming! Write methods that are functional, methods that always return the same output for the same inputs.

Let's pull everything not functional out of the method and see what marvellous things this does for us!

---

  @VisibleForTesting
  boolean alreadyAtMaxWorkersPerNode(Plan plan, List<String> workerUuids, List<Worker> workingOnPlan){
    if (plan.getMaxWorkersPerNode() == 0){
      return false;
    }
    int numberOfWorkersRunningInDaemon = 0;
    if (workingOnPlan == null){
      return false;
    }
    for (Worker worker: workingOnPlan){
      if (worker.getMyId().toString().equals(workerUuids)){
        numberOfWorkersRunningInDaemon++;
      }
    }
    if (numberOfWorkersRunningInDaemon >= plan.getMaxWorkersPerNode()){
      return true;
    } else {
      return false;
    }
  }

---

Look Ma! No state! All the state is in the caller!

---

  private void considerStarting(String child){
    Plan plan = null;
    List<String> workerUuidsWorkingOnPlan = null;
    try {
      plan = WorkerDao.findPlanByName(zk, child);
      workerUuidsWorkingOnPlan = WorkerDao.findWorkersWorkingOnPlan(zk, plan);
    } catch (WorkerDaoException e) {
      logger.error(e);
      return;
    }
    if (alreadyAtMaxWorkersPerNode(plan, workerUuidsWorkingOnPlan, workerThreads.get(plan))){
      return;
    }

---

Why is removing state awesome? For one, it makes Test Driven Development easy. Hitting this condition with an integration test is possible, but it involves a lot of effort and hard-to-coordinate timing. Since we removed the state, look how straightforward the test is.

---

  @Test
  public void maxWorkerTest(){
    Plan aPlan = new Plan().withMaxWorkersPerNode(0).withMaxWorkers(2);
    Worker workingOn1 = new Worker(aPlan, null, null);
    Worker workingOn2 = new Worker(aPlan, null, null);
    List<String> workerIds = Arrays.asList(workingOn1.getMyId().toString(), workingOn2.getMyId().toString());
    List<Worker> localWorkers = Arrays.asList(workingOn1,workingOn2);
    Assert.assertFalse(td.alreadyAtMaxWorkersPerNode(aPlan, workerIds, localWorkers));
    aPlan.setMaxWorkersPerNode(2);
    Assert.assertTrue(td.alreadyAtMaxWorkersPerNode(aPlan, workerIds, localWorkers));
  }

---

Holy crap! The final assertTrue failed! Remember, "testing reveals the presence of bugs, not the absence". Bugs should be easy to find and fix now that the logic is not buried deep. In fact, I easily stepped through this code and found the problem.

---

 if (worker.getMyId().toString().equals(workerUuids)){

---

In Java it is not a compile error to call String.equals(List); it always returns false! DOH. Without good testing we may not have even found this bug. Win. Let's fix that.

---

  for (Worker worker : workingOnPlan){
    for (String uuid : workerUuids) {
      if (worker.getMyId().toString().equals(uuid)){
        numberOfWorkersRunningInDaemon++;
      }
    }
  }

---

Now, let's use our friend Cobertura to see what our coverage looks like. (If you're not familiar with coverage tools, get acquainted fast. Cobertura is awesome sauce! It runs your tests and counts how many times each branch and line of code was hit! This way you see where you need to do more testing!)

[edward@jackintosh teknek-core]$ mvn cobertura:cobertura


Pretty good! We can see many of our cases are covered, and we could write a few more tests to reach 100%, though that is just academic at this point. Anyway, tests are great. I think of tests as a tripwire against future changes, and as assurance that the project does what it advertises.

Anyway, the big takeaway is that functional programming is possible from a "non-functional" language. Functional programming makes it easy to build and test applications. And as always, anyone that does not write tests should be taken out back and beaten with a hose.

For those interested you can see the entire commit here.
