Edward Capriolo

Monday Jan 26, 2015

Request Routing - Building a NoSQL Store - Part 9

This blog series is trucking along. What fun! I decided to switch the titles from 'Building a Column Family Store' to 'Building a NoSQL Store' because we are going far beyond demonstrating a Column Family store, since a Column Family is only an on-disk representation of data.

In our last blog we used a gossiper for cluster membership and showed how to replicate meta-data requests across the cluster. Now we will focus on implementing pluggable request routing. Unless you have been living under a rock you are probably aware of a few things like the CAP theorem and Dynamo-style consistent hashing. Stepping back a bit, a goal of Nibiru is to support pluggable everything, and request routing is no exception. For example, a user may want a system where the node they send the operation to is the node where the data is stored, they may want a system like Cassandra where data is stored on multiple cluster nodes using consistent hashing, or something else entirely.


To make things pluggable we start with a Router interface.

public interface Router {

  /**
   * Determine which hosts a message can be sent to. (In the future the keyspace should hold a node list.)
   * @param message the message sent from the user
   * @param local the uuid of this server
   * @param requestKeyspace the keyspace/database of the request
   * @param clusterMembership an interface for live/dead cluster nodes
   * @param token a token (typically a hash) computed from the rowkey of the message
   * @return all hosts a given request can be routed to
   */
  public List<Destination> routesTo(Message message, ServerId local, Keyspace requestKeyspace, ClusterMembership clusterMembership, Token token);

}

Wow! This method has a lot of arguments! Haha. Some of the parameters (clusterMembership and serverId) probably could have been constructor injected, and token and message are redundant (because you can determine one from the other), but for now a pure interface seemed attractive (since everyone hates on abstract classes these days). Not all routers need each of these things.
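For what it is worth, here is a minimal sketch of that constructor-injected alternative. The InjectedRouter class is hypothetical and not part of Nibiru; it only shows how the per-request signature would shrink.

// Hypothetical sketch (not Nibiru code): inject the cluster-level collaborators
// once at construction time so only the per-request arguments remain.
public abstract class InjectedRouter {

  protected final ServerId local;
  protected final ClusterMembership clusterMembership;

  protected InjectedRouter(ServerId local, ClusterMembership clusterMembership) {
    this.local = local;
    this.clusterMembership = clusterMembership;
  }

  // The per-request method only needs the message, keyspace, and token.
  public abstract List<Destination> routesTo(Message message, Keyspace requestKeyspace, Token token);
}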

I mentioned two cases. Let's start with the easy case: store and read data locally only.

public class LocalRouter implements Router {

  @Override
  public List<Destination> routesTo(Message message, ServerId local, Keyspace requestKeyspace,
      ClusterMembership clusterMembership, Token token) {
    Destination d = new Destination();
    d.setDestinationId(local.getU().toString());
    return Arrays.asList(d);
  }

}

This is fairly easy to reason about. For any request, the answer is found here! This is basically do-it-yourself sharding (outside of Nibiru). It also makes sense for a single node deployment.
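To make the 'do-it-yourself' part concrete, a client using LocalRouter might pick the target node itself, for example with a simple hash-mod scheme. This is a hypothetical sketch, not Nibiru code.

// Hypothetical client-side sharding to pair with LocalRouter: the client hashes
// the rowkey, picks a node itself, and sends the request straight to that node.
public class ClientSideSharding {

  private final List<String> nodeAddresses;

  public ClientSideSharding(List<String> nodeAddresses) {
    this.nodeAddresses = nodeAddresses;
  }

  public String nodeFor(String rowkey) {
    // mask the sign bit so the bucket index is never negative
    int bucket = (rowkey.hashCode() & Integer.MAX_VALUE) % nodeAddresses.size();
    return nodeAddresses.get(bucket);
  }
}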

Now it gets interesting

The Dynamo-style request routing is more involved. Even though it has been explained many times, I will offer my own quick description.

At first, do not think of a ring; instead, think of hashing a key into a flat, fixed-size array.

10 MOD 7
[][][][10][][][]

If you have replication you would also write to the next slots. Imagine we had a replication factor of 3.

10 MOD 7. Replication 3
[][][][10][10][10][]

What happens if the data hashes to the last element? Think of the array as a ring and the replication will wrap around.

6 MOD 7. Replication 3
[6][6][][][][][6]
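In code, that flat-array picture looks roughly like this toy sketch (illustration only, not the real router).

// Toy illustration of the flat array above: hash the key into one of seven
// slots and write it to the next `replication` slots, wrapping around the end.
public class FlatArrayDemo {
  public static void main(String[] args) {
    String[] slots = new String[7];
    int replication = 3;
    int key = 6;
    int start = key % slots.length;                              // 6 MOD 7 = last slot
    for (int i = 0; i < replication; i++) {
      slots[(start + i) % slots.length] = String.valueOf(key);   // wrap-around
    }
    System.out.println(java.util.Arrays.toString(slots));
    // prints [6, 6, null, null, null, null, 6]
  }
}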

Implementing this is slightly different. We do not use array indexes. Instead, each node has a token that represents its position in the ring.

The token router needs to be aware of two facts: a replication factor (RF) that describes how many copies of the data to keep, and a token map that describes the splits.

Algorithm
1. Find the first destination in the sorted token map at or after the token
2. While fewer than replication-factor destinations have been added, add the next destination (wrapping around if need be)

public class TokenRouter implements Router {

  public static final String TOKEN_MAP_KEY = "token_map";
  public static final String REPLICATION_FACTOR = "replication_factor";

  @Override
  public List<Destination> routesTo(Message message, ServerId local, Keyspace requestKeyspace,
      ClusterMembership clusterMembership, Token token) {
    //TODO this is not efficient we should cache this or refactor
    Map<String, String> tokenMap1 = (Map<String, String>) requestKeyspace
        .getKeyspaceMetadata().getProperties().get(TOKEN_MAP_KEY);
    TreeMap<String, String> tokenMap = new TreeMap<String, String>(tokenMap1);
    Integer replicationFactor = (Integer) requestKeyspace
        .getKeyspaceMetadata().getProperties().get(REPLICATION_FACTOR);
    int rf = 1;
    if (replicationFactor != null) {
      rf = replicationFactor;
    }
    if (rf > tokenMap.size()) {
      throw new IllegalArgumentException(
          "Replication factor specified was larger than token map size");
    }
    List<Destination> destinations = new ArrayList<Destination>();

    // step 1: find the first destination at or after the token (wrap to the first entry if none)
    Map.Entry<String, String> ceilingEntry = tokenMap.ceilingEntry(token.getToken());
    if (ceilingEntry == null) {
      ceilingEntry = tokenMap.firstEntry();
    }
    destinations.add(new Destination(ceilingEntry.getValue()));
    // step 2: keep adding the next destination until we reach the replication factor, wrapping around
    for (int i = 1; i < rf; i++) {
      ceilingEntry = tokenMap.higherEntry(ceilingEntry.getKey());
      if (ceilingEntry == null) {
        ceilingEntry = tokenMap.firstEntry();
      }
      destinations.add(new Destination(ceilingEntry.getValue()));
    }
    return destinations;
  }

}

If that code is making your head buzz, the unit test may explain it better. One of the really great parts of Nibiru is that the decision to make everything pluggable makes testing and developing easier.

We can mock the cluster membership to provide a three-node cluster that is always 'alive'.

  private ClusterMembership threeLiveNodes() {
    ClusterMembership mock = new ClusterMembership(null, null) {
      public void init() {}
      public void shutdown() {}
      public List<ClusterMember> getLiveMembers() {
        return Arrays.asList(new ClusterMember("127.0.0.1", 2000, 1, "id1"),
            new ClusterMember("127.0.0.2", 2000, 1, "id2"),
            new ClusterMember("127.0.0.3", 2000, 1, "id3"));
      }
      public List<ClusterMember> getDeadMembers() { return null; }
    };
    return mock;
  }


Next we give each node a single token.

  public static TreeMap<String, String> threeNodeRing() {
    TreeMap<String, String> tokenMap = new TreeMap<>();
    tokenMap.put("c", "id1");
    tokenMap.put("h", "id2");
    tokenMap.put("r", "id3");
    return tokenMap;
  }

Now we can throw some data at it and see how the router routes it. Notice here that the "z" key "wraps" around to the first node.

  @Test
  public void test() {
    TokenRouter token = new TokenRouter();
    Keyspace keyspace = new Keyspace(new Configuration());
    KeyspaceMetaData meta = new KeyspaceMetaData();
    Map<String, Object> props = new HashMap<>();
    props.put(TokenRouter.TOKEN_MAP_KEY, threeNodeRing());
    meta.setProperties(props);
    keyspace.setKeyspaceMetadata(meta);
    Partitioner p = new NaturalPartitioner();
    Assert.assertEquals("id1",
        token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("a")).get(0).getDestinationId());
    Assert.assertEquals("id1",
        token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("aa")).get(0).getDestinationId());
    Assert.assertEquals("id2",
        token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("h")).get(0).getDestinationId());
    Assert.assertEquals("id3",
        token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("i")).get(0).getDestinationId());
    Assert.assertEquals("id1",
        token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("z")).get(0).getDestinationId());
  }

Great success! Now try with a replication factor of 2!

  @Test
  public void testWithReplicationFactor() {
    TokenRouter token = new TokenRouter();
    Keyspace keyspace = new Keyspace(new Configuration());
    KeyspaceMetaData meta = new KeyspaceMetaData();
    Map<String, Object> props = new HashMap<>();
    props.put(TokenRouter.TOKEN_MAP_KEY, threeNodeRing());
    props.put(TokenRouter.REPLICATION_FACTOR, 2);
    meta.setProperties(props);
    keyspace.setKeyspaceMetadata(meta);
    Partitioner p = new NaturalPartitioner();
    Assert.assertEquals(Arrays.asList(new Destination("id1"), new Destination("id2")),
        token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("a")));
    Assert.assertEquals(Arrays.asList(new Destination("id1"), new Destination("id2")),
        token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("aa")));
    Assert.assertEquals(Arrays.asList(new Destination("id2"), new Destination("id3")),
        token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("h")));
    Assert.assertEquals(Arrays.asList(new Destination("id3"), new Destination("id1")),
        token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("i")));
    Assert.assertEquals(Arrays.asList(new Destination("id1"), new Destination("id2")),
        token.routesTo(null, null, keyspace, threeLiveNodes(), p.partition("z")));
  }

The API correctly returns two "natural endpoints" for the given token. Great success!

Sweet! I do not want to pat myself on the back too much, but I have to take this opportunity to say I love the way Nibiru is shaping up. When I presented the idea of a completely pluggable NoSQL store to a friend, he did voice a concern that having 'pluggable everything' could cause the system to have 'lots more bugs' because of all the potential use cases. I actually do not see it that way. I think that forcing as many components as possible behind interfaces and APIs makes the process cleaner by leaving fewer use-case-specific hacks and optimizations.

This is important if you think about it. Remember that both HBase and Cassandra are data stores that were basically 'thrown away' by their creators, Powerset and Facebook respectively. (Yes, they were both thrown away, abandoned and open sourced.) IMHO both of them suffer from being built on specific assumptions for specific use cases. ('X is good and Y is bad because we value X most' type rhetoric.)

The next blog is going to show some already-working code for coordinating requests against N nodes and building some of the pluggable consistency on the read/write path. Futures, executors, and multi-node failures coming. Fun!
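Until then, here is a deliberately naive, hypothetical sketch of where routesTo() would fit into such a coordinator. The NaiveCoordinator class and its send method are placeholders, not the real Nibiru coordination code.

// Hypothetical fan-out showing where routesTo() fits in. The real coordinator
// (futures, consistency levels, failure handling) is the subject of the next post.
public class NaiveCoordinator {

  private final Router router;

  public NaiveCoordinator(Router router) {
    this.router = router;
  }

  public void write(Message message, ServerId local, Keyspace keyspace,
      ClusterMembership membership, Token token) {
    for (Destination d : router.routesTo(message, local, keyspace, membership, token)) {
      send(message, d);
    }
  }

  private void send(Message message, Destination d) {
    // placeholder: hand the message to whatever transport talks to this destination
  }
}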

 

