Edward Capriolo

Tuesday Mar 17, 2015

Nibiru has a CLI!

Ow yea!

connect connect 127.0.0.1 7070
ok>
createcolumnfamily data pets
{}
ok>
describekeyspace data
[pets]
ok>
use data pets
ok>
set jack type bunny
ok>
get jack type
Val [value=bunny, time=1426649553036000, createTime=1426649553120, ttl=0]
ok>

Ow yea!

Saturday Mar 14, 2015

On my fling with the ASF and Cassandra

Nothing good lasts forever, you know. At one point I may have been the #1 Apache Cassandra fanboy, but like many ASF projects they basically reach the point where they stop being a meritocracy and start becoming a bureaucracy.

A while back I tried to implement an idea in Cassandra that was not even a super complicated one: do server side code. Like many times in Apache Software open source, even if you are 100% willing to do ALL the work and TESTING to see something get done, a crew of people manages to block your ideas. You can read the ticket, read the responses, and form your own opinion.

Well, as usual, it takes people a while to come around to an idea. For the ASF the concept is basically: once they think it is an idea they came up with...They love it.

Well there you go. Here is another classic Cassandra-ism for you:

In memory column families...https://issues.apache.org/jira/browse/CASSANDRA-1657

Closed: Later

http://www.datastax.com/documentation/datastax_enterprise/4.0/datastax_enterprise/inMemory.html

Great idea! So good we will make everyone pay for it as a secret sauce closed source feature!

Thursday Mar 12, 2015

Battling entropy with Hinted Handoff - Building a NoSQL store - Part 11

It has been nearly a month since my last NoSQL blog. :( Sorry about that. I actually spent some time working on my stream processing platform teknek. And yes in my free time I work on my own stream processing platform and my own nosql, and no, there is no support group I know of that can help me.

My last blog covered eventual consistency and CAP, so it makes sense to move on to entropy resolution. CAP is typically described as a tradeoff between the three features Consistency, Availability, and Partition Tolerance. Eventually Consistent systems favor A. If you remember, we demonstrated a case where we wrote data at different Consistency Levels and we observed what is returned to a client. Let's describe a situation where entropy happens:

Imagine we are writing at Consistency Level ONE. The coordinator need only wait for 1 ACK before returning success to the client:

client -> serverX --->|
          server3     |
          server4 <---|
          server5 <---|
          server6 <---|

This is the normal case:


client -> serverX <--(merge results) <-|
          server3                      |
          server4 -------------------->|
          server5 -------------------->|
          server6 -------------------->|

But imagine this happens:


client -> serverX <--(merge results) <-|
          server3                      |
          server4 -------------------->|
          server5 (brain fart)        ?|
          server6 -------------------->|

The system now has entropy, because we do not know if server5 died permanently, or if someone tripped over the Ethernet cable in the data center and is rushing to plug it back in. The write could have completed and the server died before the reply, or the write may never have happened at all.

One way to combat entropy is to read from multiple places like reading from 2 nodes, a quorum of nodes, or ALL the nodes that have the data. Reads tend to be expensive and reading from multiple places multiplies the expense.

Hinting

Another way to deal with failed write/delete operations is to store a Hint (commonly called Hinted Handoff). Remember, the column with the highest timestamp wins, so if writes are delivered late or out of order the final result is still the one with the highest timestamp. To implement Hints we need to track which writes fail or time out and put them in a place where they can be redelivered later. You can almost think of Hints as a queue of delayed writes.
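Here is the timestamp rule in isolation, using a stripped-down stand-in for Nibiru's Val (value plus timestamp). This is an illustration only, not the real class:

// Minimal stand-in for a column value: just a value and a write timestamp.
final class TinyVal {
  final String value;
  final long time;
  TinyVal(String value, long time) { this.value = value; this.time = time; }
}

// "Highest timestamp wins": a late or replayed write with an older timestamp
// can never clobber a newer one, which is what makes hint redelivery safe.
static TinyVal resolve(TinyVal a, TinyVal b) {
  return a.time >= b.time ? a : b;
}

// resolve(new TinyVal("poodle", 2000), new TinyVal("bunny", 1000)).value -> "poodle"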


I have refactored the EventualCoordinator slightly from the last blog. The key aspects are the same. We have an ExecutorCompletionService completionService that has futures handling writes to networked nodes.

    else if (c.getLevel() == ConsistencyLevel.N) {
      Response response = handleN(start, deadline, completionService, destinations, merger, message, c); //1
      if (hinter != null) {
        maybeSendHints(remote, remoteFutures, start, deadline, hinter); //2
      }
      return response;
    }  

This part is going to bend your mind a bit. For replication THREE we would have THREE futures, each one handling the write operation on a different server in the cluster. Let's assume the user has specified Consistency N as ONE. This means that the method handleN will return as soon as 1 future is complete. The other futures may also have returned, or they may still be in progress. We should not consider them failed and create a hint yet, because we should give them up to the normal timeout window to complete. We also do not want to block this coordinator thread anymore because the consistency level is met.
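handleN itself is not reproduced in this post. As a rough sketch of the idea (the name, signature, and the `needed` parameter below are my own illustration, not the actual Nibiru code), it would poll the completion service until it has collected the number of responses the consistency level asks for or the deadline passes:

// Sketch: wait for 'needed' responses (1 for Consistency N = ONE), then return.
// Mirrors the ConsistencyLevel.ALL loop shown later in this post.
private Response handleNSketch(long start, long deadline, int needed,
    ExecutorCompletionService<Response> completionService,
    ResultMerger merger, Message message) {
  List<Response> responses = new ArrayList<>();
  long now = start;
  while (now <= deadline && responses.size() < needed) {
    try {
      Future<Response> f = completionService.poll(deadline - now, TimeUnit.MILLISECONDS);
      if (f == null) {
        break; // deadline hit with nothing completed
      }
      responses.add(f.get());
    } catch (InterruptedException | ExecutionException e) {
      break; // a failed future: stop and let the timeout/hint logic take over
    }
    now = System.currentTimeMillis();
  }
  if (responses.size() >= needed) {
    return merger.merge(responses, message);
  }
  return new Response().withProperty("exception", "coordinator timeout");
}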

Since we do not want to block the client request anymore we need to handle the events still pending. We made an executor service 'lastChance' for this. RemoteMessageCallable is an abstraction I made that tracks the original message, which destination it was sent to, and provides a field so we can tell if the future logically finished all its work.

Again, we do not want maybeSendHints to block, because we have to return to the client.

private ExecutorService lastChance;

private void maybeSendHints(List<RemoteMessageCallable> remotes,
    List<Future<Response>> remoteFutures, long start, long deadline, final Hinter hinter) {
  for (int i = 0; i < remotes.size(); i++) {
    final RemoteMessageCallable remote = remotes.get(i);
    final Future<Response> future = remoteFutures.get(i);
    //the future is done but the method must have thrown an exception: we need a hint here
    if (!remote.isComplete() && future.isDone()) {
      hinter.hint(remote.getMessage(), remote.getDestination());
    }

    if (!remote.isComplete() && !future.isDone()){
      final long remaining = deadline - System.currentTimeMillis();
      //not complete and no exception but time is up: we need a hint here
      if (remaining <= 0){
        hinter.hint(remote.getMessage(), remote.getDestination());
      } else {
        //not complete, no exception, and not past the deadline either:
        //put the request on the lastChance executor so we do not block
        lastChance.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            //here we block, then hint on failure :)
            future.get(remaining, TimeUnit.MILLISECONDS);
            if (!remote.isComplete()){
              hinter.hint(remote.getMessage(), remote.getDestination());
            }
            return null;
          }
        });
      }
    }
  }
}

Whew! When I write something like that I never know if I am the smartest guy in the world or if it is a fork bomb. I think it is l33t hacking... The next pieces of the puzzle are a little easier to grok.

Hinter

The first thing we will do is automatically create a column family to store our hints.

private void addSystemKeyspace(){
    Keyspace system = new Keyspace(configuration);
    KeyspaceMetaData ksmd = new KeyspaceMetaData();
    ksmd.setName("system");
    ksmd.setPartitioner( new NaturalPartitioner());
    ksmd.setRouter(new LocalRouter());
    system.setKeyspaceMetadata(ksmd);
    {
      Map<String,Object> properties = new HashMap<>();
      properties.put("implementing_class", DefaultColumnFamily.class.getName());
      system.createColumnFamily("hints", properties);
    }
    server.getKeyspaces().put("system", system);
  }

We had writes that did not complete, and what we want to do is put the message on a queue to be delivered later. We use destination as a row key, make the column a uuid, and store the message as JSON text in the value.

public class Hinter {

  private final AtomicLong hintsAdded = new AtomicLong();
  private final ColumnFamilyPersonality hintsColumnFamily;
  private ObjectMapper OM = new ObjectMapper();

  public Hinter(ColumnFamilyPersonality person){
    this.hintsColumnFamily = person;
  }

  public void hint(Message m, Destination destination) {
    try {
      hintsColumnFamily.put(destination.getDestinationId(), UUID.randomUUID().toString(),
          OM.writeValueAsString(m), System.currentTimeMillis() * 1000L);
      hintsAdded.getAndIncrement();
    } catch (IOException e) {}
  }

}

This allows us to ask the data store "give me all the hints for server X". (This can be modeled in other ways.) This shows you how simple, flexible, and elegant the ColumnFamily model is.

Hint Replayer

I just happened to read a post that suggested that class names ending in 'er' are bad ideas. Well fuck that noise! jk. You may remember that we have built plugable cluster membership into Nibiru. ClusterMembership allows us to know if the other nodes in the cluster are up or down. We will implement the HintReplayer as a thread that:

  1. scans the list of cluster members that are ALIVE.
  2. If the host is ALIVE we see if we have any Hints for that node.
  3. If there are hints, deliver them and then delete the hint.

public class HintReplayer extends AbstractPlugin implements Runnable {

  private ObjectMapper om = new ObjectMapper();

  public static final String MY_NAME = "hint_replayer";
  private volatile boolean goOn = true;
  private ColumnFamilyPersonality hintCf;
  private Thread runThread;
  //counters used below and read by the unit test
  private final AtomicLong hintsDelivered = new AtomicLong();
  private final AtomicLong hintsFailed = new AtomicLong();

  public HintReplayer(Server server) {
    super(server);
  }

  @Override
  public String getName() {
    return MY_NAME;
  }

  @Override
  public void init() {
    hintCf = Coordinator.getHintCf(server);
    runThread = new Thread(this);
    runThread.start();
  }

  @Override
  public void run() {
    while (goOn){
      List<ClusterMember> live = server.getClusterMembership().getLiveMembers(); //1
      for (ClusterMember member : live){
        SortedMap<AtomKey,AtomValue> results = hintCf.slice(member.getId(), "",
            "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"); //2
        if (results.size() > 0){
          for (Entry<AtomKey, AtomValue> i : results.entrySet()){
            if (i.getValue() instanceof ColumnValue){
              ColumnValue v = (ColumnValue) i.getValue();
              ColumnKey c = (ColumnKey) i.getKey();
              writeAndDelete(member, c, v);
            }
          }
        }
      }
      try {
        Thread.sleep(1);
      } catch (InterruptedException e) { }
    }
  }

  private void writeAndDelete(ClusterMember m, ColumnKey c, ColumnValue v){
    Destination d = new Destination();
    d.setDestinationId(m.getId());
    Client client = this.clientForDestination(d);
    try {
      client.post(om.readValue(v.getValue(), Message.class));
      hintCf.delete(m.getId(), c.getColumn(), System.currentTimeMillis() * 1000);
      hintsDelivered.getAndIncrement();
    } catch (IOException | RuntimeException e) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e1) {
      }
      hintsFailed.getAndIncrement();
    }
  }

Testing

We already have a test that demonstrates how the coordinator works. Testing this code is basic: we shut down some servers and do a write. The node that receives the write should store hints for the down nodes; when those nodes come back alive the hints should be re-delivered.

//insert when some nodes are down
failIfSomeAreDown(s, clAll);
for (int i = 0; i < 2; i++){
  s[i].shutdown();
}
//start up all nodes
for (int i = 0; i < s.length; i++) {
  s[i] = new Server(cs[i]);
  s[i].init();
}
//get the hint replayer from the first node
HintReplayer h = (HintReplayer) s[0].getPlugins().get(HintReplayer.MY_NAME);
for (int tries = 0; tries < 10; tries++) {
  //wait for the delivered counter to get to 3
  long x = h.getHintsDelivered();
  if (x == 3) {
    break;
  }
  Thread.sleep(1000);
}
Assert.assertEquals(3, h.getHintsDelivered());
//now go to the servers and ensure the new value has been delivered
int found = 0;
for (int i = 0; i < s.length; i++) {
  ColumnValue cv = (ColumnValue) s[i].get("abc", "def", "a", "b");
  if (cv != null && cv.getValue().equals("d")){
    found++;
  }
}
Assert.assertEquals(3, found);

Conclusion

Hints help us ensure writes and deletes do not get lost during random outages. They make the system more self-healing, and in CAP terms they can help you have a 'little more C', even though hinted handoff does not provide any additional guarantees beyond what the Consistency Level does.

What should Nibiru do next? Hit me up on twitter and let me know what NoSQL feature I should try to build next!



Saturday Jan 31, 2015

Request coordination, eventual consistency, and CAP - Building a NoSQL store - Part 10

In the last blog we mentioned the CAP theorem but now it is time to dive in. Distributed systems face challenges that non distributed systems do not. If data is replicated the system needs some notion of where data is, how it is replicated for redundancy, how the data is kept in sync, and what operations are available to the user.

"Eventual consistency" was the "Affordable Care Act" of the Dynamo Style (Cassandra) vs Strongly Consistent (HBase) debate. So embattled was the term "Eventual consistency" that "Tunable consistency" was coined. Fast forward some time and Cassandra now supports "atomic compare and swap" and HBase has support for "sometimes relaxed consistency". ROFL. That, as they say, is politics ... I mean... vendor driven open source...

By the way, in case you think I am a drama queen/conspiracy theorist and surely there is no quasi-political system attempting to defame eventual consistency, I did some searches for reference links to put in this article. As a result I have fallen into an anti-eventual-consistency ad targeting campaign...



In any case, a point I will constantly beat to death: Nibiru aims to be plugable-everything. A lofty goal is to support all (possible) consistency models. Can it be done? I don't know, but it is worth trying I guess. I do not have all the answers. That being said, a system can not be "half-atomic" or "kinda-consistent", but per Column Family/Table we should be able to choose what consistency model fits what we want. For example, one store "pets" is an eventually consistent ColumnFamily store while another store "credit_cards" is configured as a strongly consistent key value store.

Cassandra's implementation of CAP is to provide per-request Consistency Level. The consistency level allows the user to say what aspects they desire. For example:

WRITE.ALL: The write is acknowledged (durable) by ALL Natural Endpoints (all replicas of the data) before a result is returned to the client.
READ.ALL: The read request is sent to ALL Natural Endpoints, the results are compared, and the highest timestamp wins. (There is other logic for timestamp ties.)

WRITE.ONE: The request is sent to all Natural Endpoints; after one acknowledged (durable) write completes the result is returned. The other writes still continue on the servers.
READ.ONE: The request is sent to one (or more) Natural Endpoints, and the first result is returned to the client.

There are other Consistency Levels including QUORUM, LOCAL_ONE, Serial (For compare and swap), etc, but save those for another day.
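The arithmetic behind these levels is simple. This is not Nibiru or Cassandra code, just an illustration of how many replica acknowledgements a coordinator would wait for given replication factor rf:

// Illustration only: acknowledgements required before answering the client.
static int requiredAcks(String level, int rf) {
  switch (level) {
    case "ONE":    return 1;            // fastest, weakest
    case "QUORUM": return rf / 2 + 1;   // a majority of the replicas
    case "ALL":    return rf;           // every replica must be durable
    default: throw new IllegalArgumentException("unknown level " + level);
  }
}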

As our last blog showed a router can return one or more Destination(s)/Natural Endpoints for a given row key. Given the destinations and the consistency level we can make different strategies (code paths) for the desired results.

Lets take a case that is easy to explain, WRITE.ALL. A client sends a write operation for Column Family pets
{ rowkey: rover , column: type, value: poodle }

The node that receives this request is called the coordinator. The coordinator computes the Natural Endpoints (one of which could be local) where the data lives. For replication factor N, N futures will be created, and each future will forward the request. Since the request is at CONSISTENCY ALL, we want to wait for all the futures to finish before a result is returned to the client. We also have an upper limit on how long we wish to wait for the futures before we consider a timeout.

This process looks something like this:

client -> serverX --->|
          server3     |
          server4 <---|
          server5 <---|
          server6 <---|

client -> serverX <--(merge results) <-|
          server3                      |
          server4 -------------------->|
          server5 -------------------->|
          server6 -------------------->|

I took the approach of separating the EventualCoordinator (the thing managing the futures) from the Merger (the logic merging the results). I did this because HighestTimestampWins does not make sense for a key value store which has no timestamp.
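The ResultMerger interface itself is not pasted in this post, but judging from the HighestTimestampResultMerger shown further down it is essentially a single method:

// Inferred shape of the merger abstraction: N responses in, one response out.
// Different stores can plug in different merge rules (timestamp wins, first wins, etc).
public interface ResultMerger {
  Response merge(List<Response> responses, Message message);
}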

Let's do this

The coordinator does a few things. First, the request could have a local component. The LocalAction class contains the logic to physically carry out the operation locally. The merger is a class that produces one result from multiple responses, because N nodes will return N results but only a single result is returned to the client. Again, we define the LocalAction and the Merger here because the EventualCoordinator is message/response agnostic.

public Response handle(Message message) {
  ...
  Token token = keyspace.getKeyspaceMetadata().getPartitioner()
      .partition((String) message.getPayload().get("rowkey"));
  List<Destination> destinations = keyspace.getKeyspaceMetadata().getRouter()
      .routesTo(message, server.getServerId(), keyspace, server.getClusterMembership(), token);
  long timeoutInMs = determineTimeout(columnFamily, message);

  if (ColumnFamilyPersonality.COLUMN_FAMILY_PERSONALITY.equals(message.getRequestPersonality())) {
    LocalAction action = new LocalColumnFamilyAction(message, keyspace, columnFamily);
    ResultMerger merger = new HighestTimestampResultMerger();
    return eventualCoordinator.handleMessage(token, message, destinations,
        timeoutInMs, destinationLocal, action, merger);
  }
  ...
}

So let's dive into the eventual coordinator. If there is only one destination and that destination is local, we do the action and return the results.

public Response handleMessage(Token token, final Message message, List<Destination> destinations,
    long timeoutInMs, Destination destinationLocal, final LocalAction action, ResultMerger merger) {
  ...
  if (destinations.size() == 1 && destinations.contains(destinationLocal)) {
    return action.handleReqest();
  }

This next part prevents us from 1 request turning into 3 turning into 9 ...turning into doom. Basically we tag messages with "reroute" so we know if it is a client or server that sent this message.

if (message.getPayload().containsKey("reroute")){
  return action.handleReqest();
}
if (!message.getPayload().containsKey("reroute")){
  message.getPayload().put("reroute", "");
}

I'm back Doc. I'm back from the future!

Java's ExecutorCompletionService is pretty nifty. It reminds me of what ThreadGroup does for threads. For each Destination the data is either local or remote so we have a callable to deal with both cases. LocalActionCallable wraps the LocalAction and the RemoteMessageCallable forwards the request over the network.
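If you have not used it before, here is a tiny standalone demo of the property the coordinator relies on: the completion service hands futures back in completion order, not submission order.

import java.util.concurrent.*;

public class CompletionOrderDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    ExecutorCompletionService<String> cs = new ExecutorCompletionService<>(pool);
    // Three tasks with different run times, submitted slowest-first.
    for (int delay : new int[] { 300, 100, 200 }) {
      cs.submit(() -> { Thread.sleep(delay); return delay + "ms task"; });
    }
    // poll() returns whichever future finishes next, which is exactly what a
    // coordinator waiting for "the first N acks" wants.
    for (int i = 0; i < 3; i++) {
      Future<String> f = cs.poll(1, TimeUnit.SECONDS);
      System.out.println(f.get()); // prints 100ms, 200ms, 300ms
    }
    pool.shutdown();
  }
}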

ExecutorCompletionService<Response> completionService = new ExecutorCompletionService<>(executor);
final ArrayBlockingQueue<Response> results = new ArrayBlockingQueue<Response>(destinations.size());
ArrayBlockingQueue<Future<Response>> futures = new ArrayBlockingQueue<Future<Response>>(destinations.size());
for (final Destination destination : destinations) {
  Future<Response> f = null;
  if (destination.equals(destinationLocal)) {
    f = completionService.submit(new LocalActionCallable(results, action));
  } else {
    f = completionService.submit(new RemoteMessageCallable(results, clientForDestination(destination), message));
  }
  futures.add(f);
}

After the futures have been submitted we want to wait for some amount of time for all to return. If we hit the deadline we fail, if we do not get all results before the deadline we fail, and if one future fails we fail.

long start = System.currentTimeMillis();
long deadline = start + timeoutInMs;
List<Response> responses = new ArrayList<>();
if (c.getLevel() == ConsistencyLevel.ALL) {
  while (start <= deadline) {
    Response r = null;
    try {
      Future<Response> future = completionService.poll(deadline - start, TimeUnit.MILLISECONDS);
      r = future.get();
      if (r != null){
        responses.add(r);
      }
    } catch (InterruptedException | ExecutionException e) {
      break;
    }
    if (r == null){
      return new Response().withProperty("exception", "coordinator timeout");
    }
    if (r.containsKey("exception")){
      return r;
    }
    if (responses.size() == destinations.size()){
      //success condition
      break;
    }
    start = System.currentTimeMillis();
  }
  if (responses.size() == destinations.size()){
    return merger.merge(responses, message);
  } else {
    return new Response().withProperty("exception", "coordinator timeout");
  }
}

Note at level ALL two successes and one fail is still a fail. Write operations do not have anything fancy to merge but reads do. Here is the highest timestamp wins merge logic. Every time I write the "find the highest" algorithm I am reminded of my first CS class in fortran. Thanks Professor D! (But we aint fortran-in no more)

 
private static ObjectMapper OM = new ObjectMapper();

@Override
public Response merge(List<Response> responses, Message message) {
  if ("put".equals(message.getPayload().get("type"))
      || "delete".equals(message.getPayload().get("type"))) {
    return new Response();
  } else if ("get".equals(message.getPayload().get("type"))) {
    return highestTimestampResponse(responses);
  } else {
    return new Response().withProperty("exception", "unsupported operation " + message);
  }
}

private Response highestTimestampResponse(List<Response> responses){
  long highestTimestamp = Long.MIN_VALUE;
  int highestIndex = Integer.MIN_VALUE;
  for (int i = 0; i < responses.size(); i++) {
    Val v = OM.convertValue(responses.get(i).get("payload"), Val.class);
    if (v.getTime() > highestTimestamp) {
      highestTimestamp = v.getTime();
      highestIndex = i;
    }
  }
  return responses.get(highestIndex);
}

}

Testing

This code is taken from the unit test here. (I should be doing perma-links I am so lazy.) We should be able to test two things:

  1. If all nodes are up, operations at ALL should pass
  2. If at least one node is down, operations at ALL should fail

ColumnFamilyClient cf = new ColumnFamilyClient(s[0].getConfiguration().getTransportHost(),
    s[0].getConfiguration().getTransportPort());
Session clAll = cf.createBuilder().withKeyspace("abc").withColumnFamily("def")
    .withWriteConsistency(ConsistencyLevel.ALL, new HashMap())
    .withReadConsistency(ConsistencyLevel.ALL, new HashMap()).build();
passIfAllAreUp(s, clAll);
failIfSomeAreDown(s, clAll);

public void passIfAllAreUp(Server [] s, Session sb) throws ClientException {
  Response r = sb.put("a", "b", "c", 1);
  Assert.assertFalse(r.containsKey("exception"));
  int found = 0;
  for (int i = 0; i < s.length; i++) {
    ColumnFamilyPersonality c = (ColumnFamilyPersonality) s[i].getKeyspaces().get("abc")
        .getColumnFamilies().get("def");
    Val v = c.get("a", "b");
    if (v != null){
      //assert data is in three places
      Assert.assertEquals("c", c.get("a", "b").getValue());
      found++;
    }
  }
  Assert.assertEquals(3, found);
  Assert.assertEquals("c", sb.get("a", "b").getValue());
}

public void failIfSomeAreDown(Server [] s, Session sb) throws ClientException {
  for (int i = 2; i < s.length; i++) {
    s[i].shutdown();
    try {
      sb.put("a", "b", "c", 1);
    } catch (ClientException ex){
      Assert.assertTrue(ex.getMessage().equals("coordinator timeout"));
    }
  }
}

Sweet! Dude! There you have it! Eventual Consistency, I mean Tunable Consistency, I mean Sometimes Relaxed Consistency :) Basically from here implementing other consistency levels is adding different case logic and pulling results from the ExecutorCompletionService. What is coming next? Hinted Handoff I think :) Stay tuned!

Wednesday Jan 28, 2015

The curious case of the never joining node.

[ecapriolo@production-cassandra-54-233 ~]$ /opt/cassandra/bin/nodetool -h  10.9.54.233 status
Note: Ownership information does not include topology; for complete information, specify a keyspace
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address      Load       Tokens  Owns   Host ID                               Rack
UN  10.9.55.181  47.47 GB   256     10.4%  5a635595-1cc9-4b43-8e6b-12ef4639c1db  rack1
UN  10.9.58.53   42.12 GB   256     8.9%   bca09da7-85a9-476b-9a60-1b6a0973f375  rack1
UN  10.9.55.58   44.71 GB   256     10.0%  06b6a87e-7c3e-42ae-be28-20efe8920615  rack1
UN  10.9.52.15   48.01 GB   256     9.9%   d5746748-457e-4821-8f3d-bdac850cfcd3  rack1
UN  10.9.57.215  45.47 GB   256     10.3%  1d3bdec7-6f0b-4bc9-9329-72cd41cf5934  rack1
UJ  10.9.54.233  29.35 GB   256     ?      e973ef19-bd06-4a2a-bff1-661742b5ca14  rack1
UN  10.9.59.47   43.39 GB   256     11.3%  ea95e65d-8365-402c-aab9-3658fe5cc69f  rack1
UN  10.9.48.198  46.3 GB    256     10.7%  007c3169-dde4-4d81-acb9-3bbeda76723a  rack1
UN  10.9.49.224  48.15 GB   256     9.6%   0ed47a3d-cf70-4e08-99a3-3fdbe5ea4c72  rack1
UN  10.9.58.78   48.39 GB   256     9.4%   ca0144ac-948e-49ce-a3b9-857827c3cf5b  rack1
UN  10.9.49.13   43.54 GB   256     9.6%   59d23ac8-50ab-45dc-9ddc-d7f2825e1169  rack1


[ecapriolo@production-cassandra-54-233 ~]$ /opt/cassandra/bin/nodetool -h  10.9.54.233 netstats
Mode: JOINING
Bootstrap d4ab87c0-a6ff-11e4-82bf-6f05f2e5b99d
    /10.9.55.181
    /10.9.48.198
    /10.9.59.47
    /10.9.58.78
    /10.9.57.215
    /10.9.52.15
    /10.9.49.13
    /10.9.58.53
    /10.9.55.58
Read Repair Statistics:
Attempted: 0
Mismatch (Blocking): 0
Mismatch (Background): 0
Pool Name                    Active   Pending      Completed
Commands                        n/a         0              0
Responses                       n/a         0        4248573

Can this be explained?

[ecapriolo@production-cassandra-54-233 ~]$ /opt/cassandra/bin/nodetool -h  10.9.54.233 join
This node has already joined the ring.

Yea you could have fucking fooled me.

[ecapriolo@production-cassandra-54-233 ~]$ /opt/cassandra/bin/nodetool -h  10.9.54.233 gossipinfo
production-cassandra-54-233.use1.huffpo.net/10.9.54.233
  generation:1422457813
  heartbeat:12652
  LOAD:3.1492231438E10
  RELEASE_VERSION:2.0.10
  DC:datacenter1
  HOST_ID:e973ef19-bd06-4a2a-bff1-661742b5ca14
  NET_VERSION:7
  RPC_ADDRESS:10.9.54.233
  SCHEMA:a948f8b3-dbc3-37f7-b121-c8eacfe9a772
  SEVERITY:0.0
  STATUS:BOOT,-2466632499206578162
  RACK:rack1

Monday Jan 26, 2015

ConvoToMetaException


https://twitter.com/tupshin/status/559833624913379330

Good: Good to have a storage format that matches what the database does.

C* has come a long way over the past few years, and unfortunately our storage format hasn't kept pace with the data models we are now encouraging people to utilise.

Bad! Why people encouraged to utilize database wrong?

??And historically, "columnar oltp sql data store" has not really been a thing.

Bad/Confused? is Cassandra a columnar oltp data store? is it trying to become one?

I see it as specialization for special cases.

Good!

but:

https://issues.apache.org/jira/browse/CASSANDRA-2995

  • deal with potential use cases where maybe the current sstables are not the best fit
  • allow several types of internal storage formats (at the same time) optimized for different data types

I understand the "wouldn't it be cool" factor but let's be clear: Cassandra is not a research-paper creation engine. Pluggability here improves generality at the cost of a substantial increase of implementation complexity, QA work, and cognitive burden on users. This is a bad direction for the project.

Bad: Basically the concept of making a special format, or letting the database be pluggable in any way, got hand waved and shot down even though people were interested.

That all being said.

Cassandra has evolved/is evolving into a quasi-columnar, kinda-fixed-schema database built around a custom query language. As such it only makes sense that it have a quasi-columnar, kinda-fixed-schema friendly sstable format (not saying that jokingly).

Also if it works just as well in the general case, that is good as well, assuming it does not take 4 major versions to become stable and in the process cause multiple heartaches along the lines of broken streaming and lost data shit.

Request Routing - Building a NoSQL Store - Part 9

This blog series is trucking along. What fun! I decided to switch the titles from 'Building a Column Family store' to 'Building a NoSQL store' because we are going far beyond demonstrating a Column Family store, since a Column Family is only an on-disk representation of data.

In our last blog we used a gossiper for cluster membership and showed how to replicate meta-data requests across the cluster. Now, we will focus on implementing plugable request routing. Unless you have been living under a rock you are probably aware of a few things like the CAP theorem and Dynamo-style consistent hashing. Stepping back a bit, a goal of Nibiru is to support plugable everything, and request routing is no exception. For example, a user may want a system where the node they send the operation to is the node where data is stored, they may want a system like Cassandra where data is stored on multiple cluster nodes using consistent hashing, or something else entirely.


To make things plugable we start with a router interface.

public interface Router {

/**
* Determine which hosts a message can be sent to. (in the future keyspace should hold a node list)
* @param message the message sent from the user
* @param local the uuid of this server
* @param requestKeyspace the keyspace/database of the request
* @param clusterMembership an interface for live/dead cluster nodes
* @param token a token (typically a hash) computed from the rowkey of the message
* @return all hosts a given request can be routed to
*/
public List<Destination> routesTo(Message message, ServerId local, Keyspace requestKeyspace, ClusterMembership clusterMembership, Token token);

}

Wow! This method has a lot of arguments! Haha. Some of the parameters (clusterMembership and ServerId) probably could have been constructor injected, and token and message are redundant (because you can determine one from the other), but for now using a pure interface seemed attractive (since everyone hates on abstract classes these days). Not all routers need each of these things.

I mentioned two cases. Let's start with an easy case: store and read data locally only.

public class LocalRouter implements Router {

  @Override
  public List<Destination> routesTo(Message message, ServerId local, Keyspace requestKeyspace,
      ClusterMembership clusterMembership, Token token) {
    Destination d = new Destination();
    d.setDestinationId(local.getU().toString());
    return Arrays.asList(d);
  }

}

This is fairly easy to reason about. For any request the answer is found here! This is basically a do-it-yourself (outside of nibiru) sharding. It also makes sense for a single node deployment.

Now it gets interesting

The dynamo style request routing is more involved. Even though it has been explained many times I will offer my own quick description.

At first do not think ring, instead think of hashing a key into a flat fixed sized array.

10 MOD 7
[][][][10][][][]

If you have replication you would also write that next slot. Imagine we had replication 3.

10 MOD 7. Replication 3
[][][][10][10][10][]

What happens if the data hashes to the last element? Think of the array as a ring and the replication will wrap-around.

6 MOD 7. Replication 3
[6][6][][][][][6]
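The array picture above fits in a few lines of toy Java (illustration only, not the Nibiru implementation):

// Toy version of the pictures above: a fixed number of slots with wrap-around replication.
static int[] replicaSlots(int keyHash, int slots, int rf) {
  int[] result = new int[rf];
  for (int i = 0; i < rf; i++) {
    result[i] = (keyHash + i) % slots; // next slot, wrapping past the end
  }
  return result;
}
// replicaSlots(10, 7, 3) -> [3, 4, 5]
// replicaSlots(6, 7, 3)  -> [6, 0, 1]  (wraps around, matching the last picture)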

Implementing this is slightly different. We do not use array indexes. Instead we consider that each node has a token that represents its position in the ring.

The token router needs to be aware of two facts: a Replication Factor (RF) that describes how many copies of the data to keep, and a token map that describes the splits.

Algorithm
1. Find the first destination in the sorted map
2. while < replication factor add the next destination (wrap around if need be)

public class TokenRouter implements Router {

  public static final String TOKEN_MAP_KEY = "token_map";
  public static final String REPLICATION_FACTOR = "replication_factor";

  @Override
  public List<Destination> routesTo(Message message, ServerId local, Keyspace requestKeyspace,
      ClusterMembership clusterMembership, Token token) {
    //TODO this is not efficient, we should cache this or refactor
    Map<String, String> tokenMap1 = (Map<String, String>) requestKeyspace
        .getKeyspaceMetadata().getProperties().get(TOKEN_MAP_KEY);
    TreeMap<String,String> tokenMap = new TreeMap<String,String>(tokenMap1);
    Integer replicationFactor = (Integer) requestKeyspace
        .getKeyspaceMetadata().getProperties().get(REPLICATION_FACTOR);
    int rf = 1;
    if (replicationFactor != null){
      rf = replicationFactor;
    }
    if (rf > tokenMap.size()) {
      throw new IllegalArgumentException(
          "Replication factor specified was larger than token map size");
    }
    List<Destination> destinations = new ArrayList<Destination>();

    Map.Entry<String,String> ceilingEntry;
    ceilingEntry = tokenMap.ceilingEntry(token.getToken()); //1
    if (ceilingEntry == null){
      ceilingEntry = tokenMap.firstEntry(); //1
    }
    destinations.add(new Destination(ceilingEntry.getValue()));
    for (int i = 1; i < rf; i++) {
      ceilingEntry = tokenMap.higherEntry(ceilingEntry.getKey()); //2
      if (ceilingEntry == null) {
        ceilingEntry = tokenMap.firstEntry(); //2
      }
      destinations.add(new Destination(ceilingEntry.getValue()));
    }
    return destinations;
  }

If that code is making your head buzz the unit test may explain it better. One of the really great parts of Nibiru is that the decision to make everything plugable makes testing and developing easier.

We can mock the cluster membership to provide a 3 node cluster that is always 'alive'.

private ClusterMembership threeLiveNodes(){
  ClusterMembership mock = new ClusterMembership(null, null) {
    public void init() {}
    public void shutdown() {}
    public List<ClusterMember> getLiveMembers() {
      return Arrays.asList(new ClusterMember("127.0.0.1", 2000, 1, "id1"),
          new ClusterMember("127.0.0.2", 2000, 1, "id2"),
          new ClusterMember("127.0.0.3", 2000, 1, "id3"));
    }
    public List<ClusterMember> getDeadMembers() { return null; }
  };
  return mock;
}


Next we give each node a single token.

public static TreeMap<String,String> threeNodeRing(){
  TreeMap<String,String> tokenMap = new TreeMap<>();
  tokenMap.put("c", "id1");
  tokenMap.put("h", "id2");
  tokenMap.put("r", "id3");
  return tokenMap;
}

Now we can throw some data at it and see how the router routes it. Notice here that the "z" key "wraps" around to the first node.

@Test
public void test(){
  TokenRouter token = new TokenRouter();
  Keyspace keyspace = new Keyspace(new Configuration());
  KeyspaceMetaData meta = new KeyspaceMetaData();
  Map<String,Object> props = new HashMap<>();
  props.put(TokenRouter.TOKEN_MAP_KEY, threeNodeRing());
  meta.setProperties(props);
  keyspace.setKeyspaceMetadata(meta);
  Partitioner p = new NaturalPartitioner();
  Assert.assertEquals("id1", token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("a")).get(0).getDestinationId());
  Assert.assertEquals("id1", token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("aa")).get(0).getDestinationId());
  Assert.assertEquals("id2", token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("h")).get(0).getDestinationId());
  Assert.assertEquals("id3", token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("i")).get(0).getDestinationId());
  Assert.assertEquals("id1", token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("z")).get(0).getDestinationId());
}

Great success! Now try with a replication factor of 2!

@Test
public void testWithReplicationFactor(){
  TokenRouter token = new TokenRouter();
  Keyspace keyspace = new Keyspace(new Configuration());
  KeyspaceMetaData meta = new KeyspaceMetaData();
  Map<String,Object> props = new HashMap<>();
  props.put(TokenRouter.TOKEN_MAP_KEY, threeNodeRing());
  props.put(TokenRouter.REPLICATION_FACTOR, 2);
  meta.setProperties(props);
  keyspace.setKeyspaceMetadata(meta);
  Partitioner p = new NaturalPartitioner();
  Assert.assertEquals(Arrays.asList(new Destination("id1"), new Destination("id2")),
      token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("a")));
  Assert.assertEquals(Arrays.asList(new Destination("id1"), new Destination("id2")),
      token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("aa")));
  Assert.assertEquals(Arrays.asList(new Destination("id2"), new Destination("id3")),
      token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("h")));
  Assert.assertEquals(Arrays.asList(new Destination("id3"), new Destination("id1")),
      token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("i")));
  Assert.assertEquals(Arrays.asList(new Destination("id1"), new Destination("id2")),
      token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("z")));
}

The API correctly returns two "natural endpoints" for the given token. Great Success!

Sweet! So I do not want to pat myself on the back much, but I have to take this opportunity to say I love the way Nibiru is shaping up. When I presented the idea of a completely plugable NoSql to a friend he did voice a concern that having 'plugable everything' could cause the system to have 'lots more bugs' because of all the potential use cases. I actually do not see it that way. I think that forcing as many components as possible into interfaces and APIs is making the process cleaner by having fewer use-case specific hacks and optimizations.

This is important if you think about it. Remember that both hbase and cassandra are data stores that were basically 'thrown away' by their creators, Powerset and Facebook respectively. (Yes they were both thrown away, abandoned and open sourced). IMHO both of them suffer by being built on specific assumptions for specific use cases. ( X is good and Y is bad because we value most X type rhetoric)

The next blog is going to show some already working code about coordinating requests against N nodes and building some of the plugable consistency on the read/write path. Futures, executors, and multi node failures coming, fun!

 


Monday Jan 19, 2015

Auto Clustering and gossip - Building a Column Family store - Part 8

Up until this point we have been building a single node NoSQL implementation. Though a NoSql can by definition be a single node system, oftentimes a NoSql data store is a multi-node solution. Building a multiple node system has many challenges that a single node system does not. The designers need to consider how the CAP Theorem applies to the API they are attempting to implement. A goal of nibiru is to create a plug-able NoSql system built around APIs. Thus we can think of Cluster Membership as something plug-able.

Plug-able Cluster Membership

I have commonly seen a few systems for cluster membership:

  1. Static: Cluster membership is configuration based and there is no node discovery (e.g. zookeeper static list of peers)
  2. Master: Nodes register themselves with an entity (e.g HBase register servers register with a master)
  3. Peer-to-Peer: Nodes can auto-discover each other typically with one or more known contact points (e.g. Cassandra/Riak)
  4. Active-passive: At any given time a single node is the master and typically there is an automatic/manual failover

Other components in the nosql stack will use the cluster information to route requests and place data. Let's shell out a base class for a ClusterMembership:

public abstract class ClusterMembership {

  protected Configuration configuration;
  protected ServerId serverId;

  public ClusterMembership(Configuration configuration, ServerId serverId){
    this.configuration = configuration;
    this.serverId = serverId;
  }

  public abstract void init();
  public abstract void shutdown();
  public abstract List<ClusterMember> getLiveMembers();
  public abstract List<ClusterMember> getDeadMembers();

}


public class ClusterMember {
  private String host;
  private int port;
  private long heartbeat;
  private String id;
  ...
}


Because I am from the Dynamo school of hard knocks, I decided to start off with method #3 and forked off a gossip implementation I found. Each nibiru node will create a node-id (uuid) on start up, get a seed-list from the configuration, and then attempt to join the cluster.

public class GossipClusterMembership extends ClusterMembership {

  public static final String HOSTS = "gossip_hosts_list";
  public static final String PORT = "gossip_port";

  private GossipService gossipService;

  public GossipClusterMembership(Configuration configuration, ServerId serverId) {
    super(configuration, serverId);
  }

  @Override
  public void init() {
    ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
    GossipSettings settings = new GossipSettings();
    List<String> hosts = null;
    if (configuration.getClusterMembershipProperties() != null){
      hosts = (List<String>) configuration.getClusterMembershipProperties().get(HOSTS);
    } else {
      hosts = new ArrayList<String>();
    }
    int port = 2000;
    for (String host : hosts){
      GossipMember g = new RemoteGossipMember(host, port, "");
      startupMembers.add(g);
    }
    try {
      gossipService = new GossipService(configuration.getTransportHost(), port,
          serverId.getU().toString(), LogLevel.DEBUG, startupMembers, settings);
    } catch (UnknownHostException | InterruptedException e) {
      throw new RuntimeException(e);
    }
    gossipService.start();
  }

Now, we should be able to make a test and prove that N instances of Nibiru can locate each other via gossip. We use 127.0.0.1 as the seed-list, and the other two nodes 127.0.0.2 and 127.0.0.3 should discover each other.

@Test
public void letNodesDiscoverEachOther() throws InterruptedException, ClientException {
  Server[] s = new Server[3];
  {
    Configuration conf = TestUtil.aBasicConfiguration(node1Folder);
    Map<String,Object> clusterProperties = new HashMap<>();
    conf.setClusterMembershipProperties(clusterProperties);
    conf.setTransportHost("127.0.0.1");
    clusterProperties.put(GossipClusterMembership.HOSTS, Arrays.asList("127.0.0.1"));
    s[0] = new Server(conf);
  }
  {
    Configuration conf = TestUtil.aBasicConfiguration(node2Folder);
    Map<String,Object> clusterProperties = new HashMap<>();
    conf.setClusterMembershipProperties(clusterProperties);
    clusterProperties.put(GossipClusterMembership.HOSTS, Arrays.asList("127.0.0.1"));
    conf.setTransportHost("127.0.0.2");
    s[1] = new Server(conf);
  }
  {
    Configuration conf = TestUtil.aBasicConfiguration(node3Folder);
    Map<String,Object> clusterProperties = new HashMap<>();
    conf.setClusterMembershipProperties(clusterProperties);
    clusterProperties.put(GossipClusterMembership.HOSTS, Arrays.asList("127.0.0.1"));
    conf.setTransportHost("127.0.0.3");
    s[2] = new Server(conf);
  }
  for (Server server : s){
    server.init();
  }
  Thread.sleep(11000);
  Assert.assertEquals(2, s[2].getClusterMembership().getLiveMembers().size());
  Assert.assertEquals("127.0.0.1", s[2].getClusterMembership().getLiveMembers().get(0).getHost());

Winning! Now nodes can discover each other! The next question is what message types should be sent over gossip and which should be sent via some other method. I decided to keep the gossip code for only cluster membership.

The first thing I decided to tackle was meta-data operations. This makes sense because without tables or column families, we can not write to or read from tables or column families. Duh! The approach I took was:

if a node gets a meta-data request, it re-broadcasts it out to all live hosts.

This is not a perfect solution because we need something to re-broadcast messages to down hosts (and ordering could be an issue down the line) but for now it gets the point across. We can potentially use something like hinted-handoff to deal with this. The piece that is handling message routing is known as the coordinator.

public Response handle(Message message) {
  if (SYSTEM_KEYSPACE.equals(message.getKeyspace())) {
    return metaDataCoordinator.handleSystemMessage(message);
  }
  //other stuff

What we are doing here is: if the message does not have "reroute" in its payload then broadcast it to all. Otherwise we handle it locally. (By broadcast I mean send N unicast messages not a tcp broadcast or multicast)

public Response handleSystemMessage(final Message message){
  if (MetaPersonality.CREATE_OR_UPDATE_KEYSPACE.equals(message.getPayload().get("type"))){
    metaDataManager.createOrUpdateKeyspace((String) message.getPayload().get("keyspace"),
        (Map<String,Object>) message.getPayload().get("properties"));
    if (!message.getPayload().containsKey("reroute")){
      message.getPayload().put("reroute", "");
      List<Callable<Void>> calls = new ArrayList<>();
      for (ClusterMember clusterMember : clusterMembership.getLiveMembers()){
        final MetaDataClient c = clientForClusterMember(clusterMember);
        Callable<Void> call = new Callable<Void>(){
          public Void call() throws Exception {
            c.createOrUpdateKeyspace(
                (String) message.getPayload().get("keyspace"),
                (Map<String,Object>) message.getPayload().get("properties"));
            return null;
          }
        };
        calls.add(call);
      }
      try {
        List<Future<Void>> res = metaExecutor.invokeAll(calls, 10, TimeUnit.SECONDS);
        //todo return results to client
      } catch (InterruptedException e) {
      }
    }
    return new Response();
  } else ...

Obviously this implementation needs better error handling and potentially longer timeouts. We test this by taking our last test and sending out a meta-data request. Within a few milliseconds the other nodes should have received and processed that message. In other words, create a keyspace and column family, then check each node to ensure this is complete on all hosts.

MetaDataClient c = new MetaDataClient("127.0.0.1", s[1].getConfiguration().getTransportPort());
c.createOrUpdateKeyspace("abc", new HashMap<String,Object>());

Thread.sleep(1000);
for (Server server : s){
  Assert.assertNotNull(server.getKeyspaces().get("abc"));
}
Map<String,Object> stuff = new HashMap<String,Object>();
stuff.put(ColumnFamilyMetaData.IMPLEMENTING_CLASS, DefaultColumnFamily.class.getName());
c.createOrUpdateColumnFamily("abc", "def", stuff);
Thread.sleep(1000);
for (Server server : s){
  Assert.assertNotNull(server.getKeyspaces().get("abc").getColumnFamilies().get("def"));
}
for (Server server : s){
  server.shutdown();
}

Great. Now that we have cluster membership we can start tackling the really - really - really fun stuff like request routing and quorum reads etc! Stay tuned!


Saturday Jan 03, 2015

Impala vs Hive - un Buzzifying big data

Everyone loves fast queries, no denying that. Everyone loves charts that say things are fast. I had some time to sit down and play with Impala for the past couple weeks and wanted to give my initial impressions on the debate from the seat of someone who uses Hive, MySql, and !Vertica! in my daily life.

Pre-blog rando notes

I had just installed CDH 5.3 in our 3 node staging sandbox. Believe it or not, I spend most of my time on very modest sized hardware clusters, 3 nodes or 10 nodes. These days I have read some vendor documentation that "suggests" the "standard" Hadoop node is now the i2.8xlarge, the computer that is $2476.16 monthly and $3.3920 hourly, with 200+ GB of RAM (on a one year contract). Well guys like me without deep pockets routinely use m1.xlarge, which is $163.52 monthly and $0.2240 hourly.

For hadoop, I sometimes go with m1.xlarge, so I can get 4x 400 GB disks without paying for crazy enterprise SSDs or 250GB RAM. I guess this makes me the oddball. This could lead me into an amazon rant about the lack of machines with nice local storage options, but I will not go there.

Impala round 1: Kick the tires, they kick back

So anyway, impala. I click a few buttons in Cloudera Manager and it is installed. Great. That was the easy button. I go on a hadoop node and run impala-shell.

My old tables are there. New tables I am making do not appear. I spend some time and my co-worker finds a command  "invalidate metadata" which I run every so often. (maybe there is some way I would not need to do this, but not interested in searching ATM)

I query our main table.

impala > "select * from weblogs limit 4"
impala "Sorry I dont know what com.huffingtonpost.Serde is"

Ok, fair. I expected this. I know impala is written in c/c++ and it is not going to be able to understand every format, especially ones written just in java (although it can bridge java UDFS). But this is an important thing you know: the BUZZ says "use X its fast and awesome!" but unless you are starting fresh, compatibility and upgrading is something that you have to take seriously. Once you have a large infrastructure built around X you can not flap around and react to every new architecture or piece of software out there. You want to be getting things done, not rebuilding your stack everyday.

Impala Round 2: Best laid plans

So ok I get it. I cant expect impala to understand our proprietary serde/input format. The good news is that Impala and Hive are not enemies. At best they are frenemies :).

Now the great thing about hive is I can transmute things between different formats: json, and xml, and parquet, and orc, and text. So this query command will just make a plain text table.

hive > create table impala_test1 as select * from weblogs
hive [doing stuff]
hive [doing stuff]
done
impala > invalidate metadata
impala > select * from impala_test1
impala "Sorry I dont like list<string>"


So what happened here is I converted to text files, but Impala does not understand Hive's complex types yet...Damn... Well I see this on the impala roadmap, but if you are keeping score in the meaningless x vs y debate, Impala supports only a subset of hive's data formats and only a subset of its features. Hive's collection types are very powerful and missing them hurts. It would be better if impala just ignored these fields instead of blowing up and refusing to query the entire table.

Ok so lets deal with this.

Impala Round 3: I am willing to accept less than the table I had before

Let's do a query and strip out all the collection columns. Again it would be better if I did not have to have N copies of data in different formats here, but for the purposes of kicking the tires I will just make a duplicate table in a different format without the columns impala does not understand.

hive > create table impala2 as select col1,col2 from weblogs
impala> select * from impala2 limit 4
impala> result1 , result2, result3

Great success! So now I can run queries and I was really impressed with performance of some operations.

Impala round 4: Ok lets get real

We have to do a lot of top-n queries. Top n is a big PITA, especially over arbitrary windows, because you can not easily pre-compute. We also have a lot of active entries (millions).

We went through a process and moved over some summary tables that looked like this:

entryid, viralcount, seedcount, othercount, otherstuff2count
3434252, 4, 2, 4,7
3412132, 10, 3, 4, 2

In a nutshell this is a PITA query we run:

select entryid, sum(viralcount) as x, sum(seedcount) from [data] where ( join conditions ) group by entryid, order by x

impala > [above query] where [one day time range]
impala > fast, excitingly fast, results

Great lets try the actual query on full data set. (11Gb data)

impala > [above query] where [30 day time range]
impala > out of memory

Impala Round 5: Superstitious tuning.

So when things run out of memory I add memory. The setting was 256MB. I tried 1GB. I queried again. It failed again.

I set the memory to no limit, and it worked.
I added the tables to hdfs cache.
Well, that is good: it worked and it was fast, really fast.

Unfortunately, I don't love what this means for the "yarn" world because if impala is sucking up as much memory as it feels like, I am not sure how happy everything else on my hadoop cluster is going to be. Maybe there was some magic setting like 4GB that would have worked, but I don't love having to constantly review memory thresholds of software, so I set it to unlimited.

It was also a bit frustrating to me because the table was only 11GB and the intermediate results of that query are actually very small (one row per article).

Who knows how it works and why? Whatevers.

Impala Round 6: Bring out tez

So as you know impala is a cloudera "thing" and tez is a Hortonworks "thing". One of the reasons I upgraded was that Tez is included in hive 13.1 which should be part of CDH 5.3.

Now apparently tez is supposed to deliver some serious hive performance gains. I went looking for a chart of tez vs impala for you. Instead I found a chart about how IBM's BIG SQL beats them both. See what I said about VS debates? If all you care about is charts you always get beat by someone.

Anyway...

hive > set hive.execution.engine = tez
 class not found tez/whatever

Arge. You gotta be crapping me. Today was a frustrating day. Apparently Tez is missing some jars in CDH, in typical accidentally/potentially-not-accidentally/[I dont care] fashion. Likely fallout from the next hadoop enterprise pissing match.

This really grinds my gears...

Final thoughts

If you thought I was going to sit here and tell you how great X is and how bad Y is and show you a chart validating my opinions, well, I did not do that.

I see a lot of potential for impala, but it is not close to the features of hive. Hive and impala actually work well together. Hive just simply is more plugable and versatile. Hive is not as fast on the read side, but for your level 1 data ingestion and aggregation building Hive is still the bomb. Once you have sliced and diced with the Hive "Hammer" impala makes a very nice "Scalpel". It is really nice that Hive and Impala share the same metastore, and can read and write each others data (parquet) as well.





Friday Jan 02, 2015

Bloom Filters - Building a Column Family store - Part 7

In the last blog in this series we built a Commitlog for our quickly evolving NoSql/Column Family store nibiru. I decided this morning that I had been dodging BloomFilters for a while and it was time to get to it.

You may ask: what are Bloom Filters and why might a Column Family store need/want them? If you read my post on SSTables and Compaction, you can see that we are implementing a Log Structured Merge system. Namely, we are not doing updates in place, and instead we periodically merge files. If you imagine data in these files over time, you can picture a scenario where there are 30 or more SSTables on disk. Now, imagine that only 1 of those 30 files has the rowkey being searched for. To find that rowkey we would have to search all 30 of the SStables.

A Bloom Filter (described here in detail) could potentially help us avoid searching tables. This is because a Bloom Filter can give two answers: 'Absolutely NO' or 'Maybe'. (Answers I used to get when I was asking someone out on a date!)

'Absolutely No' in this case means we do not have to bother searching, because we are sure the rowkey is not in this table.

'Maybe' means we do have to search and maybe the row key is in the table or maybe it is not. (We would have searched it anyway without the Bloom Filter so no big loss)
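A quick standalone illustration of those two answers, using the same Guava BloomFilter class that backs the writer below:

import java.nio.charset.StandardCharsets;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

public class TwoAnswersDemo {
  public static void main(String[] args) {
    BloomFilter<CharSequence> filter =
        BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1000);
    filter.put("jack");
    // "Maybe": the key might be in the SSTable, so we still have to search.
    System.out.println(filter.mightContain("jack"));          // true
    // "Absolutely NO" (almost always): skip this SSTable entirely.
    // A rare 'true' here would just be a false positive, never a missed key.
    System.out.println(filter.mightContain("wontfindthis"));  // false
  }
}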

Because of these properties, Bloom filters make a very nice option for a negative-cache. (For those interested, the pull request with the majority of the feature is here. I did make other subtle changes along the way.) So let's have at it and implement this bad boy.

Writing and Persisting Bloom Filters

Remember that our only writes are Memtable, Flushing, and SSTables. Thus we can simply hook into the SSTable writing process and attach a bloom filter writer.

public class SsTableStreamWriter {

  ...

  private BloomFilterWriter bloomFilter;

  public SsTableStreamWriter(String id, ColumnFamily columnFamily){
    ...
    indexWriter = new IndexWriter(id, columnFamily.getKeyspace().getConfiguration());
    bloomFilter = new BloomFilterWriter(id, columnFamily.getKeyspace().getConfiguration());
  }

  public void write(Token t, Map<String,Val> columns) throws IOException {
    long startOfRecord = ssOutputStream.getWrittenOffset();
    bloomFilter.put(t);
    ...
  }

  public void close() throws IOException {
    indexWriter.close();
    ssOutputStream.close();
    bloomFilter.writeAndClose();
  }

I did not go out and write a Bloom Filter from scratch; I decided to use guava. But I did wrap it in a facade to make it look like I did something.

package io.teknek.nibiru.engine;

public class BloomFilterWriter {

  private final BloomFilter<Token> bloomFilter;
  private static final int expectedInsertions = 50000; //Be smarter here
  private final String id;
  private final Configuration configuration;

  public BloomFilterWriter(String id, Configuration configuration){
    this.id = id;
    this.configuration = configuration;
    bloomFilter = BloomFilter.create(TokenFunnel.INSTANCE, expectedInsertions);
  }

  public enum TokenFunnel implements Funnel<Token> {
    INSTANCE;
    public void funnel(Token person, PrimitiveSink into) {
      into.putUnencodedChars(person.getToken()).putUnencodedChars(person.getRowkey());
    }
  }

  public void put(Token t){
    bloomFilter.put(t); //see I did something
  }

  public static File getFileForId(String id, Configuration configuration){
    return new File(configuration.getSstableDirectory(), id + ".bf");
  }

  public void writeAndClose() throws IOException {
    BufferedOutputStream bo = new BufferedOutputStream(
        new FileOutputStream(getFileForId(id, configuration)));
    bloomFilter.writeTo(bo);
    bo.close();
  }
}

You do have to size the filters correctly otherwise they are not efficient. A better thing to do here would be to estimate based on the SSTables being flushed or compacted. I just cheated and put it at 50000 for the time being.
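For what it is worth, Guava also lets you pass a target false-positive probability alongside the insertion estimate. A sketch of what sizing from an estimated row count could look like (the estimatedRowCount value is a placeholder, not something the current code computes):

// Sketch only: size the filter from an estimated row count instead of a hard-coded 50000.
int estimatedRowCount = 75000;   // assume this comes from the memtable flush or compaction
double falsePositiveRate = 0.01; // ask for roughly 1% false positives
BloomFilter<Token> sized =
    BloomFilter.create(TokenFunnel.INSTANCE, estimatedRowCount, falsePositiveRate);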

Reading Bloom Filters

When we open up an SSTable we also open the associated Bloom Filter.

public class SsTable implements Comparable<SsTable>{

  private KeyCache keyCache;
  private ColumnFamily columnFamily;
  private SsTableReader ssTableReader;
  private BloomFilter bloomFilterReader;

  public SsTable(ColumnFamily columnFamily){
    this.columnFamily = columnFamily;
  }

  public void open(String id, Configuration conf) throws IOException {
    bloomFilterReader = new BloomFilter();
    bloomFilterReader.open(id, conf);

    keyCache = new KeyCache(columnFamily.getColumnFamilyMetadata().getKeyCachePerSsTable());
    ssTableReader = new SsTableReader(this, keyCache, bloomFilterReader);
    ssTableReader.open(id);
  }
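
The read side of the filter is not shown above, but it is basically the mirror image of writeAndClose(): Guava can rebuild the filter straight from the .bf file. Here is a sketch of what that wrapper could look like (the class in the actual repo may differ):

package io.teknek.nibiru.engine;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;

public class BloomFilter {

  // Fully qualified because this wrapper shares its simple name with Guava's class.
  private com.google.common.hash.BloomFilter<Token> bloomFilter;

  public void open(String id, Configuration conf) throws IOException {
    // Rebuild the filter written by BloomFilterWriter.writeAndClose(),
    // using the same funnel so the hashing lines up.
    try (BufferedInputStream in = new BufferedInputStream(
        new FileInputStream(BloomFilterWriter.getFileForId(id, conf)))) {
      bloomFilter = com.google.common.hash.BloomFilter
          .readFrom(in, BloomFilterWriter.TokenFunnel.INSTANCE);
    }
  }

  public boolean mightContain(Token t) {
    return bloomFilter.mightContain(t);
  }
}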

Now here is the payday! On any operation that does a read, we can ask the bloom filter for help and skip the entire rest of the read path if the bloom filter says no!

  public Val get(Token searchToken, String column) throws IOException {
    boolean mightContain = bloomFilter.mightContain(searchToken);
    if (!mightContain) {
      return null;
    }

    BufferGroup bgIndex = new BufferGroup();
    ...

Performance testing this is simple. First, we can just write a test that searches for a rowkey not in the SStable. This was a run I did without the Bloom Filter code.

  {
    long x = System.currentTimeMillis();
    for (int i = 0 ; i < 50000 ; i++) {
      Assert.assertEquals(null, s.get(ks1.getKeyspaceMetadata().getPartitioner().partition("wontfindthis"), "column2"));
    }
    System.out.println("non existing key " + (System.currentTimeMillis() - x));
  }

result >> non existing key 11694

(I suspect the read code is not bailing out early when it encounters a row key greater than the search key, but that is another performance tweak.) Next we put the Bloom Filter code into action.

result >> index match 891
result >> far from index 186
result >> index match 159
result >> far from index 139
result >> non existing key 26

Wow! Maximum performance!

There you have it. Bloom Filters: great for looking for data that is not there :) Sounds stupid, but the case happens more often than you might first think!



Wednesday Dec 31, 2014

Commitlog - Building a Column Family store - Part 6

In previous entries we have built Memtables and SSTables. For a quick recap, Memtables make the write path fast by writing to memory and periodically flushing to on-disk SSTables. Fast is great, but durability is important as well. Imagine a memtable that contains 1000 writes and delete tombstones, and then the server process is killed. Those entries exist only in memory and would be lost.

The answer for this (in a single node scenario) is a Commit log. The Commitlog is a flat log of all mutation (write/delete) operations.

If the server were to crash, we can replay the commitlog and recover those operations. In fact, the only time we need to read the commitlog files is on start up. Once a Memtable is safely flushed to an SSTable, its Commitlog can be deleted.

Because a Commitlog is only ever written sequentially, it does not de-duplicate writes the way a Memtable does. We also do not need to memory map or index the Commitlog, because we will only ever read it sequentially.

Let's talk about implementing a Commitlog.


Server -> Memtable -> SStable
       -> Commitlog

I will stub out the important methods in the Commitlog class.

public class CommitLog {

  private final ColumnFamily columnFamily;
  private CountingBufferedOutputStream ssOutputStream;

  public CommitLog(ColumnFamily cf);

  //Open a commit log before calling write
  public void open() throws FileNotFoundException;

  //write a value to the commit log
  public synchronized void write(Token rowkey, String column, String value, long stamp, long ttl);

  //delete the commit log from disk when it is no longer needed
  public void delete() throws IOException;

  //close the log for writing but do not delete it
  public void close() throws IOException;
}

After pondering the design I determined I would make the Commitlog a member of the Memtable. The relationship does not need to be a has-a relation like this, but it makes it fairly easy to track the commit logs and later delete them only after we are sure the memtable is flushed.


public DefaultColumnFamily(Keyspace keyspace, ColumnFamilyMetadata cfmd){
  super(keyspace, cfmd);
  CommitLog commitLog = new CommitLog(this);
  try {
    commitLog.open();
  } catch (FileNotFoundException e) {
    throw new RuntimeException(e);
  }
  memtable = new AtomicReference<Memtable>(new Memtable(this, commitLog));
  memtableFlusher = new MemtableFlusher(this);
  memtableFlusher.start();
}

Next, we take any mutation and make sure it is applied to the commitlog as well as the memtable.

  public void put(String rowkey, String column, String value, long time, long ttl){
    try {
      memtable.get().getCommitLog().write(keyspace.getKeyspaceMetadata().getPartitioner().partition(rowkey), column, value, time, ttl);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    memtable.get().put(keyspace.getKeyspaceMetadata().getPartitioner().partition(rowkey), column, value, time, ttl);
    considerFlush();
  }

Great! Next there are two cases to deal with:

Case 1: The memtable is flushed to an sstable and we can safely delete the associated commitlog.

Because of our decision to attach the commit log to the memtable, this was a trivial change to the MemtableFlusher.

    for (Memtable memtable : memtables){
      SSTableWriter ssTableWriter = new SSTableWriter();
      try {
        //TODO: a timeuuid would be better here
        String tableId = String.valueOf(System.nanoTime());
        ssTableWriter.flushToDisk(tableId, columnFamily.getKeyspace().getConfiguration(), memtable);
        SsTable table = new SsTable(columnFamily);
        table.open(tableId, columnFamily.getKeyspace().getConfiguration());
        columnFamily.getSstable().add(table);
        memtables.remove(memtable);
        memtable.getCommitLog().delete(); //Delete the commit log because we know the data is now durable
        flushes.incrementAndGet();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }

Case 2: On start up replay any commit logs, by reading them and applying the mutations to a memtable.

Start up is a fairly simple process. Open and load up all SStables, replay all commit logs, and done.


public void init() throws IOException {
  for (File ssTable : keyspace.getConfiguration().getSstableDirectory().listFiles()){
    String[] parts = ssTable.getName().split("\\.");
    if (parts.length == 2){
      if ("ss".equalsIgnoreCase(parts[1])){
        String id = parts[0];
        SsTable toOpen = new SsTable(this);
        toOpen.open(id, keyspace.getConfiguration());
        sstable.add(toOpen);
      }
    }
  }

  for (File commitlog : CommitLog.getCommitLogDirectoryForColumnFamily(this).listFiles()){
    String[] parts = commitlog.getName().split("\\.");
    if (parts.length == 2){
      if (CommitLog.EXTENSION.equalsIgnoreCase(parts[1])){
        processCommitLog(parts[0]);
      }
    }
  }
}

For the actual format of the commitlog I used the same format as the SsTable. This made it simple to borrow the code used by compaction that reads the SStable linearly.


void processCommitLog(String id) throws IOException {
  CommitLogReader r = new CommitLogReader(id, this);
  r.open();
  Token t;
  while ((t = r.getNextToken()) != null){
    SortedMap<String,Val> x = r.readColumns();
    for (Map.Entry<String,Val> col : x.entrySet()){
      memtable.get().put(t, col.getKey(), col.getValue().getValue(), col.getValue().getTime(), col.getValue().getTtl());
    }
  }
}

Cool so how do we test this beast? Well how about this?

  1. Start up a server
  2. Set the Memtable flush at 2 rowkeys.
  3. Insert 3 records.
  4. Shutdown the server.

This should leave a commitlog with one row unflushed. Next:

  1. Startup the server (which will read in commit logs)
  2. Read the unflushed row and assert it exists

@Test
public void commitLogTests() throws IOException, InterruptedException {
  String ks = "data";
  String cf = "pets";
  Configuration configuration = aBasicConfiguration(testFolder);
  Server s = new Server(configuration);
  s.init(); //1
  s.createKeyspace(ks);
  s.createColumnFamily(ks, cf);
  s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getColumnFamilyMetadata().setFlushNumberOfRowKeys(2); //2
  s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getColumnFamilyMetadata().setCommitlogFlushBytes(1);
  for (int i = 0; i < 3; i++) {
    s.put(ks, cf, i + "", "age", "4", 1); //3
    Thread.sleep(1);
  }
  Val x = s.get(ks, cf, "0", "age");
  Assert.assertEquals("4", x.getValue());
  {
    Val y = s.get(ks, cf, "2", "age");
    Assert.assertEquals("4", y.getValue());
  }
  Thread.sleep(1000);
  s.shutdown(); //4
  Thread.sleep(1000);
  {
    Server j = new Server(configuration);
    j.init(); //1
    Assert.assertNotNull(j.getKeyspaces().get(ks).getColumnFamilies().get(cf));
    Val y = j.get(ks, cf, "2", "age");
    Assert.assertEquals("4", y.getValue()); //2
    j.shutdown();
  }
}

Some notes about commitlogs and durability: disk file systems have synchronous and asynchronous operations. Virtualized environments make durability particularly hard to enforce, because the disk systems are abstracted and even operations like flush or sync could be virtualized. In this example we went with a synchronized write to a buffered stream plus periodic flushing. In a multi-node scenario there is added durability, because write operations can be sent to multiple nodes. This solution is "probably ok". Durability is always a fun problem, and there are certainly other ways to construct this commitlog.
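
To make the 'synchronized write plus periodic flushing' part concrete, here is a rough sketch of how the write path might honor the commitlogFlushBytes setting the test above pokes at. The lastFlushedOffset field and the getter are guesses at how the real code is shaped:

  public synchronized void write(Token rowkey, String column, String value, long stamp, long ttl)
      throws IOException {
    // ... append the record to ssOutputStream in the same delimited format the sstable writer uses ...

    long unflushed = ssOutputStream.getWrittenOffset() - lastFlushedOffset;
    if (unflushed >= columnFamily.getColumnFamilyMetadata().getCommitlogFlushBytes()) {
      // flush() only hands the buffered bytes to the OS; a true fsync would need something
      // like FileChannel.force(true), which is exactly where the durability caveats come in.
      ssOutputStream.flush();
      lastFlushedOffset = ssOutputStream.getWrittenOffset();
    }
  }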

Awesomesauce! We now have a commitlog, memtables, sstables, a key cache, index files, and compaction. We have a very, very functional Column Family store implementation! All that is left is bloom filters and having compaction remove tombstones.

Nibiru is getting very fun at this point. It may be turning from example code in a blog into the next NoSQL. You may want to fork it and play.

Monday Dec 22, 2014

SSTables and Compaction - Building a column family store - Part 5

In the last blog we introduced a threshold-based system that flushes Memtables to SSTables. In a Log Structured Merge storage system, having more SSTables makes the read path slower. The system also needs a way to merge writes and updates so we do not run out of disk space.

Compaction is a process that merges multiple SSTables together. Compaction removes older columns and can delete entire rows. It also optimizes the data by putting all the columns of the row close together on disk.

Imagine that each of the three tables here is an SSTable.

 rowkey  column  value
 myrow   a       5

 rowkey  column  value
 myrow   a       2
 myrow   b       b
 myrow   c       c

 rowkey  column  value
 row1    a       b
 row1    b       c


After a compaction the resulting SSTable would look like this:

 rowkey  column  value
 myrow   a       2
 myrow   b       b
 myrow   c       c
 row1    a       b
 row1    b       c

Because the SSTables are sorted first by rowkey and then by column, merging multiple tables together is straightforward. We iterate over each SSTable at the same time and merge columns into a new table.

Note: If a row has a large number of columns, merging in memory would likely cause an out-of-memory error. We would need a strategy to detect when the merge size gets too large and then start doing on disk merging. For now we will pretend like this is not an issue.

I assumed I was going to struggle with this algorithm a bit more than I did. Either I am getting good or, more than likely, there are some glaring holes in the implementation I do not see yet.

The approach I took was to create a method that can merge multiple SSTables into a single new one. I refactored the SSTableWriter to write SSTables incrementally.

Then the algorithm I came up with was:

  1. Read a token from each sstable being compacted
  2. Find the lowest token
  3. Merge the columns of only the lowest token
  4. Advance only the readers whose current token was the lowest
public static SsTable compact(SsTable[] ssTables, String newName) throws IOException {
  ColumnFamily columnFamily = ssTables[0].getColumnFamily();
  SsTableStreamReader[] readers = new SsTableStreamReader[ssTables.length];
  SsTableStreamWriter newSsTable = new SsTableStreamWriter(newName,
          columnFamily.getKeyspace().getConfiguration());

  newSsTable.open();
  Token[] currentTokens = new Token[ssTables.length];
  for (int i = 0; i < ssTables.length; i++) {
    readers[i] = ssTables[i].getStreamReader();
  }
  for (int i = 0; i < currentTokens.length; i++) {
    currentTokens[i] = readers[i].getNextToken(); //1
  }
  while (!allNull(currentTokens)){
    Token lowestToken = lowestToken(currentTokens); //2
    SortedMap<String,Val> allColumns = new TreeMap<>();
    for (int i = 0; i < currentTokens.length; i++) {
      if (currentTokens[i] != null && currentTokens[i].equals(lowestToken)) {
        SortedMap<String, Val> columns = readers[i].readColumns();
        merge(allColumns, columns); //3
      }
    }
    newSsTable.write(lowestToken, allColumns);
    advance(lowestToken, readers, currentTokens); //4
  }
  newSsTable.close();
  SsTable s = new SsTable(columnFamily);
  s.open(newName, columnFamily.getKeyspace().getConfiguration());
  return s;
}
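
The compact method leans on a few small helpers (allNull, lowestToken, merge, advance) that I have not pasted. The two interesting ones look roughly like this; the key detail is that merge keeps the column with the highest timestamp, the same conflict-resolution rule used everywhere else. (I am assuming Token is Comparable here; the real helpers may differ slightly.)

  static Token lowestToken(Token[] tokens) {
    Token lowest = null;
    for (Token t : tokens) {
      if (t != null && (lowest == null || t.compareTo(lowest) < 0)) {
        lowest = t;
      }
    }
    return lowest;
  }

  static void merge(SortedMap<String, Val> result, SortedMap<String, Val> incoming) {
    for (Map.Entry<String, Val> e : incoming.entrySet()) {
      Val existing = result.get(e.getKey());
      // Highest timestamp wins; ties are broken arbitrarily here.
      if (existing == null || e.getValue().getTime() > existing.getTime()) {
        result.put(e.getKey(), e.getValue());
      }
    }
  }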

I always try to write functional code. By functional I do not mean super sexy, complicated Scala and function pointers; I mean methods that do not modify any shared state. When you write code like this it makes testing easier. I wrote a quick test that merges two SSTables:

  @Test
  public void test() throws IOException{
    ...
    Keyspace ks1 = MemtableTest.keyspaceWithNaturalPartitioner();
    SsTable s = new SsTable(cf);
    {
      Memtable m = new Memtable(cf);
      m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column2", "c", 1, 0L);
      m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column2", "d", 2, 0L);
      m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column3", "e", 2, 0L);
      m.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row2"), "column1", "e", 2, 0L);
      SSTableWriter w = new SSTableWriter();
      w.flushToDisk("1", configuration, m);
      s.open("1", configuration);
    }
    SsTable s2 = new SsTable(cf);
    {
      Memtable m2 = new Memtable(cf);
      m2.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row1"), "column1", "c", 1, 0L);
      m2.put(ks1.getKeyspaceMetadata().getPartitioner().partition("row2"), "column1", "f", 3, 0L);
      SSTableWriter w2 = new SSTableWriter();
      w2.flushToDisk("2", configuration, m2);
      s2.open("2", configuration);
    }
    CompactionManager.compact(new SsTable[] { s, s2 }, "3");
    SsTable ss = new SsTable(cf);
    ss.open("3", configuration);
    Assert.assertEquals("e", ss.get("row1", "column3").getValue());
  }

Great! Now we can merge multiple SSTables into a single one, but we still have to build the state machine around it correctly. In the last blog we flushed memtables to disk and loaded the resulting sstables back up before removing the memtable. We need to do something similar here.

What we want to do is detect a trigger for compaction. In this case, if we have more than 4 SSTables we want to compact them into a single table. To do this we have a background thread iterate over all keyspaces. In each keyspace we iterate over all the ColumnFamily instances, and in each column family, if there are more than 4 SSTables, we compact them.

@Override
public void run() {
  while (true){
    for (Entry<String, Keyspace> keyspaces : server.getKeyspaces().entrySet()){
      Keyspace keyspace = keyspaces.getValue();
      for (Map.Entry<String,ColumnFamily> columnFamilies : keyspace.getColumnFamilies().entrySet()){
        Set<SsTable> tables = columnFamilies.getValue().getSstable();
        if (tables.size() >= 4){
          //...do compaction here

Like the memtable flushing, we want to insert the new SSTable first and remove the older SSTables afterwards, in a concurrency-safe way. Again, we can lean on the fact that having the data in multiple files will not change the end result.

        Set<SsTable> tables = columnFamilies.getValue().getSstable();
        if (tables.size() >= 4){
          SsTable[] ssArray = tables.toArray(new SsTable[] {});
          try {
            String newName = getNewSsTableName();
            SsTable s = compact(ssArray, newName);
            tables.add(s);
            for (SsTable table : ssArray){
              tables.remove(table);
            }
            numberOfCompactions.incrementAndGet();
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }

To show the entire process in action I cooked up this end-to-end test. What we are doing here:

  1. Set the memtable to flush at 2 rows
  2. Insert 9 rows (this should cause 4 flushes)
  3. Assert the flushes are as expected
  4. Assert the compaction thread had 1 event
  5. Check that all the rows still exist
  @Test
  public void compactionTest() throws IOException, InterruptedException{
    ...
    Configuration configuration = new Configuration();
    configuration.setSstableDirectory(tempFolder);
    Server s = new Server();
    s.setConfiguration(configuration);
    s.init();
    s.createKeyspace(ks);
    s.createColumnFamily(ks, cf);
    s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getColumnFamilyMetadata().setFlushNumberOfRowKeys(2); //1
    for (int i = 0; i < 9; i++) {
      s.put(ks, cf, i + "", "age", "4", 1); //2
      Thread.sleep(1);
    }
    Val x = s.get(ks, cf, "8", "age");
    Thread.sleep(1000);
    Assert.assertEquals(4, s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getMemtableFlusher().getFlushCount()); //3
    Assert.assertEquals(1, s.getCompactionManager().getNumberOfCompactions()); //4
    Assert.assertEquals("4", x.getValue());
    for (int i = 0; i < 9; i++) {
      Val y = s.get(ks, cf, i + "", "age");
      Assert.assertEquals("4", y.getValue()); //5
    }
  }

Sweet! Now we have a system where Memtables flush to SSTables, and we have a background thread optimizing the SSTables on disk through compaction! We are getting really, really close to having a fully functional single node ColumnFamily store. We only need two more critical components: Bloom Filters and Commitlogs. (There are some little things left as well, like having compaction remove deleted rows, which gets easier once we have bloom filters.) Awesome sauce!

Sunday Dec 14, 2014

Memtable, Flushing, and SSTables - Building a column family store - Part 4

Up until this point, the pieces from past blogs (memtables, sstables, key cache) have been built and tested as discrete components. To build a Column Family store they need to work in concert. This means we need to talk about the write path.

Our SSTables and associated IndexFiles are write-once. This has some nice characteristics. First, the data store does not do any random writes, which is great for both rotational disks and SSDs. We want our write path to look like this:

set-request -> server -> memtable 

Eventually the Memtable will grow, and if it grows unbounded the system will run out of memory. We want to flush the Memtable to disk as an SSTable before we have a catastrophic OutOfMemory error, but we also want the Memtable to be as large as possible so we do fewer small writes (and require less compaction to merge the resulting tables).

It would just be nice to do something like:
memoryInUse += size_of(column)


Unfortunately that is not an easy task. In the Java world, counting how much memory a structure uses is a hard problem. There are several ways to go about counting used memory, but the approach I am going to take is avoiding the hard problem and solving an easier one:

Flush when the number of row keys is > X.
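
Just to show why I am dodging it, here is roughly what the 'hard problem' route looks like in plain Java: an instrumentation agent. This sketch has to be packaged as a -javaagent jar with a Premain-Class manifest entry, and getObjectSize only reports shallow size, so you would still have to walk the object graph yourself.

import java.lang.instrument.Instrumentation;

public class MemoryAgent {

  private static volatile Instrumentation instrumentation;

  // Called by the JVM when the process is started with -javaagent:memory-agent.jar
  public static void premain(String args, Instrumentation inst) {
    instrumentation = inst;
  }

  // Shallow size only: a Memtable full of Strings would be badly under-counted
  // unless every referenced object is visited and summed as well.
  public static long shallowSizeOf(Object o) {
    return instrumentation.getObjectSize(o);
  }
}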

In the end we likely want to build numerous criteria:

  1. Flush if memtable is very old
  2. Flush if memtable > certain size
  3. Flush if the memtable has had more than X operations

All of these things should be easy to add incrementally.
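
A sketch of how those criteria could hang together once the metadata carries the extra thresholds. Only flushNumberOfRowKeys exists today; the age and operation-count accessors are hypothetical:

  boolean shouldFlush(Memtable m, ColumnFamilyMetadata meta) {
    long ageMillis = System.currentTimeMillis() - m.getCreatedAt();   // hypothetical accessor
    return m.size() > meta.getFlushNumberOfRowKeys()                  // rows as a crude size proxy
        || ageMillis > meta.getMaxMemtableAgeMillis()                 // hypothetical: flush if very old
        || m.getOperationCount() > meta.getMaxMemtableOperations();   // hypothetical: flush after X operations
  }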

public class ColumnFamilyMetadata {
  private String name;
  private long tombstoneGraceMillis;
  private int flushNumberOfRowKeys = 10000;

Next, stub out a method for flushing:

public class ColumnFamily {
  void considerFlush(){
    // flush memtable to sstable
  }
}

Any method that modifies or creates data should consider flushing:

  public void delete(String rowkey, String column, long time){
    memtable.get().delete(keyspace.getKeyspaceMetadata().getPartitioner().partition(rowkey), column, time);
    considerFlush();
  }

Flushing a memory table to disk turns out to be a tricky concurrency problem. We do not want to block writes. Even though writing to disk linearly is fast, it is not as fast as writing to the memtable, and we do not want to be forced to acquire a synchronization lock on every operation just to check if a flush is in progress. Also, after the sstable is flushed we can not clear the memtable until the sstable is loaded back up; otherwise data could temporarily vanish in the window between the sstable being written and it being loaded back into memory.

Here is my solution. Maybe aphyr will "call me maybe" out on it, but for a zero-follower github project this is fine :). The good news is that the data here is idempotent, thus having the data in a memtable and an sstable at the same time will not produce an incorrect result.

First, we create MemtableFlusher, a Runnable per column family that flushes memtable instances.

public class MemtableFlusher implements Runnable {
  private ConcurrentSkipListSet<Memtable> memtables = new ConcurrentSkipListSet<>();

This piece is tightly coupled with the ColumnFamily instance. Our read operations need to be able to access the current memtable, all the memtables pending flush to disk, and the already flushed sstables.

public class ColumnFamily {

  private AtomicReference<Memtable> memtable;
  private MemtableFlusher memtableFlusher;
  private Set<SSTable> sstable = new ConcurrentSkipListSet<>();

Here is the implementation of the considerFlush() method we talked about above. Namely, if the size of the memtable is larger than the threshold, atomically move the current memtable onto the flush queue and put a new memtable in its place.

  void considerFlush(){
    Memtable now = memtable.get();
    if (columnFamilyMetadata.getFlushNumberOfRowKeys() != 0
            && now.size() > columnFamilyMetadata.getFlushNumberOfRowKeys()){
      Memtable aNewTable = new Memtable(this);
      boolean success = memtableFlusher.add(now);
      if (success){
        boolean swap = memtable.compareAndSet(now, aNewTable);
        if (!swap){
          throw new RuntimeException("race detected");
        }
      }
    }
  }

Now that the memtable is placed onto the flush queue, the MemtableFlusher should:

  1. flush memtable to sstable
  2. load the sstable (memory map it)
  3. remove any reference to the memtable
  @Override
  public void run() {
    while (true){
      for (Memtable memtable : memtables){
        SSTableWriter ssTableWriter = new SSTableWriter();
        try {
          //TODO: a timeuuid would be better here
          String tableId = String.valueOf(System.nanoTime());
          ssTableWriter.flushToDisk(tableId, columnFamily.getKeyspace().getConfiguration(), memtable); //1
          SSTable table = new SSTable();
          table.open(tableId, columnFamily.getKeyspace().getConfiguration());
          columnFamily.getSstable().add(table); //2
          memtables.remove(memtable); //3
          flushes.incrementAndGet();
        } catch (IOException e) {
          //TODO: catch this and terminate server?
          throw new RuntimeException(e);
        }
      }
      try {
        Thread.sleep(1);
      } catch (InterruptedException e) { }
    }
  }

I have written a small test. Obviously this will not find concurrency bugs, but at least it certifies that the components are working together as designed in a simple case. Here we set the flush size to 2 and insert 3 row keys. The third row key inserted should cause a flush.

  @Test
  public void aTest() throws IOException, InterruptedException{
    File tempFolder = testFolder.newFolder("sstable");
    Configuration configuration = new Configuration();
    configuration.setSstableDirectory(tempFolder);
    Server s = new Server();
    s.setConfiguration(configuration);
    s.createKeyspace(ks);
    s.createColumnFamily(ks, cf);
    s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getColumnFamilyMetadata().setFlushNumberOfRowKeys(2);
    s.put(ks, cf, "jack", "name", "bunnyjack", 1);
    s.put(ks, cf, "jack", "age", "6", 1);
    Val x = s.get(ks, cf, "jack", "age");
    Assert.assertEquals("6", x.getValue());
    s.put(ks, cf, "ziggy", "name", "ziggyrabbit", 1);
    s.put(ks, cf, "ziggy", "age", "8", 1);
    s.put(ks, cf, "dotty", "age", "4", 1);
    Thread.sleep(2000);
    Assert.assertEquals(1, s.getKeyspaces().get(ks).getColumnFamilies().get(cf).getMemtableFlusher().getFlushCount());
    x = s.get(ks, cf, "jack", "age");
    Assert.assertEquals("6", x.getValue());
  } 

We are getting closer and closer to being fully functional. There are two big pieces left: Currently we are writing files to disk, but to run continuously we will need to merge these tables together. This process is known as compaction. After that a commit log will be needed to add durability.

This is getting exciting!

Wednesday Dec 10, 2014

Key Cache - Building a column family store - Part 3

In my last entry we discussed how to build a data structure similar to Cassandra's Index File. If you remember, the Index Interval setting builds a structure that indexes every N keys. In the best case a lookup may hit this index directly, but in the worst case our linear search may have to page through (Index Interval - 1) rows to locate the data.

In one of my unit tests I captured the effect that this has:

    {
      long x = System.currentTimeMillis();
      for (int i = 0 ; i < 50000 ; i++) {
        Assert.assertEquals("c", s.get("00001", "column2").getValue());
      }
      System.out.println("index match " + (System.currentTimeMillis() - x));
    }
    {
      long x = System.currentTimeMillis();
      for (int i = 0 ; i < 50000 ; i++) {
        Assert.assertEquals("c", s.get("08999", "column2").getValue());
      }
      System.out.println("far from index " +(System.currentTimeMillis() - x));
    }
We can see the performance difference between a perfect index hit and a key far from the index:
Test folder: /tmp/junit4018208685678427527
index match 1088
far from index 11166

Note: Our SStable format and decision to use String (and not ByteBuffer) means there are several micro-optimizations that can be done.

If the requests have a normal distribution the difference in access speed may not be noticeable. Still, it seems unfair that some key requests will always be faster than others.

Key Cache to the rescue!

The Key Cache is like the Index Interval in that it helps avoid seeking for keys. It works by storing the offset of frequently used keys in memory. The less random the key access is the more benefit this type of caching delivers. Also like the Index format we can store a large cache in a small amount of storage.

Let's build our own Key Cache and add it to our increasingly more sophisticated Column Family store. There is nothing spectacular being done here. We are wrapping a guava cache that will handle eviction with a small facade.

package io.teknek.nibiru.engine;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class KeyCache {

  private Cache<String,Long> lc;

  public KeyCache(int cacheSize){
    lc = CacheBuilder.newBuilder().maximumSize(cacheSize).recordStats().build();
  }

  public void put(String key, long value){
    lc.put(key, value);
  }

  public long get(String key) {
    long res = -1;
    Long l = lc.getIfPresent(key);
    if (l != null) {
      res = l.longValue();
    }
    return res;
  }
}
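
Usage is about as small as the facade itself; -1 doubles as the "not cached" signal, which is safe because no real row starts at a negative offset:

  KeyCache cache = new KeyCache(1000);
  cache.put("00001", 69000L);

  long hit  = cache.get("00001");   // 69000
  long miss = cache.get("08999");   // -1, so fall back to the index file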

Note: We would be better off using something like this library that does not have to box and un-box primitives, but again our goal is to show relative performance speed up from the Key Cache.

Now we only need to make a few small changes to our sstable reader.

-    bg.setStartOffset((int)index.findStartOffset(row));
+    long startOffset = keyCache.get(row);
+    if (startOffset == -1){
+      startOffset = index.findStartOffset(row);
+    }
+    bg.setStartOffset((int)startOffset);
     String searchToken = row; //this is not correct
     do {
       if (bg.dst[bg.currentIndex] == END_ROW){
         bg.advanceIndex();
       }
+      long startOfRow = bg.mbb.position() - bg.getBlockSize() + bg.currentIndex;
       readHeader(bg);
       StringBuilder token = readToken(bg);
       if (token.toString().equals(searchToken)){
         StringBuilder rowkey = readRowkey(bg);
         if (rowkey.toString().equals(row)){
+          keyCache.put(row, startOfRow);

We can see the difference if we re-run our tests:

Test folder: /tmp/junit6910931010171014079
index match 840
far from index 171
index match 219
far from index 137

Notice here that I ran the tests multiple times. This is because the Java virtual machine is much like an old car: you do not drive it fast right away; you let it sit in your driveway and warm up. :) Just kidding; the reality is that with Just In Time compilation and multiple garbage collection threads, timing code exactly can be difficult.

Even though the numbers fluctuate, we see the Key Cache has given us much better performance by helping us get data without having to seek on disk (memory mapped disk) as much. We went from 11 seconds for 50,000 look ups to around 200ms!

Awesome! What is next? Bloom filters, or Row Cache? Maybe start building a system to compact multiple SSTables together? Stay tuned.


Sunday Dec 07, 2014

Index Files - Building a column store - Part 2

In a previous blog post I demonstrated how to build an SSTable. For reference the sstable data on disk looks like this:

^@00000^A00000^Bcolumn2^C1417966551490^C1^C0^Cc^Dcolumn3^C1417966551490^C1^C0^Cc
^@00001^A00001^Bcolumn2^C1417966551490^C1^C0^Cc^Dcolumn3^C1417966551490^C1^C0^Cc
^@00002^A00002^Bcolumn2^C1417966551490^C1^C0^Cc^Dcolumn3^C1417966551490^C1^C0^Cc

Another way to visualize the data:

Token(rowkey): 00001
rowkey:        00001
columns:
  name     value  time
  column2  c      1417966551490
  column3  c      1417966551490

Side note: In this case the token and rowkey are the same because we are using a natural partitioner that does not hash the key.

An astute reader of the last blog may have noticed that even though our tables are sorted by token, and our get(key, column) operation searches by token, we were using an inefficient linear search. One way to optimize this type of search is to use a binary search. However, this is not the primary way Cassandra optimizes the problem.

Enter IndexInterval

The concept of IndexInterval is that for every Nth rowkey we record that row key and the byte offset it exists at in the file. In this way we build a relatively small, fast lookup table of rowkeys. Even if the row key is not found in the index, the database should get close to the rowkey and only have to page through at most index-interval - 1 keys.

Depending on the use case (small rows vs. larger rows), a larger or smaller index interval might be right for you.
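
To put numbers on it: with an interval of 1000 and 10,000 rows (the example below), the index holds just 10 entries, and a miss costs at most 999 extra rows of scanning. Conceptually the lookup is "find the greatest indexed token that is <= the search token"; here is a sketch of that idea with an in-memory TreeMap. The on-disk reader further down does a linear scan over the index file instead:

import java.util.Map;
import java.util.TreeMap;

public class InMemoryIndexSketch {

  // indexed token -> byte offset of the start of that row in the sstable
  private final TreeMap<String, Long> entries = new TreeMap<>();

  public void add(String token, long offset) {
    entries.put(token, offset);
  }

  // Greatest indexed token <= the search token; 0 means "start at the beginning of the file".
  public long findStartOffset(String token) {
    Map.Entry<String, Long> floor = entries.floorEntry(token);
    return floor == null ? 0L : floor.getValue();
  }
}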

Writing our own Index

Since Cassandra's data files are write-once, this makes the process straightforward: as we are flushing our memtable to an sstable, we also record the byte offset of every Nth row in a separate file.

I was not aware of any output stream that could count its position, and I did not want to track it out of band, so I whipped up an output stream that counts as it writes.

package io.teknek.nibiru.engine;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class CountingBufferedOutputStream extends BufferedOutputStream {
  private long writtenOffset;
  public CountingBufferedOutputStream(OutputStream out) {
    super(out);
  }
  public synchronized void writeAndCount(int b) throws IOException {
    super.write(b);
    writtenOffset++;
  }
  public void writeAndCount(byte[] b) throws IOException {
    super.write(b);
    writtenOffset += b.length;
  }
  public long getWrittenOffset() {
    return writtenOffset;
  }
}

I could have buried the index-creation logic inside the sstable-writing logic, but I decided to use a little OOD and design a separate class to write index files rather than intermingle the two.

public class IndexWriter {

  private final String id;
  private final Configuration conf;
  private BufferedOutputStream indexStream;
  private long rowkeyCount;
 
  public IndexWriter(String id, Configuration conf){
    this.id = id;
    this.conf = conf;
  }
 
  public void open() throws FileNotFoundException {
    File indexFile = new File(conf.getSstableDirectory(), id + ".index");
    indexStream = new CountingBufferedOutputStream(new FileOutputStream(indexFile));
  }
 
  public void handleRow(long startOfRecord, String token) throws IOException {
    if (rowkeyCount++ % conf.getIndexInterval() == 0){
      indexStream.write(SSTable.START_RECORD);
      indexStream.write(token.getBytes());
      indexStream.write(SSTable.END_TOKEN);
      indexStream.write(String.valueOf(startOfRecord).getBytes());
      indexStream.write(SSTable.END_ROW);
    }
  }
 
  public void close () throws IOException {
    indexStream.close();
  }
}

Now we make some minor changes to the sstable writer to also write the index as we write the sstable.

 public void flushToDisk(String id, Configuration conf, Memtable m) throws IOException{
    File sstableFile = new File(conf.getSstableDirectory(), id + ".ss");
    CountingBufferedOutputStream ssOutputStream = null;
    IndexWriter indexWriter = new IndexWriter(id, conf);

    try {
      ssOutputStream = new CountingBufferedOutputStream(new FileOutputStream(sstableFile));
      indexWriter.open();
      for (Entry<Token, ConcurrentSkipListMap<String, Val>> i : m.getData().entrySet()){
        long startOfRecord = ssOutputStream.getWrittenOffset();
        ssOutputStream.writeAndCount(START_RECORD);
        ssOutputStream.writeAndCount(i.getKey().getToken().getBytes());
        ssOutputStream.writeAndCount(END_TOKEN);
        ssOutputStream.writeAndCount(i.getKey().getRowkey().getBytes());
        ssOutputStream.writeAndCount(END_ROWKEY);
        indexWriter.handleRow(startOfRecord, i.getKey().getToken());
        boolean writeJoin = false;

When we write an sstable with rows 00000 to 10000, the following index is produced:

^@00000^A0
^@01000^A69000
^@02000^A138000
^@03000^A207000
^@04000^A276000
^@05000^A345000
^@06000^A414000
^@07000^A483000
^@08000^A552000
^@09000^A621000
 

Sweet! Now let's think about the use case of this index. It needs to provide one function: given a rowkey, return the byte offset at which the sstable should begin searching.

 public long findStartOffset(String token)

Here is most of the implementation:

  public long findStartOffset(String token) throws IOException {
    long offset = 0;
    do {
      if (bgIndex.dst[bgIndex.currentIndex] == SSTable.END_ROW) {
        bgIndex.advanceIndex();
      }
      readHeader(bgIndex);
      StringBuilder readToken = readToken(bgIndex);
      long thisOffset = readIndexSize(bgIndex);
      if(readToken.toString().equals(token)){
        return thisOffset;
      } else if (readToken.toString().compareTo(token) > 0) {
        return offset;
      } else {
        offset = thisOffset;
      }
    } while (bgIndex.currentIndex < bgIndex.dst.length - 1 || bgIndex.mbb.position()  < bgIndex.channel.size());
    return offset;
  }

Our sstable reader only needs a single change:

  public Val get (String row, String column) throws IOException{
    ...
    BufferGroup bg = new BufferGroup();
    bg.channel = ssChannel;
    bg.mbb = (MappedByteBuffer) ssBuffer.duplicate();
//bg.setStartOffset(0);
    bg.setStartOffset((int)index.findStartOffset(row));

With this change we start reading from a nearby offset rather than the beginning of the file. And if we get lucky (about 1 in 1000 lookups with this index interval), the index points us directly at the row!

What is next? I do not know; maybe we can make the index and sstable searching do a binary search. Maybe build bloom filters or even a key cache!
