Edward Capriolo

Monday Jan 26, 2015

ConvoToMetaException


https://twitter.com/tupshin/status/559833624913379330

Good: Good to have a storage format that matches what the database does.

C* has come a long way over the past few years, and unfortunately our storage format hasn't kept pace with the data models we are now encouraging people to utilise.

Bad! Why were people being encouraged to use the database the wrong way in the first place?

??And historically, "columnar oltp sql data store" has not really been a thing.

Bad/Confused? Is Cassandra a columnar OLTP data store? Is it trying to become one?

I see it as specialization for special cases.

Good!

but:

https://issues.apache.org/jira/browse/CASSANDRA-2995

  • deal with potential use cases where maybe the current sstables are not the best fit
  • allow several types of internal storage formats (at the same time) optimized for different data types

I understand the "wouldn't it be cool" factor but let's be clear: Cassandra is not a research-paper creation engine. Pluggability here improves generality at the cost of a substantial increase of implementation complexity, QA work, and cognitive burden on users. This is a bad direction for the project.

Bad: The concept of making a special format, or letting the database be pluggable in any way, basically got hand-waved away. It got shot down even though people were interested.

That all being said.

Cassandra has evolved/is evolving into a quasi-columnar, kinda-fixed-schema database built around a custom query language. As such, it only makes sense that it have a quasi-columnar, kinda-fixed-schema-friendly sstable format (not saying that jokingly).

Also, if it works just as well in the general case, that is good as well. Assuming it does not take 4 major versions to become stable, and in the process cause multiple heartaches along the lines of the broken-streaming, lost-data shit.

Request Routing - Building a NoSQL Store - Part 9

This blog series is trucking along. What fun! I decided to switch the titles from 'Building a Column Family store' to 'Building a NoSQL store' because we are going far beyond demonstrating a Column Family store, since a Column Family is only an on-disk representation of data.

In our last blog we used a gossiper for cluster membership and showed how to replicate meta-data requests across the cluster. Now, we will focus on implementing pluggable request routing. Unless you have been living under a rock you are probably aware of a few things like the CAP theorem and Dynamo-style consistent hashing. Stepping back a bit, a goal of Nibiru is to support pluggable everything, and request routing is no exception. For example, a user may want a system where the node they send the operation to is the node where data is stored, they may want a system like Cassandra where data is stored on multiple cluster nodes using consistent hashing, or something else entirely.


To make things pluggable we start with a Router interface.

public interface Router {

  /**
   * Determine which hosts a message can be sent to. (in the future keyspace should hold a node list)
   * @param message the message sent from the user
   * @param local the uuid of this server
   * @param requestKeyspace the keyspace/database of the request
   * @param clusterMembership an interface for live/dead cluster nodes
   * @param token a token (typically a hash) computed from the rowkey of the message
   * @return all hosts a given request can be routed to
   */
  public List<Destination> routesTo(Message message, ServerId local, Keyspace requestKeyspace,
      ClusterMembership clusterMembership, Token token);

}

Wow! This method has a lot of arguments! Haha. Some of the parameters (clusterMembership and ServerId) probably could have been constructor injected, and token and message are redundant (because one can be determined from the other), but for now using a pure interface seemed attractive (since everyone hates on abstract classes these days). Not all routers need each of these things.

I mentioned two cases. Let's start with the easy one: store and read data locally only.

public class LocalRouter implements Router {

  @Override
  public List<Destination> routesTo(Message message, ServerId local, Keyspace requestKeyspace,
      ClusterMembership clusterMembership, Token token) {
    Destination d = new Destination();
    d.setDestinationId(local.getU().toString());
    return Arrays.asList(d);
  }

}

This is fairly easy to reason about: for any request, the answer is found here! This is basically do-it-yourself sharding (outside of Nibiru). It also makes sense for a single node deployment.

Now it gets interesting

The dynamo-style request routing is more involved. Even though it has been explained many times, I will offer my own quick description.

At first do not think ring, instead think of hashing a key into a flat fixed sized array.

10 MOD 7
[][][][10][][][]

If you have replication you would also write to the following slots. Imagine we had replication 3.

10 MOD 7. Replication 3
[][][][10][10][10][]

What happens if the data hashes to the last element? Think of the array as a ring and the replication will wrap-around.

6 MOD 7. Replication 3
[6][6][][][][][6]
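
If it helps, here is a tiny standalone sketch of exactly that array-and-modulo model. Purely illustrative: the class and method names are made up, and this is not how Nibiru's router works (it uses tokens, as shown next).

// Toy model of the array/modulo placement described above.
public class ToyRing {

  // Returns the slots a key and its replicas land on, wrapping around the end.
  public static int[] placements(int keyHash, int slots, int replication) {
    int[] result = new int[replication];
    int start = Math.floorMod(keyHash, slots); // e.g. 10 MOD 7 == 3
    for (int i = 0; i < replication; i++) {
      result[i] = (start + i) % slots;         // wrap around the "ring"
    }
    return result;
  }

  public static void main(String[] args) {
    // 6 MOD 7 with replication 3 lands on slots 6, 0, 1 (the wrap-around case)
    System.out.println(java.util.Arrays.toString(placements(6, 7, 3)));
  }
}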

Implementing this is slightly different. We do not use array indexes. Instead we consider that each node has a token that represents its position in the ring.

The token router needs to be aware of two facts: a Replication Factor (RF) that describes how many copies of the data exist, and a token map that describes the splits.

Algorithm
1. Find the first destination in the sorted map
2. while < replication factor add the next destination (wrap around if need be)

public class TokenRouter implements Router {

  public static final String TOKEN_MAP_KEY = "token_map";
  public static final String REPLICATION_FACTOR = "replication_factor";

  @Override
  public List<Destination> routesTo(Message message, ServerId local, Keyspace requestKeyspace,
      ClusterMembership clusterMembership, Token token) {
    //TODO this is not efficient we should cache this or refactor
    Map<String, String> tokenMap1 = (Map<String, String>) requestKeyspace
        .getKeyspaceMetadata().getProperties().get(TOKEN_MAP_KEY);
    TreeMap<String, String> tokenMap = new TreeMap<String, String>(tokenMap1);
    Integer replicationFactor = (Integer) requestKeyspace
        .getKeyspaceMetadata().getProperties().get(REPLICATION_FACTOR);
    int rf = 1;
    if (replicationFactor != null) {
      rf = replicationFactor;
    }
    if (rf > tokenMap.size()) {
      throw new IllegalArgumentException(
          "Replication factor specified was larger than token map size");
    }
    List<Destination> destinations = new ArrayList<Destination>();

    Map.Entry<String, String> ceilingEntry;
    ceilingEntry = tokenMap.ceilingEntry(token.getToken());//1
    if (ceilingEntry == null) {
      ceilingEntry = tokenMap.firstEntry();//1
    }
    destinations.add(new Destination(ceilingEntry.getValue()));
    for (int i = 1; i < rf; i++) {
      ceilingEntry = tokenMap.higherEntry(ceilingEntry.getKey());//2
      if (ceilingEntry == null) {
        ceilingEntry = tokenMap.firstEntry();//2
      }
      destinations.add(new Destination(ceilingEntry.getValue()));
    }
    return destinations;
  }
}

If that code is making your head buzz, the unit test may explain it better. One of the really great parts of Nibiru is that the decision to make everything pluggable makes testing and developing easier.

We can mock the cluster membership to provide a 3 node cluster that is always 'alive'.

  private ClusterMembership threeLiveNodes() {
    ClusterMembership mock = new ClusterMembership(null, null) {
      public void init() { }
      public void shutdown() { }
      public List<ClusterMember> getLiveMembers() {
        return Arrays.asList(new ClusterMember("127.0.0.1", 2000, 1, "id1"),
            new ClusterMember("127.0.0.2", 2000, 1, "id2"),
            new ClusterMember("127.0.0.3", 2000, 1, "id3"));
      }
      public List<ClusterMember> getDeadMembers() { return null; }
    };
    return mock;
  }


Next we give each node a single token.

  public static TreeMap<String, String> threeNodeRing() {
    TreeMap<String, String> tokenMap = new TreeMap<>();
    tokenMap.put("c", "id1");
    tokenMap.put("h", "id2");
    tokenMap.put("r", "id3");
    return tokenMap;
  }

Now we can throw some data at it and see how the router routes it. Notice here that the "z" key "wraps" around to the first node.

  @Test
  public void test() {
    TokenRouter token = new TokenRouter();
    Keyspace keyspace = new Keyspace(new Configuration());
    KeyspaceMetaData meta = new KeyspaceMetaData();
    Map<String, Object> props = new HashMap<>();
    props.put(TokenRouter.TOKEN_MAP_KEY, threeNodeRing());
    meta.setProperties(props);
    keyspace.setKeyspaceMetadata(meta);
    Partitioner p = new NaturalPartitioner();
    Assert.assertEquals("id1", token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("a")).get(0).getDestinationId());
    Assert.assertEquals("id1", token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("aa")).get(0).getDestinationId());
    Assert.assertEquals("id2", token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("h")).get(0).getDestinationId());
    Assert.assertEquals("id3", token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("i")).get(0).getDestinationId());
    Assert.assertEquals("id1", token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("z")).get(0).getDestinationId());
  }

Great success! Now try with a replication factor of 2!

  @Test
  public void testWithReplicationFactor() {
    TokenRouter token = new TokenRouter();
    Keyspace keyspace = new Keyspace(new Configuration());
    KeyspaceMetaData meta = new KeyspaceMetaData();
    Map<String, Object> props = new HashMap<>();
    props.put(TokenRouter.TOKEN_MAP_KEY, threeNodeRing());
    props.put(TokenRouter.REPLICATION_FACTOR, 2);
    meta.setProperties(props);
    keyspace.setKeyspaceMetadata(meta);
    Partitioner p = new NaturalPartitioner();
    Assert.assertEquals(Arrays.asList(new Destination("id1"), new Destination("id2")),
        token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("a")));
    Assert.assertEquals(Arrays.asList(new Destination("id1"), new Destination("id2")),
        token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("aa")));
    Assert.assertEquals(Arrays.asList(new Destination("id2"), new Destination("id3")),
        token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("h")));
    Assert.assertEquals(Arrays.asList(new Destination("id3"), new Destination("id1")),
        token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("i")));
    Assert.assertEquals(Arrays.asList(new Destination("id1"), new Destination("id2")),
        token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("z")));
  }

The API correctly returns two "natural endpoints" for the given token. Great success!

Sweet! So I do not want to pat myself on the back too much, but I have to take this opportunity to say I love the way Nibiru is shaping up. When I presented the idea of a completely pluggable NoSql to a friend, he did voice a concern that having 'pluggable everything' could cause the system to have 'lots more bugs' because of all the potential use cases. I actually do not see it that way. I think that forcing as many components as possible behind interfaces and APIs makes the process cleaner by having fewer use-case specific hacks and optimizations.

This is important if you think about it. Remember that both hbase and cassandra are data stores that were basically 'thrown away' by their creators, powerset and facebook respectively. (Yes, they were both thrown away, abandoned and open sourced.) IMHO both of them suffer from being built on specific assumptions for specific use cases. ("X is good and Y is bad because we value X most" type rhetoric)

The next blog is going to show some already-working code for coordinating requests against N nodes and building some of the pluggable consistency on the read/write path. Futures, executors, and multi-node failures coming, fun!

 


Monday Jan 19, 2015

Auto Clustering and gossip - Building a Column Family store - Part 8

Up until this point we have been building a single node NoSQL implementation. Though a NoSql can by definition be a single node system, oftentimes a NoSql data store is a multi-node solution. Building a multiple node system has many challenges that a single node system does not. The designers need to consider how the CAP Theorem applies to the API they are attempting to implement. A goal of Nibiru is to create a pluggable NoSql system built around APIs. Thus we can think of Cluster Membership as something pluggable.

Pluggable Cluster Membership

I have commonly seen a few systems for cluster membership:

  1. Static: Cluster membership is configuration based and there is no node discovery (e.g. zookeeper's static list of peers)
  2. Master: Nodes register themselves with an entity (e.g. HBase region servers register with a master)
  3. Peer-to-Peer: Nodes can auto-discover each other, typically with one or more known contact points (e.g. Cassandra/Riak)
  4. Active-passive: At any given time a single node is the master and typically there is automatic/manual failover

Other components in the nosql stack will use the cluster information to route requests and place data. Let's shell out a base class for ClusterMembership:

public abstract class ClusterMembership {

  protected Configuration configuration;
  protected ServerId serverId;

  public ClusterMembership(Configuration configuration, ServerId serverId) {
    this.configuration = configuration;
    this.serverId = serverId;
  }

  public abstract void init();
  public abstract void shutdown();
  public abstract List<ClusterMember> getLiveMembers();
  public abstract List<ClusterMember> getDeadMembers();

}


public class ClusterMember {
  private String host;
  private int port;
  private long heartbeat;
  private String id;
  ...
}
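
As an aside, option #1 (the static list) would need almost no machinery on top of this base class. Here is a hypothetical sketch; StaticClusterMembership and its static_members_list property are not part of Nibiru, just an illustration of how small the contract is.

import java.util.ArrayList;
import java.util.List;

// Hypothetical static membership: the member list comes straight from the
// configuration and never changes. Not in Nibiru today.
public class StaticClusterMembership extends ClusterMembership {

  public static final String MEMBERS = "static_members_list";

  private final List<ClusterMember> members = new ArrayList<>();

  public StaticClusterMembership(Configuration configuration, ServerId serverId) {
    super(configuration, serverId);
  }

  @Override
  public void init() {
    List<String> hosts = (List<String>) configuration
        .getClusterMembershipProperties().get(MEMBERS);
    for (String host : hosts) {
      // heartbeat is meaningless for a static list, so it is left at 0
      members.add(new ClusterMember(host, 2000, 0, host));
    }
  }

  @Override
  public void shutdown() { }

  @Override
  public List<ClusterMember> getLiveMembers() { return members; }

  @Override
  public List<ClusterMember> getDeadMembers() { return new ArrayList<ClusterMember>(); }
}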


Because I am from the Dynamo school of hard knocks, I decided to start off with method #3 and forked off a gossip implementation I found. Each nibiru node will create a node-id (uuid) on start up, get a seed-list from the configuration, and then attempt to join the cluster.

public class GossipClusterMembership extends ClusterMembership {

  public static final String HOSTS = "gossip_hosts_list";
  public static final String PORT = "gossip_port";

  private GossipService gossipService;

  public GossipClusterMembership(Configuration configuration, ServerId serverId) {
    super(configuration, serverId);
  }

  @Override
  public void init() {
    ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
    GossipSettings settings = new GossipSettings();
    List<String> hosts = null;
    if (configuration.getClusterMembershipProperties() != null) {
      hosts = (List<String>) configuration.getClusterMembershipProperties().get(HOSTS);
    } else {
      hosts = new ArrayList<String>();
    }
    int port = 2000;
    for (String host : hosts) {
      GossipMember g = new RemoteGossipMember(host, port, "");
      startupMembers.add(g);
    }
    try {
      gossipService = new GossipService(configuration.getTransportHost(), port, serverId.getU().toString(), LogLevel.DEBUG, startupMembers, settings);
    } catch (UnknownHostException | InterruptedException e) {
      throw new RuntimeException(e);
    }
    gossipService.start();
  }

Now, we should be able to write a test and prove that N instances of Nibiru can locate each other via gossip. We use 127.0.0.1 as the seed list, and the other two nodes, 127.0.0.2 and 127.0.0.3, should discover each other.

  @Test
  public void letNodesDiscoverEachOther() throws InterruptedException, ClientException {
    Server[] s = new Server[3];
    {
      Configuration conf = TestUtil.aBasicConfiguration(node1Folder);
      Map<String, Object> clusterProperties = new HashMap<>();
      conf.setClusterMembershipProperties(clusterProperties);
      conf.setTransportHost("127.0.0.1");
      clusterProperties.put(GossipClusterMembership.HOSTS, Arrays.asList("127.0.0.1"));
      s[0] = new Server(conf);
    }
    {
      Configuration conf = TestUtil.aBasicConfiguration(node2Folder);
      Map<String, Object> clusterProperties = new HashMap<>();
      conf.setClusterMembershipProperties(clusterProperties);
      clusterProperties.put(GossipClusterMembership.HOSTS, Arrays.asList("127.0.0.1"));
      conf.setTransportHost("127.0.0.2");
      s[1] = new Server(conf);
    }
    {
      Configuration conf = TestUtil.aBasicConfiguration(node3Folder);
      Map<String, Object> clusterProperties = new HashMap<>();
      conf.setClusterMembershipProperties(clusterProperties);
      clusterProperties.put(GossipClusterMembership.HOSTS, Arrays.asList("127.0.0.1"));
      conf.setTransportHost("127.0.0.3");
      s[2] = new Server(conf);
    }
    for (Server server : s) {
      server.init();
    }
    Thread.sleep(11000);
    Assert.assertEquals(2, s[2].getClusterMembership().getLiveMembers().size());
    Assert.assertEquals("127.0.0.1", s[2].getClusterMembership().getLiveMembers().get(0).getHost());

Winning! Now nodes can discover each other! The next question is which message types should be sent over gossip and which should be sent via some other method. I decided to keep the gossip code for cluster membership only.

The first thing I decided to tackle was meta-data operations. This makes sense because without tables or column families, we can neither write to nor read from tables or column families. Duh! The approach I took was:

If a node gets a meta-data request, it re-broadcasts it out to all live hosts.

This is not a perfect solution because we need something to re-broadcast messages to down hosts (and ordering could be an issue down the line) but for now it gets the point across. We can potentially use something like hinted-handoff to deal with this. The piece that is handling message routing is known as the coordinator.

  public Response handle(Message message) {
    if (SYSTEM_KEYSPACE.equals(message.getKeyspace())) {
      return metaDataCoordinator.handleSystemMessage(message);
    }
    //other stuff
What we are doing here: the node applies the change locally, and if the message does not have "reroute" in its payload it also broadcasts it, marked as rerouted, to all live nodes. If "reroute" is already present we only handle it locally. (By broadcast I mean sending N unicast messages, not a TCP broadcast or multicast.)

  public Response handleSystemMessage(final Message message) {
    if (MetaPersonality.CREATE_OR_UPDATE_KEYSPACE.equals(message.getPayload().get("type"))) {
      metaDataManager.createOrUpdateKeyspace((String) message.getPayload().get("keyspace"),
          (Map<String, Object>) message.getPayload().get("properties"));
      if (!message.getPayload().containsKey("reroute")) {
        message.getPayload().put("reroute", "");
        List<Callable<Void>> calls = new ArrayList<>();
        for (ClusterMember clusterMember : clusterMembership.getLiveMembers()) {
          final MetaDataClient c = clientForClusterMember(clusterMember);
          Callable<Void> call = new Callable<Void>() {
            public Void call() throws Exception {
              c.createOrUpdateKeyspace(
                  (String) message.getPayload().get("keyspace"),
                  (Map<String, Object>) message.getPayload().get("properties"));
              return null;
            }
          };
          calls.add(call);
        }
        try {
          List<Future<Void>> res = metaExecutor.invokeAll(calls, 10, TimeUnit.SECONDS);
          //todo return results to client
        } catch (InterruptedException e) {

        }
      }
      return new Response();
    } else ...

Obviously this implementation needs better error handling and potentially longer timeouts. We test this by taking our last test and sending out a meta-data request. Within a few milliseconds the other nodes should have received and processed that message. In other words, create a keyspace and column family, then check each node to ensure this is complete on all hosts.

    MetaDataClient c = new MetaDataClient("127.0.0.1", s[1].getConfiguration().getTransportPort());
    c.createOrUpdateKeyspace("abc", new HashMap<String, Object>());

    Thread.sleep(1000);
    for (Server server : s) {
      Assert.assertNotNull(server.getKeyspaces().get("abc"));
    }
    Map<String, Object> stuff = new HashMap<String, Object>();
    stuff.put(ColumnFamilyMetaData.IMPLEMENTING_CLASS, DefaultColumnFamily.class.getName());
    c.createOrUpdateColumnFamily("abc", "def", stuff);
    Thread.sleep(1000);
    for (Server server : s) {
      Assert.assertNotNull(server.getKeyspaces().get("abc").getColumnFamilies().get("def"));
    }
    for (Server server : s) {
      server.shutdown();
    }

Great. Now that we have cluster membership we can start tackling the really - really - really fun stuff like request routing and quorum reads etc! Stay tuned!


Saturday Jan 03, 2015

Impala vs Hive - un Buzzifying big data

Everyone loves fast queries, no denying that. Everyone loves charts that say things are fast. I had some time to sit down and play with Impala for the past couple weeks and wanted to give my initial impressions on the debate from the seat of someone who uses Hive, MySql, and !Vertica! in my daily life.

Pre-blog rando notes

I had just installed CDH 5.3 in our 3 node staging sandbox. Believe it or not, I spend most of my time on very modest sized hardware clusters, 3 nodes or 10 nodes. These days I have read some vendor documentation that "suggests" the "standard" Hadoop node is now the i2.8xlarge, a machine that is $2476.16 monthly and $3.3920 hourly, with 200+ GB of RAM (on a one year contract). Well, guys like me without deep pockets routinely use the m1.xlarge, which is $163.52 monthly and $0.2240 hourly.

For hadoop, I sometimes go with the m1.xlarge, so I can get 4x 400 GB disks without paying for crazy enterprise SSDs or 250GB of RAM. I guess this makes me the oddball. This could lead me into an amazon rant about the lack of machines with nice local storage options, but I will not go there.

Impala round 1: Kick the tires, they kick back

So anyway, impala. I click a few buttons in Cloudera Manager and it is installed. Great. That was the easy button. I go onto a hadoop node and run impala-shell.

My old tables are there. New tables I am making do not appear. I spend some time and my co-worker finds a command, "invalidate metadata", which I run every so often. (Maybe there is some way I would not need to do this, but I am not interested in searching ATM.)

I query our main table.

impala > "select * from weblogs limit 4"
impala "Sorry I dont know what com.huffingtonpost.Serde is"

Ok fair. I expected this. I know impala is written in c/c++ and it is not going to be able to understand every format, especially ones written in java (although it can bridge java UDFs). But this is an important thing, you know: the BUZZ says "use X, it's fast and awesome!" but unless you are starting fresh, compatibility and upgrading are things you have to take seriously. Once you have a large infrastructure built around X you can not flap around and react to every new architecture or piece of software out there. You want to be getting things done, not rebuilding your stack every day.

Impala Round 2: Best laid plans

So ok I get it. I cant expect impala to understand our proprietary serde/input format. The good news is that Impala and Hive are not enemies. At best they are frenemies :).

Now the great thing about hive is I can transmute things between different formats: json, and xml, and parquet, and orc, and text. So this query command will just make a plain text table.

hive > create table impala_test1 as select * from weblogs
hive [doing stuff]
hive [doing stuff]
done
impala > invalidate metadata
impala > select * from impala_test1
impala "Sorry I dont like list<string>"


So what happened here is I converted to text files, but Impala does not understand Hive's complex types yet... Damn... Well, I see this on the impala roadmap, but if you are keeping score in the meaningless x vs y debate, Impala supports only a subset of hive's data formats and only a subset of its features. Hive's collection types are very powerful and missing them hurts. It would be better if impala just ignored these fields instead of blowing up and refusing to query the entire table.

Ok so lets deal with this.

Impala Round 3: I am willing to accept less than the table I had before

Let's do a query and strip out all the collection columns. Again, it would be better if I did not have to have N copies of data in different formats here, but for the purposes of kicking the tires I will just make a duplicate table in a different format without the columns impala does not understand.

hive > create table impala2 as select col1,col2 from weblogs
impala> select * from impala2 limit 4
impala> result1 , result2, result3

Great success! So now I can run queries and I was really impressed with performance of some operations.

Impala round 4: Ok let's get real

We have to do a lot of top-n queries. Top n is a big PITA, especially over arbitrary windows, because you can not easily pre-compute. We also have a lot of active entries (millions).

We went through a process and moved over some summary tables that looked like this:

entryid, viralcount, seedcount, othercount, otherstuff2count
3434252, 4, 2, 4,7
3412132, 10, 3, 4, 2

In a nutshell this is a PITA query we run:

select entryid, sum(viralcount) as x, sum(seedcount) from [data] where ( join conditions ) group by entryid order by x

impala > [above query] where [one day time range]
impala > fast, excitingly fast, results

Great lets try the actual query on full data set. (11Gb data)

impala > [above query] where [30 day time range]
impala > out of memory

Impala Round 5: Superstitious tuning.

So when things run out of memory I add memory. The setting was 256MB. I tried 1GB. I queried again. It failed again.

I set the memory to no limit, and it worked.
I added the tables to hdfs cache.
Well, that is good: it worked, and it was fast, really fast.

Unfortunately, I don't love what this means for the "yarn" world, because if impala is sucking up as much memory as it feels like, I am not sure how happy everything else on my hadoop cluster is going to be. Maybe there was some magic setting like 4GB that would have worked, but I don't love having to constantly review memory thresholds of software, so I set it to unlimited.

It was also a bit frustrating to me because the table was only 11GB and the intermediate results of that query are actually very small (one row per article).

Who knows how it works and why? Whatevers.

Impala Round 6: Bring out tez

So as you know, impala is a cloudera "thing" and tez is a Hortonworks "thing". One of the reasons I upgraded was that Tez support is included in Hive 0.13.1, which should be part of CDH 5.3.

Now apparently tez is supposed to deliver some serious hive performance gains. I went looking for a chart of tez vs impala for you. Instead I found a chart about how IBM's BIG SQL beats them both. See what I said about VS debates? If all you care about is charts, you always get beat by someone.

Anyway...

hive > set hive.execution.engine = tez
 class not found tez/whatever

Argh. You gotta be crapping me. Today was a frustrating day. Apparently Tez is missing some jars in CDH, in typical accidentally/potentially-not-accidentally/[I don't care] fashion. Likely fallout from the next hadoop enterprise pissing match.

This really grinds my gears...

Final thoughts

If you thought I was going to sit here and tell you how great X is and how bad Y is and show you a chart validating my opinions, well, I did not do that.

I see a lot of potential for impala, but it is not close to hive's feature set. Hive and impala actually work well together. Hive simply is more pluggable and versatile. Hive is not as fast on the read side, but for your level 1 data ingestion and aggregation building Hive is still the bomb. Once you have sliced and diced with the Hive "Hammer", impala makes a very nice "Scalpel". It is really nice that Hive and Impala share the same metastore, and can read and write each other's data (parquet) as well.




                    

Friday Jan 02, 2015

Bloom Filters - Building a Column Family store - Part 7

In the last blog in this series we built a Commitlog for our quickly evolving NoSql/Column Family store Nibiru. I decided this morning that I had been dodging Bloom Filters for a while and it was time to get to it.

You may ask what Bloom Filters are and why a Column Family store might need/want them. If you read my post on SSTables and Compaction, you can see that we are implementing a Log-Structured Merge system. Namely, we are not doing updates in place; instead we periodically merge files. If you imagine data in these files over time, you can picture a scenario where there are 30 or more SSTables on disk. Now, imagine that only 1 of those 30 files has the rowkey being searched for. To find that rowkey we would have to search all 30 of the SStables.

A Bloom Filter (described here in detail) could potentially help us avoid searching tables. This is because a Bloom Filter can give two answers: 'Absolutely NO' or 'Maybe'. (Answers I used to get when I was asking someone out on a date!)

'Absolutely No' in this case means we do not have to bother searching, because we are sure the rowkey is not in this table.

'Maybe' means we do have to search and maybe the row key is in the table or maybe it is not. (We would have searched it anyway without the Bloom Filter so no big loss)
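
To see those two answers in action, here is a minimal standalone sketch using Guava's BloomFilter directly (the same library the writer below wraps); the keys and numbers are arbitrary.

import com.google.common.base.Charsets;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

public class BloomFilterDemo {

  public static void main(String[] args) {
    BloomFilter<CharSequence> filter =
        BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 1000, 0.01);
    filter.put("myrow");
    // A key that was inserted always answers 'maybe' -- there are no false negatives.
    System.out.println(filter.mightContain("myrow"));         // true
    // A key that was never inserted usually answers 'absolutely no',
    // but can occasionally answer 'maybe' (a false positive).
    System.out.println(filter.mightContain("wontfindthis"));  // almost always false
  }
}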

Because of these properties, Bloom Filters make a very nice option for a negative-cache. (For those interested, the pull request with the majority of the feature is here; I did make other subtle changes along the way.) So let's have at it and implement this bad boy.

Writing and Persisting Bloom Filters

Remember that our only writes are the Memtable, flushing, and SSTables. Thus we can simply hook into the SSTable writing process and attach a bloom filter writer.

public class SsTableStreamWriter {

  ...

  private BloomFilterWriter bloomFilter;

  public SsTableStreamWriter(String id, ColumnFamily columnFamily) {
    ...
    indexWriter = new IndexWriter(id, columnFamily.getKeyspace().getConfiguration());
    bloomFilter = new BloomFilterWriter(id, columnFamily.getKeyspace().getConfiguration());
  }

  public void write(Token t, Map<String, Val> columns) throws IOException {
    long startOfRecord = ssOutputStream.getWrittenOffset();
    bloomFilter.put(t);
    ...
  }

  public void close() throws IOException {
    indexWriter.close();
    ssOutputStream.close();
    bloomFilter.writeAndClose();
  }
}

I did not go out and write a Bloom Filter from scratch; I decided to use Guava. But I did wrap it in a facade to make it look like I did something.

package io.teknek.nibiru.engine;

public class BloomFilterWriter {

  private final BloomFilter<Token> bloomFilter;
  private static final int expectedInsertions = 50000; //Be smarter here
  private final String id;
  private final Configuration configuration;

  public BloomFilterWriter(String id, Configuration configuration) {
    this.id = id;
    this.configuration = configuration;
    bloomFilter = BloomFilter.create(TokenFunnel.INSTANCE, expectedInsertions);
  }

  public enum TokenFunnel implements Funnel<Token> {
    INSTANCE;
    public void funnel(Token person, PrimitiveSink into) {
      into.putUnencodedChars(person.getToken()).putUnencodedChars(person.getRowkey());
    }
  }

  public void put(Token t) {
    bloomFilter.put(t);//see I did something
  }

  public static File getFileForId(String id, Configuration configuration) {
    return new File(configuration.getSstableDirectory(), id + ".bf");
  }

  public void writeAndClose() throws IOException {
    BufferedOutputStream bo = new BufferedOutputStream(new FileOutputStream(getFileForId(id, configuration)));
    bloomFilter.writeTo(bo);
    bo.close();
  }
}

You do have to size the filters correctly, otherwise they are not efficient. A better thing to do here would be to estimate based on the SSTables being flushed or compacted. I just cheated and put it at 50000 for the time being.
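
For what it is worth, smarter sizing might look something like this hypothetical helper, which sizes the filter from the memtable being flushed and a target false positive rate. This is not what the current code does, just a sketch using the same Guava API and the TokenFunnel above.

  // Hypothetical sizing helper: base the filter on the number of row keys in
  // the memtable being flushed instead of a hard-coded 50000.
  public static BloomFilter<Token> sizedFor(int rowKeysBeingFlushed) {
    double targetFalsePositiveRate = 0.01; // roughly 1% 'maybe' answers for absent keys
    return BloomFilter.create(TokenFunnel.INSTANCE,
        Math.max(rowKeysBeingFlushed, 1), targetFalsePositiveRate);
  }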

Reading Bloomfilters

When we open up an SSTable we also open the associated Bloom Filter.

public class SsTable implements Comparable<SsTable> {

  private KeyCache keyCache;
  private ColumnFamily columnFamily;
  private SsTableReader ssTableReader;
  private BloomFilter bloomFilterReader;

  public SsTable(ColumnFamily columnFamily) {
    this.columnFamily = columnFamily;
  }

  public void open(String id, Configuration conf) throws IOException {
    bloomFilterReader = new BloomFilter();
    bloomFilterReader.open(id, conf);

    keyCache = new KeyCache(columnFamily.getColumnFamilyMetadata().getKeyCachePerSsTable());
    ssTableReader = new SsTableReader(this, keyCache, bloomFilterReader);
    ssTableReader.open(id);
  }

Now here is the payday! For any operation that does a read, we can ask the bloom filter for help and skip the entire rest of the read path if the bloom filter says no!

  public Val get(Token searchToken, String column) throws IOException {
    boolean mightContain = bloomFilter.mightContain(searchToken);
    if (!mightContain) {
      return null;
    }

    BufferGroup bgIndex = new BufferGroup();
    ...

Performance testing this is simple. First, we can just write a test that searches for a rowkey not in the SStable. This was a run I did without the Bloom Filter code.

  {
    long x = System.currentTimeMillis();
    for (int i = 0; i < 50000; i++) {
      Assert.assertEquals(null, s.get(ks1.getKeyspaceMetadata().getPartitioner().partition("wontfindthis"), "column2"));
    }
    System.out.println("non existing key " + (System.currentTimeMillis() - x));
  }

result >> non existing key 11694

(I suspect the code is not opting out early when it reads a row key > search, but that is another performance tweak). Next we put the code into action.

result >> index match 891
result >> far from index 186
result >> index match 159
result >> far from index 139
result >> non existing key 26

Wow! Maximum performance!

There you have it. Bloom Filters: great for looking for data that is not there :) Sounds stupid, but the case happens more often than you might first think!



Wednesday Dec 31, 2014

Commitlog - Building a Column Family store - Part 6

In previous entries we have built Memtables and SSTables. For a quick recap, Memtables allow us to make the write path fast by writing to memory and periodically flushing to on disk SSTables. Fast is great but durability is important as well. Imagine if a memtable contains 1000 writes and delete tombstones and then the server process is killed. These entries are only in memory and would be lost.

The answer for this (in a single node scenario) is a Commit log. The Commitlog is a flat log of all mutation (write/delete) operations.

If the server were to crash we can replay the commitlog and recover those operations. In fact, the only time we need to read the commitlog files is on start up. Once a Memtable is safely flushed to an SStable the Commitlog can be deleted.

Because a Commitlog is only written sequentially it does not de-duplicate like a Memtable. We do not need to memory map or index the Commitlog because we will only ever read them sequentially.

Let's talk about implementing a Commitlog.


Server -> Memtable -> SStable
       -> Commitlog

I will stub out the important methods in the Commitlog class.

public class CommitLog {

  private final ColumnFamily columnFamily;
  private CountingBufferedOutputStream ssOutputStream;

  public CommitLog(ColumnFamily cf);

  //Open a commit log before calling write
  public void open() throws FileNotFoundException;

  //write a value to the commit log
  public synchronized void write(Token rowkey, String column, String value, long stamp, long ttl);

  //delete the commit log from disk when it is no longer needed
  public void delete() throws IOException;

  //close the log for writing but do not delete it
  public void close() throws IOException;
}

After pondering the design I determined I would make the Commitlog a member of the Memtable. The relationship does not need to be a has-a relation like this, but it makes it fairly easy to track the commit logs and later delete them only after we are sure the memtable is flushed.


public DefaultColumnFamily(Keyspace keyspace, ColumnFamilyMetadata cfmd) {
  super(keyspace, cfmd);
  CommitLog commitLog = new CommitLog(this);
  try {
    commitLog.open();
  } catch (FileNotFoundException e) {
    throw new RuntimeException(e);
  }
  memtable = new AtomicReference<Memtable>(new Memtable(this, commitLog));
  memtableFlusher = new MemtableFlusher(this);
  memtableFlusher.start();
}

Next, we take any mutation and make sure it is also applied to the commitlog as well as the memtable.

  public void put(String rowkey, String column, String value, long time, long ttl) {
    try {
      memtable.get().getCommitLog().write(keyspace.getKeyspaceMetadata().getPartitioner().partition(rowkey), column, value, time, ttl);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    memtable.get().put(keyspace.getKeyspaceMetadata().getPartitioner().partition(rowkey), column, value, time, ttl);
    considerFlush();
  }

Great! Next there are two cases to deal with:

Case 1: The memtable is flushed to an sstable and we can safely delete the associated commitlog.

Because of our decision to attach the commit log to the memtable, this was a trivial change to the MemtableFlusher.

      for (Memtable memtable : memtables) {
        SSTableWriter ssTableWriter = new SSTableWriter();
        try {
          //TODO: a timeuuid would be better here
          String tableId = String.valueOf(System.nanoTime());
          ssTableWriter.flushToDisk(tableId, columnFamily.getKeyspace().getConfiguration(), memtable);
          SsTable table = new SsTable(columnFamily);
          table.open(tableId, columnFamily.getKeyspace().getConfiguration());
          columnFamily.getSstable().add(table);
          memtables.remove(memtable);
          memtable.getCommitLog().delete(); //Delete the commit log because we know the data is now durable
          flushes.incrementAndGet();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

Case 2: On start up replay any commit logs, by reading them and applying the mutations to a memtable.

Start up is a fairly simple process. Open and load up all SStables, replay all commit logs, and done.


public void init() throws IOException {
  for (File ssTable : keyspace.getConfiguration().getSstableDirectory().listFiles()) {
    String[] parts = ssTable.getName().split("\\.");
    if (parts.length == 2) {
      if ("ss".equalsIgnoreCase(parts[1])) {
        String id = parts[0];
        SsTable toOpen = new SsTable(this);
        toOpen.open(id, keyspace.getConfiguration());
        sstable.add(toOpen);
      }
    }
  }

  for (File commitlog : CommitLog.getCommitLogDirectoryForColumnFamily(this).listFiles()) {
    String[] parts = commitlog.getName().split("\\.");
    if (parts.length == 2) {
      if (CommitLog.EXTENSION.equalsIgnoreCase(parts[1])) {
        processCommitLog(parts[0]);
      }
    }
  }
}

For the actual format of the commitlog I used the same format as the SsTable. This made it simple to borrow the code used by compaction that reads the SStable linearly.


void processCommitLog(String id) throws IOException {
  CommitLogReader r = new CommitLogReader(id, this);
  r.open();
  Token t;
  while ((t = r.getNextToken()) != null) {
    SortedMap<String, Val> x = r.readColumns();
    for (Map.Entry<String, Val> col : x.entrySet()) {
      memtable.get().put(t, col.getKey(), col.getValue().getValue(), col.getValue().getTime(), col.getValue().getTtl());
    }
  }
}

Cool so how do we test this beast? Well how about this?

  1. Start up a server
  2. Set the Memtable flush at 2 rowkeys.
  3. Insert 3 records.
  4. Shutdown the server.

This should leave a commitlog with one row unflushed. Next:

  1. Startup the server (which will read in commit logs)
  2. Read the unflushed row and assert it exists

@Test
public void commitLogTests() throws IOException, InterruptedException {
  String ks = "data";
  String cf = "pets";
  Configuration configuration = aBasicConfiguration(testFolder);
  Server s = new Server(configuration);
  s.init(); //1
  s.createKeyspace(ks);
  s.createColumnFamily(ks, cf);
  s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getColumnFamilyMetadata().setFlushNumberOfRowKeys(2); //2
  s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getColumnFamilyMetadata().setCommitlogFlushBytes(1);
  for (int i = 0; i < 3; i++) {
    s.put(ks, cf, i + "", "age", "4", 1); //3
    Thread.sleep(1);
  }
  Val x = s.get(ks, cf, "0", "age");
  Assert.assertEquals("4", x.getValue());
  {
    Val y = s.get(ks, cf, "2", "age");
    Assert.assertEquals("4", y.getValue());
  }
  Thread.sleep(1000);
  s.shutdown(); //4
  Thread.sleep(1000);
  {
    Server j = new Server(configuration);
    j.init(); //1
    Assert.assertNotNull(j.getKeyspaces().get(ks).getColumnFamilies().get(cf));
    Val y = j.get(ks, cf, "2", "age");
    Assert.assertEquals("4", y.getValue()); //2
    j.shutdown();
  }
}

Some notes about commitlogs and durability: disk file systems have synchronous and asynchronous operations. Under virtualization it is particularly hard to enforce durability because the disk systems are abstracted, and even operations like flush or sync could be virtualized. We went with the option of doing a synchronized write to a buffered stream and periodic flushing in this example. In a multi-node scenario there is added durability, because write operations could be sent to multiple nodes. This solution is "probably ok". Durability is always a fun problem and there are other ways to construct this commitlog for sure.
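
For comparison, a stricter (and much slower) option would be to fsync after each write with FileChannel.force. A minimal sketch, not what Nibiru does; the file path and payload are made up.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class FsyncedLogExample {

  public static void main(String[] args) throws IOException {
    try (FileChannel channel = FileChannel.open(Paths.get("/tmp/example-commitlog.bin"),
        StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
      channel.write(ByteBuffer.wrap("mutation-bytes".getBytes("UTF-8")));
      // force(true) asks the OS to push data and metadata to the device;
      // under virtualization even this can be weakened by the hypervisor.
      channel.force(true);
    }
  }
}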

Awesomesauce! We now have a commitlog, memtables, sstables, keycache, index files, and compaction. We have a very, very functional Column Family store implementation! All that is left is bloom filters and having compaction remove tombstones.

Nibiru is getting very fun at this point. It may be turning from example code in blog to the next NoSQL. You may want to fork it and play.

Monday Dec 22, 2014

SSTables and Compaction - Building a column family store - Part 5

In the last blog we introduced a threshold based system that flushed Memtables to SSTables. In a Log-Structured Merge storage system, having more SSTables makes the read path slower. Also, the system needs a way to merge the writes and updates so we do not run out of disk space.

Compaction is a process that merges multiple SSTables together. Compaction removes older columns and can delete entire rows. It also optimizes the data by putting all the columns of the row close together on disk.

Imagine that each of the following three tables is a separate SSTable.

 
 SSTable 1
 rowkey column value
 myrow  a      5

 SSTable 2
 rowkey column value
 myrow  a      2
 myrow  b      b
 myrow  c      c

 SSTable 3
 rowkey column value
 row1   a      b
 row1   b      c
   
 
 

After a compaction the resulting SSTable would look like this:

 rowkey column value
 myrow a 2
 myrow b b
 myrow c c
 row1 a b
 row1 b c

Because the SSTables are sorted by rowkey and secondarily by column, merging multiple tables together is straightforward. We iterate each SStable at the same time and merge columns into a new table.

Note: If a row has a large number of columns, merging in memory would likely cause an out-of-memory error. We would need a strategy to detect when the merge size gets too large and then start doing on disk merging. For now we will pretend like this is not an issue.

I assumed I was going to struggle with this algorithm a bit more than I did. Either I am getting good or, more than likely, there are some glaring holes in the implementation I do not see yet.

The approach I took was to create a method that can merge multiple SSTables into a single new one. I re-factored the SSTableWriter to write SSTables incrementally.

Then the algorithm I came up with was:

  1. Read a token from each sstable being compacted
  2. Find the lowest token
  3. Merge the columns of only the lowest token
  4. Advance the readers that only had the lowest token

public static SsTable compact(SsTable[] ssTables, String newName) throws IOException {
  ColumnFamily columnFamily = ssTables[0].getColumnFamily();
  SsTableStreamReader[] readers = new SsTableStreamReader[ssTables.length];
  SsTableStreamWriter newSsTable = new SsTableStreamWriter(newName,
      columnFamily.getKeyspace().getConfiguration());

  newSsTable.open();
  Token[] currentTokens = new Token[ssTables.length];
  for (int i = 0; i < ssTables.length; i++) {
    readers[i] = ssTables[i].getStreamReader();
  }
  for (int i = 0; i < currentTokens.length; i++) {
    currentTokens[i] = readers[i].getNextToken();//1
  }
  while (!allNull(currentTokens)) {
    Token lowestToken = lowestToken(currentTokens);
    SortedMap<String, Val> allColumns = new TreeMap<>();
    for (int i = 0; i < currentTokens.length; i++) {
      if (currentTokens[i] != null && currentTokens[i].equals(lowestToken)) {
        SortedMap<String, Val> columns = readers[i].readColumns();
        merge(allColumns, columns);//3
      }
    }
    newSsTable.write(lowestToken, allColumns);
    advance(lowestToken, readers, currentTokens);//4
  }
  newSsTable.close();
  SsTable s = new SsTable(columnFamily);
  s.open(newName, columnFamily.getKeyspace().getConfiguration());
  return s;
}
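
The helpers allNull, lowestToken, merge, and advance are not shown above. Roughly, they could look like the following sketch, assuming Token can be ordered by its getToken() string and Val exposes getTime(), as seen elsewhere in this series; the real implementations may differ.

static boolean allNull(Token[] tokens) {
  for (Token t : tokens) {
    if (t != null) {
      return false;
    }
  }
  return true;
}

static Token lowestToken(Token[] tokens) {
  Token lowest = null;
  for (Token t : tokens) {
    if (t != null && (lowest == null || t.getToken().compareTo(lowest.getToken()) < 0)) {
      lowest = t;
    }
  }
  return lowest;
}

// When the same column exists in several sstables, the newest timestamp wins.
static void merge(SortedMap<String, Val> mergeInto, SortedMap<String, Val> columns) {
  for (Map.Entry<String, Val> e : columns.entrySet()) {
    Val existing = mergeInto.get(e.getKey());
    if (existing == null || e.getValue().getTime() > existing.getTime()) {
      mergeInto.put(e.getKey(), e.getValue());
    }
  }
}

// Only the readers that produced the lowest token move forward to their next row.
static void advance(Token lowest, SsTableStreamReader[] readers, Token[] current) throws IOException {
  for (int i = 0; i < current.length; i++) {
    if (current[i] != null && current[i].equals(lowest)) {
      current[i] = readers[i].getNextToken();
    }
  }
}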

I always try to write functional code. By functional I do not mean super sexy, complicated scala and function pointers; I mean methods that do not modify any shared state. When you write code like this it makes testing easier. I wrote a quick test that would merge two SSTables:

  @Test
  public void test() throws IOException {
    ...
    Keyspace ks1 = MemtableTest.keyspaceWithNaturalPartitioner();
    SsTable s = new SsTable(cf);
    {
      Memtable m = new Memtable(cf);
      m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column2", "c", 1, 0L);
      m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column2", "d", 2, 0L);
      m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column3", "e", 2, 0L);
      m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row2"), "column1", "e", 2, 0L);
      SSTableWriter w = new SSTableWriter();
      w.flushToDisk("1", configuration, m);
      s.open("1", configuration);
    }
    SsTable s2 = new SsTable(cf);
    {
      Memtable m2 = new Memtable(cf);
      m2.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column1", "c", 1, 0L);
      m2.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row2"), "column1", "f", 3, 0L);
      SSTableWriter w2 = new SSTableWriter();
      w2.flushToDisk("2", configuration, m2);
      s2.open("2", configuration);
    }
    CompactionManager.compact(new SsTable[] { s, s2 }, "3");
    SsTable ss = new SsTable(cf);
    ss.open("3", configuration);
    Assert.assertEquals("e", ss.get("row1", "column3").getValue());
  }

Great! Now we can merge multiple SSTables into a single one, but we still have to build the surrounding state machine right. In the last blog we flushed memtables to disk and loaded them back up before removing the memtable. We need to do something similar here.

What we want to do is detect a trigger for compaction. In this case, if we have more than 4 SSTables we want to compact them into a single table. To do this we have a background thread iterate all keyspaces. In each keyspace we iterate all the ColumnFamily instances. In each column family, if there are more than 4 SSTables, compact them.

@Override
public void run() {
  while (true) {
    for (Entry<String, Keyspace> keyspaces : server.getKeyspaces().entrySet()) {
      Keyspace keyspace = keyspaces.getValue();
      for (Map.Entry<String, ColumnFamily> columnFamilies : keyspace.getColumnFamilies().entrySet()) {
        Set<SsTable> tables = columnFamilies.getValue().getSstable();
        if (tables.size() >= 4) {
          //...do compaction here
 

Like the memtable flushing, we want to make sure we insert the new SSTable first and only later remove the older SSTables, in a concurrency-safe way. Again, we can leverage the fact that having the data in multiple files will not change the end result.

        Set<SsTable> tables = columnFamilies.getValue().getSstable();
        if (tables.size() >= 4) {
          SsTable[] ssArray = tables.toArray(new SsTable[] {});
          try {
            String newName = getNewSsTableName();
            SsTable s = compact(ssArray, newName);
            tables.add(s);
            for (SsTable table : ssArray) {
              tables.remove(table);
            }
            numberOfCompactions.incrementAndGet();
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
To show the entire process in action I cooked up this end-to-end test. What we are doing here:

  1. Set the memtable to flush at 2 rows
  2. Insert 9 rows (this should cause 4 flushes)
  3. Assert the flushes are as expected
  4. Assert the compaction thread had 1 event
  5. Check that all the rows still exist

  @Test
  public void compactionTest() throws IOException, InterruptedException {
    ...
    Configuration configuration = new Configuration();
    configuration.setSstableDirectory(tempFolder);
    Server s = new Server();
    s.setConfiguration(configuration);
    s.init();
    s.createKeyspace(ks);
    s.createColumnFamily(ks, cf);
    s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getColumnFamilyMetadata().setFlushNumberOfRowKeys(2);//1
    for (int i = 0; i < 9; i++) {
      s.put(ks, cf, i + "", "age", "4", 1);//2
      Thread.sleep(1);
    }
    Val x = s.get(ks, cf, "8", "age");
    Thread.sleep(1000);
    Assert.assertEquals(4, s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getMemtableFlusher().getFlushCount());//3
    Assert.assertEquals(1, s.getCompactionManager().getNumberOfCompactions());//4
    Assert.assertEquals("4", x.getValue());
    for (int i = 0; i < 9; i++) {
      Val y = s.get(ks, cf, i + "", "age");
      Assert.assertEquals("4", y.getValue());//5
    }
  }

Sweet! Now we have a system where Memtables flush to SSTables. We also have a background thread optimizing the SSTables on disk by compaction! We are getting really, really close to having a fully functional single node ColumnFamily store. We only need two more critical components, Bloom Filters and CommitLogs. (There are some little things we need to do, like have compaction remove deleted rows, which are easier once we have bloom filters.) Awesome sauce!

Sunday Dec 14, 2014

Memtable, Flushing, and SSTables - Building a column family store - Part 4

Up until this point, the memtables, sstables, and key cache from the past blogs have been built and tested as discrete components. To build a Column Family store they need to work in concert. This means we need to talk about the write path.

Our SSTables and associated IndexFiles are write-once. This has some nice characteristics. First, the data store does not do any random writes. This is great for rotational disks and for SSD disks. We want our write path to look like this:

set-request -> server -> memtable 

Eventually the Memtable will grow, and if it grows unbounded the system will run out of memory. We want to flush the Memtable to disk as an SStable before we have a catastrophic OutOfMemory error, but we also want the Memtable to be as large as possible so we do fewer small writes (and require less compaction to merge these tables).

It would just be nice to do something like:
memoryInUse += size_of(column)


Unfortunately that is not an easy task. In the Java world counting how much memory structures use is a hard problem. There are several ways to go about counting used memory but the approach I am going to take is avoiding the hard problem and solving an easier one:

Flush when the number of row keys is > X.

In the end we likely want to build numerous criteria:

  1. Flush if memtable is very old
  2. Flush if memtable > certain size
  3. Flush if the memtable has had more than X operations

All of these things should be easy to add incrementally.
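
As an aside, criterion 2 does not require solving the general JVM memory-measurement problem; a crude character-count estimate would probably be good enough. A hypothetical sketch, not something in Nibiru:

import java.util.concurrent.atomic.AtomicLong;

// Track a rough byte estimate as columns are put, instead of measuring real
// JVM object sizes.
public class ApproximateMemtableSize {

  private final AtomicLong approximateBytes = new AtomicLong();

  public void record(String rowkey, String column, String value) {
    // 2 bytes per char is a rough lower bound; object headers and map
    // overhead are ignored on purpose.
    long estimate = 2L * (rowkey.length() + column.length() + value.length());
    approximateBytes.addAndGet(estimate);
  }

  public boolean shouldFlush(long thresholdBytes) {
    return approximateBytes.get() > thresholdBytes;
  }
}

For now, though, counting row keys is good enough, and that is the threshold the column family metadata carries: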

public class ColumnFamilyMetadata {
  private String name;
  private long tombstoneGraceMillis;
  private int flushNumberOfRowKeys = 10000;

Next, we shell out a method for flushing:

public class ColumnFamily {
  void considerFlush() {
    // flush memtable to sstable
  }
}

Any method that modifies or creates data should consider flushing

  public void delete(String rowkey, String column, long time) {
    memtable.get().delete(keyspace.getKeyspaceMetadata().getPartitioner().partition(rowkey), column, time);
    considerFlush();
  }

Flushing a memory table to disk turns out to be a tricky concurrency problem. We do not want to block writes. Even though writing to disk linearly is fast, it is not as fast as writing to the memtable. We do not want to be forced to acquire a synchronization lock on every operation to check if a flush is in progress. After the sstable is flushed, we can not clear the memtable until the sstable is loaded back up. Otherwise data could temporarily vanish during the period between the time the sstable is written and the time it is loaded back into memory.

Here is my solution. Maybe aphyr will, call me maybe, out on it, but for a 0 follower github project this is fine :). The good news is that the data here is idempotent, thus having the data in a memtable and an sstable at the same time will not produce an incorrect result.

First, we create MemtableFlusher a Runnable per column family to flush memtable instances.

public class MemtableFlusher implements Runnable {
  private ConcurrentSkipListSet<Memtable> memtables = new ConcurrentSkipListSet<>();

This piece is tightly coupled with the ColumnFamily instance. Our read operations need to be able to access the current memtable, all the memtables pending flush to disk, and the already flushed sstables.

public class ColumnFamily {

  private AtomicReference<Memtable> memtable;
  private MemtableFlusher memtableFlusher;
  private Set<SSTable> sstable = new ConcurrentSkipListSet<>();


Here is the implementation of the considerFlush() method we talked about above. Namely, if the size of the memtable is larger than the threshold, atomically move the current memtable onto the flush queue and put a new memtable in its place.

  void considerFlush() {
    Memtable now = memtable.get();
    if (columnFamilyMetadata.getFlushNumberOfRowKeys() != 0
        && now.size() > columnFamilyMetadata.getFlushNumberOfRowKeys()) {
      Memtable aNewTable = new Memtable(this);
      boolean success = memtableFlusher.add(now);
      if (success) {
        boolean swap = memtable.compareAndSet(now, aNewTable);
        if (!swap) {
          throw new RuntimeException("race detected");
        }
      }
    }
  }

Now that the memtable is placed onto the flush queue, the MemtableFlusher should:

  1. flush memtable to sstable
  2. load the sstable (memory map it)
  3. remove any reference to the memtable

  @Override
  public void run() {
    while (true) {
      for (Memtable memtable : memtables) {
        SSTableWriter ssTableWriter = new SSTableWriter();
        try {
          //TODO: a timeuuid would be better here
          String tableId = String.valueOf(System.nanoTime());
          ssTableWriter.flushToDisk(tableId, columnFamily.getKeyspace().getConfiguration(), memtable); //1
          SSTable table = new SSTable();
          table.open(tableId, columnFamily.getKeyspace().getConfiguration());
          columnFamily.getSstable().add(table); //2
          memtables.remove(memtable); //3
          flushes.incrementAndGet();
        } catch (IOException e) {
          //TODO: catch this and terminate server?
          throw new RuntimeException(e);
        }
      }
      try {
        Thread.sleep(1);
      } catch (InterruptedException e) { }
    }
  }

I have written a small test. Obviously this will not find concurrency bugs, but at least it certifies that the components are working together as designed in a simple case. Here we set the flush size to 2 and insert 3 row keys. The third row key inserted should cause a flush.

  @Test
  public void aTest() throws IOException, InterruptedException{
    File tempFolder = testFolder.newFolder("sstable");
    Configuration configuration = new Configuration();
    configuration.setSstableDirectory(tempFolder);
    Server s = new Server();
    s.setConfiguration(configuration);
    s.createKeyspace(ks);
    s.createColumnFamily(ks, cf);
    s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getColumnFamilyMetadata().setFlushNumberOfRowKeys(2);
    s.put(ks, cf, "jack", "name", "bunnyjack", 1);
    s.put(ks, cf, "jack", "age", "6", 1);
    Val x = s.get(ks, cf, "jack", "age");
    Assert.assertEquals("6", x.getValue());
    s.put(ks, cf, "ziggy", "name", "ziggyrabbit", 1);
    s.put(ks, cf, "ziggy", "age", "8", 1);
    s.put(ks, cf, "dotty", "age", "4", 1);
    Thread.sleep(2000);
    Assert.assertEquals(1, s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getMemtableFlusher().getFlushCount());
    x = s.get(ks, cf, "jack", "age");
    Assert.assertEquals("6", x.getValue());
  } 

We are getting closer and closer to being fully functional. There are two big pieces left: Currently we are writing files to disk, but to run continuously we will need to merge these tables together. This process is known as compaction. After that a commit log will be needed to add durability.

This is getting exciting!

Wednesday Dec 10, 2014

Key Cache - Building a column family store - Part 3

In my last entry we discussed how to build a data structure similar to Cassandra's Index File. If you remember, the Index Interval setting builds a structure that indexes every N keys. In a best case scenario a look up may hit this index, but in a worst case scenario our linear search may have had to page through (Index Interval - 1) rows to locate data.

In one of my unit tests I captured the effect that this has:

    {
      long x = System.currentTimeMillis();
      for (int i = 0 ; i < 50000 ; i++) {
        Assert.assertEquals("c", s.get("00001", "column2").getValue());
      }
      System.out.println("index match " + (System.currentTimeMillis() - x));
    }
    {
      long x = System.currentTimeMillis();
      for (int i = 0 ; i < 50000 ; i++) {
        Assert.assertEquals("c", s.get("08999", "column2").getValue());
      }
      System.out.println("far from index " +(System.currentTimeMillis() - x));
    }

We can see the performance difference between a perfect index hit and a key far from the index:
Test folder: /tmp/junit4018208685678427527
index match 1088
far from index 11166

Note: Our SStable format and decision to use String (and not ByteBuffer) means there are several micro-optimizations that can be done.

If the requests have a normal distribution the difference in access speed may not be noticeable. Still, it seems unfair that some key requests will always be faster than others.

Key Cache to the rescue!

The Key Cache is like the Index Interval in that it helps avoid seeking for keys. It works by storing the offset of frequently used keys in memory. The less random the key access is the more benefit this type of caching delivers. Also like the Index format we can store a large cache in a small amount of storage.

Let's build our own Key Cache and add it to our increasingly sophisticated Column Family store. There is nothing spectacular being done here: we are wrapping a Guava cache, which handles eviction, with a small facade.

package io.teknek.nibiru.engine;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class KeyCache {

  private Cache<String, Long> lc;

  public KeyCache(int cacheSize){
    lc = CacheBuilder.newBuilder().maximumSize(cacheSize).recordStats().build();
  }

  public void put(String key, long value){
    lc.put(key, value);
  }

  public long get(String key) {
    long res = -1;
    Long l = lc.getIfPresent(key);
    if (l != null) {
      res = l.longValue();
    }
    return res;
  }
}

Note: We would be better off using something like this library, which does not have to box and un-box primitives, but again our goal here is to show the relative performance speed-up from the Key Cache.

Now we only need to make a few small changes to our sstable reader.

-    bg.setStartOffset((int)index.findStartOffset(row));
+ long startOffset = keyCache.get(row);
+ if (startOffset == -1){
+ startOffset = index.findStartOffset(row);
+ }
+ bg.setStartOffset((int)startOffset);
String searchToken = row;//this is not correct
do {
if (bg.dst[bg.currentIndex] == END_ROW){
bg.advanceIndex();
}
+ long startOfRow = bg.mbb.position() - bg.getBlockSize() + bg.currentIndex;
readHeader(bg);
StringBuilder token = readToken(bg);
if (token.toString().equals(searchToken)){
StringBuilder rowkey = readRowkey(bg);
if (rowkey.toString().equals(row)){
+ keyCache.put(row, startOfRow);

We can see the difference if we re run our tests:

Test folder: /tmp/junit6910931010171014079
index match 840
far from index 171
index match 219
far from index 137

Notice here that I ran the tests multiple times. This is because the Java virtual machine is much like an old car: you do not drive it fast right away; you let it sit in the driveway and warm up. :) Just kidding. The reality is that with Just In Time compilation and multiple garbage collection threads, timing code exactly can be difficult.
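
One trick, sketched below with made-up names (this is not part of the test code), is to run an untimed warm-up pass before the pass you actually report; re-running the lookup loops above accomplishes roughly the same thing.

public class WarmupTimer {

  /** Runs the work untimed first so the JIT can compile hot paths, then times a second pass. */
  public static long timeAfterWarmup(Runnable work, int warmupRuns, int measuredRuns) {
    for (int i = 0; i < warmupRuns; i++) {
      work.run();                 // warm-up iterations, result thrown away
    }
    long start = System.currentTimeMillis();
    for (int i = 0; i < measuredRuns; i++) {
      work.run();                 // measured pass
    }
    return System.currentTimeMillis() - start;
  }
}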

Even though the numbers fluctuate, we see the Key Cache has given us much better performance by helping us get data without having to seek on the (memory mapped) disk as much. We went from 11 seconds for 50,000 lookups to around 200ms!

Awesome! What is next? Bloom filters, or Row Cache? Maybe start building a system to compact multiple SSTables together? Stay tuned.


Sunday Dec 07, 2014

Index Files - Building a column store - Part 2

In a previous blog post I demonstrated how to build an SSTable. For reference the sstable data on disk looks like this:

^@00000^A00000^Bcolumn2^C1417966551490^C1^C0^Cc^Dcolumn3^C1417966551490^C1^C0^Cc
^@00001^A00001^Bcolumn2^C1417966551490^C1^C0^Cc^Dcolumn3^C1417966551490^C1^C0^Cc
^@00002^A00002^Bcolumn2^C1417966551490^C1^C0^Cc^Dcolumn3^C1417966551490^C1^C0^Cc

Another way to visualize the data:

Token(rowkey): 00001
rowkey:        00001
columns:
  name     value  time
  column2  c      1417966551490
  column3  c      1417966551490

Side note: In this case the token and rowkey are the same because we are using a natural partitioner that does not hash the key.

An astute reader of the last blog may have noticed that even though our tables are sorted by token and our get(key, column) operation searches by token, we were using an inefficient linear search. One way to optimize this type of search is a binary search; however, this is not the primary way Cassandra optimizes the problem.

Enter IndexInterval

The concept of IndexInterval is that for every Nth rowkey we record that row key and the byte offset at which it exists in the file. In this way we build a relatively small, fast lookup table of rowkeys. Even if the row key is not found in the index, the database should get close to it and only have to page through at most (index interval - 1) keys.

Depending on the use case (small rows vs. larger rows), a larger or smaller index interval might be right for you.

Writing our own Index

Since Cassandra's data files are write-once, this makes the process straightforward: as we are flushing our memtable to an sstable we also record the byte offset of every Nth row in a separate file.

I was not aware of any output stream that could count its position, and I did not want to track it out of band, so I whipped up an output stream that counts as it writes.

package io.teknek.nibiru.engine;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class CountingBufferedOutputStream extends BufferedOutputStream {
  private long writtenOffset;
  public CountingBufferedOutputStream(OutputStream out) {
    super(out);
  }
  public synchronized void writeAndCount(int b) throws IOException {
    super.write(b);
    writtenOffset++;
  }
  public void writeAndCount(byte[] b) throws IOException {
    super.write(b);
    writtenOffset += b.length;
  }
  public long getWrittenOffset() {
    return writtenOffset;
  }
}

I could have buried the index-creation logic inside the sstable-writing logic, but I decided to design a separate class to write index files rather than intermingle the two.

public class IndexWriter {

  private final String id;
  private final Configuration conf;
  private BufferedOutputStream indexStream;
  private long rowkeyCount;
 
  public IndexWriter(String id, Configuration conf){
    this.id = id;
    this.conf = conf;
  }
 
  public void open() throws FileNotFoundException {
    File indexFile = new File(conf.getSstableDirectory(), id + ".index");
    indexStream = new CountingBufferedOutputStream(new FileOutputStream(indexFile));
  }
 
  public void handleRow(long startOfRecord, String token) throws IOException {
    if (rowkeyCount++ % conf.getIndexInterval() == 0){
      indexStream.write(SSTable.START_RECORD);
      indexStream.write(token.getBytes());
      indexStream.write(SSTable.END_TOKEN);
      indexStream.write(String.valueOf(startOfRecord).getBytes());
      indexStream.write(SSTable.END_ROW);
    }
  }
 
  public void close () throws IOException {
    indexStream.close();
  }
}

Now we make some minor changes to the sstable writer to also write the index as we write the sstable.

 public void flushToDisk(String id, Configuration conf, Memtable m) throws IOException{
    File sstableFile = new File(conf.getSstableDirectory(), id + ".ss");
    CountingBufferedOutputStream ssOutputStream = null;
    IndexWriter indexWriter = new IndexWriter(id, conf);

    try {
      ssOutputStream = new CountingBufferedOutputStream(new FileOutputStream(sstableFile));
      indexWriter.open();
      for (Entry<Token, ConcurrentSkipListMap<String, Val>> i : m.getData().entrySet()){
        long startOfRecord = ssOutputStream.getWrittenOffset();
        ssOutputStream.writeAndCount(START_RECORD);
        ssOutputStream.writeAndCount(i.getKey().getToken().getBytes());
        ssOutputStream.writeAndCount(END_TOKEN);
        ssOutputStream.writeAndCount(i.getKey().getRowkey().getBytes());
        ssOutputStream.writeAndCount(END_ROWKEY);
        indexWriter.handleRow(startOfRecord, i.getKey().getToken());
        boolean writeJoin = false;

When we write an sstable with rows 00000 to 10000 the following index is produced:

^@00000^A0
^@01000^A69000
^@02000^A138000
^@03000^A207000
^@04000^A276000
^@05000^A345000
^@06000^A414000
^@07000^A483000
^@08000^A552000
^@09000^A621000
 

Sweet! Now let's think about the use case of this index. It needs to provide one function: given a rowkey, return the byte offset at which the sstable should begin searching.

 public long findStartOffset(String token)

Here is most of the implementation:

  public long findStartOffset(String token) throws IOException {
    long offset = 0;
    do {
      if (bgIndex.dst[bgIndex.currentIndex] == SSTable.END_ROW) {
        bgIndex.advanceIndex();
      }
      readHeader(bgIndex);
      StringBuilder readToken = readToken(bgIndex);
      long thisOffset = readIndexSize(bgIndex);
      if(readToken.toString().equals(token)){
        return thisOffset;
      } else if (readToken.toString().compareTo(token) > 0) {
        return offset;
      } else {
        offset = thisOffset;
      }
    } while (bgIndex.currentIndex < bgIndex.dst.length - 1 || bgIndex.mbb.position()  < bgIndex.channel.size());
    return offset;
  }

Our sstable reader only needs a single change:

  public Val get (String row, String column) throws IOException{
    ...
    BufferGroup bg = new BufferGroup();
    bg.channel = ssChannel;
    bg.mbb = (MappedByteBuffer) ssBuffer.duplicate();
//bg.setStartOffset(0);
    bg.setStartOffset((int)index.findStartOffset(row));

With this change we start reading from a nearby offset rather than the beginning of the file. If we get lucky (about 1 in 1000 with this interval) the index might even let us seek directly to the row!

What is next? I do not know; maybe we can make the index and sstable searching do a binary search. Maybe add bloom filters or even a key cache!
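
As a teaser for the binary search idea, here is a minimal sketch that assumes the index entries have been loaded into memory as a sorted list of (token, offset) pairs; the IndexEntry class is hypothetical and nothing here is wired into the reader yet.

import java.util.List;

public class IndexSearchSketch {

  /** One index entry: the token recorded every Nth row and its byte offset in the sstable. */
  static class IndexEntry {
    final String token;
    final long offset;
    IndexEntry(String token, long offset) { this.token = token; this.offset = offset; }
  }

  /**
   * Binary search for the last entry whose token is less than or equal to the search token.
   * The returned offset is where the linear sstable scan should begin.
   */
  static long findStartOffset(List<IndexEntry> index, String token) {
    int low = 0;
    int high = index.size() - 1;
    long best = 0;
    while (low <= high) {
      int mid = (low + high) >>> 1;
      if (index.get(mid).token.compareTo(token) <= 0) {
        best = index.get(mid).offset; // candidate: at or before the key we want
        low = mid + 1;
      } else {
        high = mid - 1;
      }
    }
    return best;
  }
}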

Thursday Dec 04, 2014

Building a Column Family store for fun and enjoyment

Recently I thought I would have some fun. What is my definition of fun? I decided to take on building my own Column Family data store, starting with a small server and adding features one by one. I started with memtables, but I decided it was time to up my game.

Cassandra writes go to memtables; eventually these memtables flush to disk as sstables. SS stands for Sorted String. Besides the sorted nature of the SSTables, Cassandra uses Bloom Filters and Indexes (every N rows) to make searches for keys efficient.

To build an SSTable I have been playing around with memory mapped byte buffers and file channels. The first step is turning a memtable (a Map<String,Map<String,Column>>) into an sstable.

Storing numbers as strings is not the most efficient design in some cases, but I can always evolve this later.

public void flushToDisk(String id, Configuration conf, Memtable m) throws IOException{
    File f = new File(conf.getSstableDirectory(), id + ".ss");
    OutputStream output = null;
    try {
      output = new BufferedOutputStream(new FileOutputStream(f));
      for (Entry<Token, ConcurrentSkipListMap<String, Val>> i : m.getData().entrySet()){
        output.write(START_RECORD);
        output.write(i.getKey().getToken().getBytes());
        output.write(END_TOKEN);
        output.write(i.getKey().getRowkey().getBytes());
        output.write(END_ROWKEY);
        boolean first = true;
        for (Entry<String, Val> j : i.getValue().entrySet()){
          if (!first){
            output.write(END_COLUMN);
          }
          first = false;
          output.write(j.getKey().getBytes());
          output.write(END_COLUMN_PART);
          output.write(String.valueOf(j.getValue().getCreateTime()).getBytes());
          output.write(END_COLUMN_PART);
          output.write(String.valueOf(j.getValue().getTime()).getBytes());
          output.write(END_COLUMN_PART);
          output.write(String.valueOf(j.getValue().getTtl()).getBytes());
          output.write(END_COLUMN_PART);
          output.write(String.valueOf(j.getValue().getValue()).getBytes());
        }
        output.write('\n');
      }
    }
    finally {
      if (output != null) {
        output.close();
      }
    }
  }

Cool we can write a memtable to disk! Next step is reading it.

  private RandomAccessFile raf;
  private FileChannel channel;
 
  public void open(String id, Configuration conf) throws IOException {
    File sstable = new File(conf.getSstableDirectory(), id + ".ss");
    raf = new RandomAccessFile(sstable, "r");
    channel = raf.getChannel();
  }

I am not going to share all the code to read through the data. It is somewhat tricky; I do not write stuff like this often, so I am sure I could do a better job by looking at some other data stores and following what they do. This piece can always be evolved later.

I am proud of this class in particular. This BufferGroup makes it easier to navigate the file channel and buffer and read in blocks while iterating byte by byte.

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class BufferGroup {
  private int blockSize = 1024;
  byte [] dst = new byte [blockSize];
  int startOffset = 0;
  int currentIndex = 0;
  FileChannel channel;
  MappedByteBuffer mbb;
 
  public BufferGroup(){}
 
  void read() throws IOException{
    // Shrink the final block if fewer than blockSize bytes remain in the file.
    if (channel.size() - mbb.position() < blockSize){
      blockSize = (int) (channel.size() - mbb.position());
      dst = new byte[blockSize];
    }
    // The second argument of get() is the offset into dst, not the file position,
    // so we always fill dst from index 0; the buffer's position tracks where we are.
    mbb.get(dst, 0, blockSize);
    currentIndex = 0;
  }
 
  void advanceIndex() throws IOException{
    currentIndex++;
    if (currentIndex == blockSize){
      read();
    }
  }
}

Write to a memtable, flush it to disk, load the disk back into memory as a memory mapped buffer, and use that buffer for reads. Here is what it looks like when you put it all together.

  @Test
  public void aTest() throws IOException{
    File tempFolder = testFolder.newFolder("sstable");
    System.out.println("Test folder: " + testFolder.getRoot());
    Configuration configuration = new Configuration();
    configuration.setSstableDirectory(tempFolder);
    Memtable m = new Memtable();
    Keyspace ks1 = MemtableTest.keyspaceWithNaturalPartitioner();
    m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column2", "c", 1, 0L);
    Assert.assertEquals("c", m.get(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column2").getValue());
    m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column2", "d", 2, 0L);
    SSTable s = new SSTable();
    s.flushToDisk("1", configuration, m);
    s.open("1", configuration);
    long x = System.currentTimeMillis();
    for (int i = 0 ; i < 50000 ; i++) {
      Assert.assertEquals("d", s.get("row1", "column2").getValue());
    }
    System.out.println((System.currentTimeMillis() - x));
  }

I am pretty excited. I am not sure what I will do next: indexes, bloom filters, a commit log?
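
For example, a bloom filter would let a read skip an sstable entirely when a row key was definitely never written to it. Below is a minimal sketch using Guava's BloomFilter (we already depend on Guava for caching); how it gets populated during flush and consulted during get() is left for a future post.

import java.nio.charset.StandardCharsets;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

public class SsTableBloomSketch {

  // Sized for the expected number of row keys in one sstable, with a 1% false positive rate.
  private final BloomFilter<CharSequence> filter =
      BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 10000, 0.01);

  /** Called for every row key while flushing the memtable to this sstable. */
  public void recordRowkey(String rowkey) {
    filter.put(rowkey);
  }

  /** If this returns false the key is definitely not in the sstable and we can skip the read. */
  public boolean mightContain(String rowkey) {
    return filter.mightContain(rowkey);
  }
}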




Tuesday Oct 21, 2014

Cassandra....bootstrapping

Would you expect apache-cassandra-2.0.10.jar  to work out of the box....


 INFO 19:39:44,690 Compacted 4 sstables to [/media/ephemeral0/cassandra/data/system/schema_columnfamilies/system-schema_columnfamilies-jb-9,].  9,410 bytes to 5,949 (~63% of original) in 41ms = 0.138376MB/s.  5 total partitions merged to 2.  Partition merge counts were {1:1, 4:1, }
 INFO 19:39:44,728 Compacted 4 sstables to [/media/ephemeral0/cassandra/data/system/schema_columns/system-schema_columns-jb-9,].  14,324 bytes to 10,549 (~73% of original) in 48ms = 0.209590MB/s.  5 total partitions merged to 2.  Partition merge counts were {1:1, 4:1, }
 INFO 19:40:14,566 JOINING: Starting to bootstrap...
 INFO 19:40:14,743 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Executing streaming plan for Bootstrap
 INFO 19:40:14,744 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Beginning stream session with /10.9.49.159
 INFO 19:40:14,744 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Beginning stream session with /10.9.52.15
 INFO 19:40:14,745 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Beginning stream session with /10.9.55.58
 INFO 19:40:14,745 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Beginning stream session with /10.9.57.215
 INFO 19:40:14,746 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Beginning stream session with /10.9.49.13
 INFO 19:40:14,821 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Session with /10.9.55.58 is complete
 INFO 19:40:14,820 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Session with /10.9.49.13 is complete
 INFO 19:40:14,821 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Session with /10.9.52.15 is complete
 INFO 19:40:14,854 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Session with /10.9.49.159 is complete
 INFO 19:40:14,860 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Session with /10.9.57.215 is complete
 WARN 19:40:14,862 [Stream #1403ce80-595a-11e4-94ed-1dc981d2ad0a] Stream failed
ERROR 19:40:14,864 Exception encountered during startup
java.lang.RuntimeException: Error during boostrap: Stream failed
    at org.apache.cassandra.dht.BootStrapper.bootstrap(BootStrapper.java:86)
    at org.apache.cassandra.service.StorageService.bootstrap(StorageService.java:994)
    at org.apache.cassandra.service.StorageService.joinTokenRing(StorageService.java:797)
    at org.apache.cassandra.service.StorageService.initServer(StorageService.java:612)
    at org.apache.cassandra.service.StorageService.initServer(StorageService.java:502)
    at org.apache.cassandra.service.CassandraDaemon.setup(CassandraDaemon.java:378)
    at org.apache.cassandra.service.CassandraDaemon.activate(CassandraDaemon.java:496)
    at org.apache.cassandra.service.CassandraDaemon.main(CassandraDaemon.java:585)
Caused by: org.apache.cassandra.streaming.StreamException: Stream failed
    at org.apache.cassandra.streaming.management.StreamEventJMXNotifier.onFailure(StreamEventJMXNotifier.java:85)
    at com.google.common.util.concurrent.Futures$4.run(Futures.java:1160)
    at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
    at com.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
    at com.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:202)
    at org.apache.cassandra.streaming.StreamResultFuture.maybeComplete(StreamResultFuture.java:216)
    at org.apache.cassandra.streaming.StreamResultFuture.handleSessionComplete(StreamResultFuture.java:191)
    at org.apache.cassandra.streaming.StreamSession.closeSession(StreamSession.java:364)
    at org.apache.cassandra.streaming.StreamSession.sessionFailed(StreamSession.java:569)
    at org.apache.cassandra.streaming.StreamSession.messageReceived(StreamSession.java:424)
    at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245)
    at java.lang.Thread.run(Thread.java:744)
java.lang.RuntimeException: Error during boostrap: Stream failed
    at org.apache.cassandra.dht.BootStrapper.bootstrap(BootStrapper.java:86)
    at org.apache.cassandra.service.StorageService.bootstrap(StorageService.java:994)
    at org.apache.cassandra.service.StorageService.joinTokenRing(StorageService.java:797)
    at org.apache.cassandra.service.StorageService.initServer(StorageService.java:612)
    at org.apache.cassandra.service.StorageService.initServer(StorageService.java:502)
    at org.apache.cassandra.service.CassandraDaemon.setup(CassandraDaemon.java:378)
    at org.apache.cassandra.service.CassandraDaemon.activate(CassandraDaemon.java:496)
    at org.apache.cassandra.service.CassandraDaemon.main(CassandraDaemon.java:585)
Caused by: org.apache.cassandra.streaming.StreamException: Stream failed
    at org.apache.cassandra.streaming.management.StreamEventJMXNotifier.onFailure(StreamEventJMXNotifier.java:85)
    at com.google.common.util.concurrent.Futures$4.run(Futures.java:1160)
    at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
    at com.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
    at com.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:202)
    at org.apache.cassandra.streaming.StreamResultFuture.maybeComplete(StreamResultFuture.java:216)
    at org.apache.cassandra.streaming.StreamResultFuture.handleSessionComplete(StreamResultFuture.java:191)
    at org.apache.cassandra.streaming.StreamSession.closeSession(StreamSession.java:364)
    at org.apache.cassandra.streaming.StreamSession.sessionFailed(StreamSession.java:569)
    at org.apache.cassandra.streaming.StreamSession.messageReceived(StreamSession.java:424)
    at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245)
    at java.lang.Thread.run(Thread.java:744)
Exception encountered during startup: Error during boostrap: Stream failed
 INFO 19:40:14,867 Announcing shutdown
 INFO 19:40:16,868 Waiting for messaging service to quiesce
 INFO 19:40:16,869 MessagingService has terminated the accept() thread

Of course not.

Sunday Aug 31, 2014

Stream Processing, Counting the things that are counting things

One of the qualities of good software is visibility. For me to provide users statistics on how the site is performing, data moves through a series of systems. Browser events fire from JavaScript and are received by web servers, which place them on a message queue. A process reads data from the message queue and writes it to a Hadoop file system. Another process consumes this data and applies a streaming transformation before writing to a NoSQL database.

Imagine a system without enough visibility. A user might send you an email stating "It looks like the count of hits to this page for today is off. There should be more hits." If you have no visibility into the system, a request like this can turn out to be a nightmare. The data could be getting lost anywhere: in your custom software, in open source software, even in commercial software! The other possibility is that the person reporting the issue is just wrong. That happens too! Without visibility it is hard to say what is wrong; maybe the NoSQL database is dropping messages, or maybe it is your code. You just have to take a shot in the dark and start somewhere. Maybe you pick the right component and find a bug, maybe you spend two weeks and find nothing wrong.

Now, imagine a system with enough visibility. You would look at some graphs your software is maintaining and determine that "the number of messages sent to our NoSQL system is close (hopefully equal :) to the number of raw messages we received into our message queue". You could even go a step further and attempt to create pre-emptive alerts based on what is normal message flow for this time of day and day of week, so if there is an issue you can hopefully notice it and fix it before a user becomes aware of the problem.

Some rules:

  1. Count things in and out of each system. Even if the correlation is not 1 to 1, some relationship should exist that will become apparent over time.
  2. Record things that are dropped or cause exceptions, and actively monitor so this number stays close to 0 (see the sketch after this list).
  3. Go for low hanging fruit; do not try to build an overarching system in round one. If a sprint builds or adds a feature, find a way to monitor that new feature.
  4. Time things that could vary by orders of magnitude. Use histograms to time DB requests that involve reading disk and other things that can have high variance as load increases.
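
Here is a minimal sketch of rules 1 and 2 using the coda-hale library directly; the metric names are made up for illustration.

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;

public class PipelineCounters {

  private final MetricRegistry registry = new MetricRegistry();
  private final Counter messagesIn = registry.counter("queue.messages.in");
  private final Counter messagesOut = registry.counter("nosql.messages.out");
  private final Counter dropped = registry.counter("pipeline.messages.dropped");

  /** Count every message entering the system. */
  public void onReceive(String message) {
    messagesIn.inc();
  }

  /** Count every message successfully written downstream, and every one we lose. */
  public void onWrite(boolean success) {
    if (success) {
      messagesOut.inc();
    } else {
      dropped.inc(); // alert if this drifts away from zero
    }
  }
}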

Getting it done

With our stream processing platform, teknek, I had been doing counters and timers on a case by case basis in user code. I decided to extend this into the framework itself so that users get a set of metrics for free. Users also have the ability to add their own metrics easily. (We will show the code to add your own counters later in this article.)

The de-facto standard metrics package for Java is the coda-hale library. Originally called "yammer-metrics", it provides counters, meters, histograms and other types. It has a clever way to do sampling, similar to streaming quantiles, so that it can efficiently keep a 95th percentile measurement without having to save a large number of samples in memory. Really, really cool.
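
A tiny sketch of that percentile behavior (the metric name is illustrative): the histogram keeps a bounded reservoir sample rather than every observation, yet still answers percentile queries.

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Snapshot;

public class LatencyHistogramSketch {

  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();
    Histogram dbReadMillis = registry.histogram("db.read.millis");

    // Record a large number of observations; only a sample is retained internally.
    for (int i = 0; i < 100000; i++) {
      dbReadMillis.update(i % 100);
    }

    Snapshot snapshot = dbReadMillis.getSnapshot();
    System.out.println("95th percentile: " + snapshot.get95thPercentile());
    System.out.println("99th percentile: " + snapshot.get99thPercentile());
  }
}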

For each "plan" in teknek we have a series of counters that record events inbounds, operator retries, time to process the event, and more. In the image below "shutup" :) is the name of the plan. Metrics are captured both globally and on a per thread basis.

 Sample of metrics provided

Every teknek "operator" in the plan has it's own set of metrics. For example, if the plan has three steps such as "read from kafka", "lowercase", "write to cassandra", "write to hbase" metrics are kept on these for free with no extra effort for the user.

The same metrics library is available to each of the operators you are implementing, so custom metrics can be added with ease.

package io.teknek.metric;
import io.teknek.model.ITuple;
import io.teknek.model.Operator;

public class OperatorWithMetrics extends Operator {
  @Override
  public void handleTuple(ITuple tuple) {
    getMetricRegistry().meter("a.b").mark();
    getMetricRegistry().meter(getPath() + ".processed").mark();
    getMetricRegistry().meter(getPath() + "." + this.getPartitionId() + ".processed").mark();
  }
}

The theme of this post is visibility, and having counters in JMX is one form of visibility.

"But come on playa! You gotta up your big data game!"

No problem! It turns out that there is already great support in coda-hale metrics for sending those metrics directly to graphite, so all the counters that you have in teknek are available in graphite with no extra effort. Graphite offers a number of ways to search, group, and build custom dashboards with this information.
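
Wiring a registry to graphite with the metrics-graphite module looks roughly like the sketch below; the host, port, and prefix are placeholders.

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;

public class GraphiteWiring {

  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();

    // Point the reporter at the carbon listener (placeholder host and port).
    Graphite graphite = new Graphite(new InetSocketAddress("graphite.example.com", 2003));
    GraphiteReporter reporter = GraphiteReporter.forRegistry(registry)
        .prefixedWith("teknek.myhost")             // prefix keeps hosts and clusters separable
        .convertRatesTo(TimeUnit.SECONDS)
        .convertDurationsTo(TimeUnit.MILLISECONDS)
        .build(graphite);

    // Push all registered metrics to graphite every minute.
    reporter.start(1, TimeUnit.MINUTES);
  }
}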

Quick note: The coda-hale graphite reporter tends to send too many counters to graphite. For example it sends the 50th, 95th, 99th, 99.9th, etc. percentiles, which is generally more information than you need. Take a look at my graphite package, which does a lot to trim down the metrics sent, adds host name and cluster name, and overall streamlines the process and configuration.

Conclusion

Build monitoring up front; make it a part of your definition of done. Good monitoring makes it easier to troubleshoot. It also makes it easier to be confident during beta testing or after releasing a new version of your software. With a new release, old metrics should stay near their pre-release values, and you can use the new metrics to reason that new features are working correctly in production.

The new features to teknek discussed in this post were incorporated in this pull request, and should appear in the 0.0.7 release.


Thursday Aug 21, 2014

CQL, did you just tell me to fck myself?

Last night I decided to give CQL another chance. After about 20 minutes of hacking at a 1-row table I pretty much hit every caveat and error message possible in my quest to get some result that was not SELECT *. The query language is a minefield of things you CAN'T do!

cqlsh:test>  select * from stats where year='54'  and tags='bla' order by ycount ALLOW filtering;
Bad Request: ORDER BY with 2ndary indexes is not supported.

cqlsh:test> select * from stats where year='54' and ycount >  10  order by ycount ALLOW filtering;
Bad Request: No indexed columns present in by-columns clause with Equal operator

cqlsh:test> select * from stats where year='54' and ycount >  10 and tags='bla';
Bad Request: Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING

cqlsh:test> select * from stats where year='54' and ycount >  0  allow filtering;
Bad Request: No indexed columns present in by-columns clause with Equal operator
 

http://b.z19r.com/post/did-you-just-tell-me-to-go-fuck-myself

Sunday Aug 10, 2014

Why I am reverting from Java to C++

For a long time Java has been good to me. I know these days everyone is looking to move on to maybe Scala or Clojure or whatever, but I am actually looking the other way. Java has quite a large number of problems that are bubbling below the surface. Where to start...

Ok, go back to 1995 when Java was an upstart. The going logic was that C++ pointers were "TOO COMPLICATED FOR PROGRAMMERS"... Fast forward to 2014 and even cell phone languages have pointers. So how is it that a lowly cell phone programmer can understand pointers but a server side coder thinks they are "too complicated"?

Ok, but let's talk about the big ugly problem... GC and memory. I have been supporting a number of Java projects for a long time and the major performance bottleneck is always garbage collection. Now listen, regardless of what anyone tells you, there is NO way to tune away all the garbage collection issues. At some point, if your application moves enough data, the JVM will pause.

JVMs don't get much bigger than 10 GB of memory before the best performing GC algorithm, CMS, falls apart. I have seen it in batch processing systems like Hadoop and Hive, I have seen it in HBase, I have seen it in Cassandra. Want to run a database with a really fast Fusion-io SSD under high load? The CPU bottlenecks on GC before the disk runs out of IO. G1, the garbage collector that was supposed to be the answer for these large heaps, seems to be a large failure.

Many projects that require decent performance are doing some combination of Java and off-heap memory. This I do not get. At the point where you start doing things off-heap you basically start giving up everything Java provides for you (thread safety, debugging). Besides the fact that it makes debugging harder, it still has more overhead than native code.

In many cases this causes random corruption, due to library developers not actually writing these systems correctly, followed by embarrassing statements like "Sorry, our really smart, really efficient off-heap thing X was fucking up for three versions and we just figured it out."

Let's talk more about memory. Java is just plain pig-ish with memory. Java objects have a number of bytes of overhead, and an object is just way bigger in Java than in C++. "Enterprise" libraries are so big and bloated that I find myself having to tweak my Eclipse JVM settings just so that I CAN DEVELOP Java apps.

You can not even allocate a damn object on the stack; Java forces you to put it on the heap. WTF?

Every enterprise Java application seems to need about 512 MB of RAM just to start up and about 2GB of overhead to run under light load... Hello cloud... no micro instances for me! Every hello world needs a 4GB heap.

Back in 2007 a server machine had maybe 4GB of memory, so it was no big deal that the Java VM gets pause-ish with a 13 GB heap... But now in 2014 I can get a server from Amazon with 222GB of RAM. Machines with 1TB are around the corner. When I have a big-data application, am I going to have to run 100-1000 sharded copies of a program on a single machine just so I can address the memory?

So I have started going backwards, writing programs in C++. Falling in love with template functions and finding them more powerful than Java's generics. Using lambdas in C++11 and saying, "what is the big deal with scala?". Using smart pointers in Boost when I need to, freeing memory by hand when I do not.

Feels good, feels great. Feels great to run a program that only uses 4K of memory and starts up in .0000001 seconds. "Did that run? Yes, it did run and it's already finished!"
