Edward Capriolo

Monday Jan 19, 2015

Auto Clustering and gossip - Building a Column Family store - Part 8

Up until this point we have been building a single-node NoSQL implementation. Although by definition a NoSQL store can be a single-node system, more often than not a NoSQL data store is a multi-node solution. Building a multi-node system brings many challenges that a single-node system does not have. The designers need to consider how the CAP theorem applies to the API they are attempting to implement. A goal of Nibiru is to create a pluggable NoSQL system built around APIs, so we can treat cluster membership as something pluggable too.

Pluggable Cluster Membership

I have commonly seen a few approaches to cluster membership:

  1. Static: cluster membership is configuration based and there is no node discovery (e.g. ZooKeeper's static list of peers)
  2. Master: nodes register themselves with an entity (e.g. HBase region servers register with a master)
  3. Peer-to-peer: nodes auto-discover each other, typically through one or more known contact points (e.g. Cassandra/Riak)
  4. Active-passive: at any given time a single node is the master, and there is typically an automatic or manual failover

Other components in the NoSQL stack will use the cluster information to route requests and place data. Let's sketch out a base class for ClusterMembership:

import java.util.List;

public abstract class ClusterMembership {

  protected Configuration configuration;
  protected ServerId serverId;

  public ClusterMembership(Configuration configuration, ServerId serverId){
    this.configuration = configuration;
    this.serverId = serverId;
  }

  // start participating in the cluster
  public abstract void init();
  // leave the cluster and release resources
  public abstract void shutdown();
  // members currently believed to be reachable
  public abstract List<ClusterMember> getLiveMembers();
  // members that are known but currently unreachable
  public abstract List<ClusterMember> getDeadMembers();
}


public class ClusterMember {
  private String host;
  private int port;
  private long heartbeat;
  private String id;
  // getters and setters (e.g. getHost()) omitted
}
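Because membership is hidden behind this abstract class, approach #1 from the list above is easy to plug in. The sketch below is a hypothetical, self-contained illustration rather than Nibiru code: `Member`, `Membership` and `StaticMembership` are simplified stand-ins for `ClusterMember` and `ClusterMembership`, with the `Configuration`/`ServerId` plumbing left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for ClusterMember.
class Member {
  final String id;
  final String host;
  final int port;
  Member(String id, String host, int port) { this.id = id; this.host = host; this.port = port; }
}

// Hypothetical, simplified version of the ClusterMembership contract
// (the Configuration and ServerId constructor arguments are omitted).
abstract class Membership {
  abstract void init();
  abstract void shutdown();
  abstract List<Member> getLiveMembers();
  abstract List<Member> getDeadMembers();
}

// Approach #1 ("static"): membership comes from configuration and never changes.
// Nothing checks liveness, so every configured peer is reported as live after init().
class StaticMembership extends Membership {
  private final List<Member> configured;
  private List<Member> live = Collections.emptyList();

  StaticMembership(List<Member> configured) {
    this.configured = new ArrayList<>(configured);
  }

  @Override void init() { live = Collections.unmodifiableList(configured); }
  @Override void shutdown() { live = Collections.emptyList(); }
  @Override List<Member> getLiveMembers() { return live; }
  @Override List<Member> getDeadMembers() { return Collections.emptyList(); }
}
```

Callers that only see the abstract type cannot tell this apart from a gossip-based implementation, which is the point of making membership pluggable.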

Because I am from the Dynamo school of hard knocks, I decided to start with approach #3 and forked a gossip implementation I found. Each Nibiru node creates a node id (UUID) on startup, reads a seed list from the configuration, and then attempts to join the cluster.
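The forked gossip library does the real work here. As a rough illustration of how heartbeat-style gossip membership generally works (a simplified sketch, not the library's code; `GossipView`, `tick` and `merge` are hypothetical names), each node bumps its own heartbeat counter, exchanges its table with peers, keeps the highest counter seen for each member, and treats a member whose counter has not advanced within a timeout as dead. Time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// One node's view of the cluster in a heartbeat-style gossip protocol (hypothetical sketch).
class GossipView {
  private final String selfId;
  private final long failureTimeoutMs;
  // member id -> highest heartbeat counter seen so far
  private final Map<String, Long> heartbeats = new HashMap<>();
  // member id -> local time (ms) when that counter last increased
  private final Map<String, Long> lastAdvanced = new HashMap<>();

  GossipView(String selfId, long failureTimeoutMs, long nowMs) {
    this.selfId = selfId;
    this.failureTimeoutMs = failureTimeoutMs;
    heartbeats.put(selfId, 0L);
    lastAdvanced.put(selfId, nowMs);
  }

  // Called once per gossip round: bump our own heartbeat and return a digest to send to a peer.
  Map<String, Long> tick(long nowMs) {
    heartbeats.merge(selfId, 1L, Long::sum);
    lastAdvanced.put(selfId, nowMs);
    return new HashMap<>(heartbeats);
  }

  // Merge a digest received from a peer, keeping the higher counter for each member.
  void merge(Map<String, Long> digest, long nowMs) {
    for (Map.Entry<String, Long> e : digest.entrySet()) {
      Long known = heartbeats.get(e.getKey());
      if (known == null || e.getValue() > known) {
        heartbeats.put(e.getKey(), e.getValue());
        lastAdvanced.put(e.getKey(), nowMs);
      }
    }
  }

  List<String> liveMembers(long nowMs) { return filter(nowMs, true); }
  List<String> deadMembers(long nowMs) { return filter(nowMs, false); }

  // Other members (never self), split by whether their heartbeat advanced recently; sorted by id.
  private List<String> filter(long nowMs, boolean live) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, Long> e : lastAdvanced.entrySet()) {
      if (e.getKey().equals(selfId)) continue;
      boolean fresh = nowMs - e.getValue() <= failureTimeoutMs;
      if (fresh == live) out.add(e.getKey());
    }
    Collections.sort(out);
    return out;
  }
}
```

Because a node learns about peers from whoever it gossips with, two nodes that only know the seed still end up knowing each other, and a member marked dead becomes live again as soon as a higher heartbeat for it arrives.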

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

public class GossipClusterMembership extends ClusterMembership {

  public static final String HOSTS = "gossip_hosts_list";
  public static final String PORT = "gossip_port";

  private GossipService gossipService;

  public GossipClusterMembership(Configuration configuration, ServerId serverId) {
    super(configuration, serverId);
  }

  @Override
  @SuppressWarnings("unchecked")
  public void init() {
    ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
    GossipSettings settings = new GossipSettings();
    List<String> hosts = null;
    if (configuration.getClusterMembershipProperties() != null){
      hosts = (List<String>) configuration.getClusterMembershipProperties().get(HOSTS);
    } else {
      hosts = new ArrayList<String>();
    }
    int port = 2000; // hard-coded for now; the PORT property is not read yet
    for (String host: hosts){
      // every seed is expected to listen on the same gossip port
      GossipMember g = new RemoteGossipMember(host, port, "");
      startupMembers.add(g);
    }
    try {
      // the node's UUID doubles as its gossip id
      gossipService = new GossipService(configuration.getTransportHost(), port, serverId.getU().toString(), LogLevel.DEBUG, startupMembers, settings);
    } catch (UnknownHostException | InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  // shutdown(), getLiveMembers() and getDeadMembers() omitted
}
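`init()` above hard-codes port 2000 and never reads the `gossip_port` property. One way to resolve both settings in a single place is sketched below. `SeedListParser` is a hypothetical helper; it assumes `gossip_hosts_list` holds a list of host strings (as in the test that follows) and that `gossip_port`, when present, is a number or a numeric string.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper: turns cluster-membership properties into a seed list.
// Falls back to port 2000, the value hard-coded in init().
final class SeedListParser {
  static final String HOSTS = "gossip_hosts_list";
  static final String PORT = "gossip_port";
  static final int DEFAULT_PORT = 2000;

  static final class Seed {
    final String host;
    final int port;
    Seed(String host, int port) { this.host = host; this.port = port; }
    @Override public String toString() { return host + ":" + port; }
  }

  private SeedListParser() {}

  // Reads gossip_port as a Number or numeric String; anything else means the default.
  static int port(Map<String, Object> props) {
    if (props == null) return DEFAULT_PORT;
    Object p = props.get(PORT);
    if (p instanceof Number) return ((Number) p).intValue();
    if (p instanceof String) return Integer.parseInt(((String) p).trim());
    return DEFAULT_PORT;
  }

  // Builds one Seed per non-blank host, all sharing the same gossip port.
  @SuppressWarnings("unchecked")
  static List<Seed> seeds(Map<String, Object> props) {
    if (props == null || !(props.get(HOSTS) instanceof List)) return Collections.emptyList();
    int port = port(props);
    List<Seed> out = new ArrayList<>();
    for (Object h : (List<Object>) props.get(HOSTS)) {
      String host = String.valueOf(h).trim();
      if (!host.isEmpty()) out.add(new Seed(host, port));
    }
    return out;
  }
}
```

Returning an empty list instead of `null` when the hosts key is missing also avoids the `NullPointerException` the loop in `init()` would hit if the properties map exists but lacks `gossip_hosts_list`.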

Now we should be able to write a test that proves N instances of Nibiru locate each other via gossip. Each node gets a one-entry seed list, and through it the three nodes should discover each other.

@Test
public void letNodesDiscoverEachOther() throws InterruptedException, ClientException{
  Server [] s = new Server[3];
  { // each node gets its own scope so the local names can be reused
    Configuration conf = TestUtil.aBasicConfiguration(node1Folder);
    Map<String,Object> clusterProperties = new HashMap<>();
    clusterProperties.put(GossipClusterMembership.HOSTS, Arrays.asList(""));
    conf.setClusterMembershipProperties(clusterProperties); // assumed setter
    s[0] = new Server(conf);
  }
  {
    Configuration conf = TestUtil.aBasicConfiguration(node2Folder);
    Map<String,Object> clusterProperties = new HashMap<>();
    clusterProperties.put(GossipClusterMembership.HOSTS, Arrays.asList(""));
    conf.setClusterMembershipProperties(clusterProperties); // assumed setter
    s[1] = new Server(conf);
  }
  {
    Configuration conf = TestUtil.aBasicConfiguration(node3Folder);
    Map<String,Object> clusterProperties = new HashMap<>();
    clusterProperties.put(GossipClusterMembership.HOSTS, Arrays.asList(""));
    conf.setClusterMembershipProperties(clusterProperties); // assumed setter
    s[2] = new Server(conf);
  }
  for (Server server : s){
    server.init(); // assumed: starts the server and its cluster membership
  }
  Thread.sleep(10000); // arbitrary wait to let gossip converge
  for (Server server : s){
    Assert.assertEquals(2, server.getClusterMembership().getLiveMembers().size());
  }
  Assert.assertEquals("", s[2].getClusterMembership().getLiveMembers().get(0).getHost());
}

Winning! Now nodes can discover each other! The next question is which message types should be sent over gossip and which should be sent via some other method. I decided to use gossip only for cluster membership.

The first thing I decided to tackle was meta-data operations. This makes sense because without tables or column families, we can neither write to nor read from tables or column families. Duh! The approach I took was:

If a node receives a meta-data request, it re-broadcasts that request to all live hosts.

This is not a perfect solution: we still need a way to deliver these messages to hosts that are down (and ordering could become an issue down the line), but for now it gets the point across. Something like hinted handoff could deal with the down-host case. The piece that handles message routing is known as the coordinator.
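The re-broadcast rule and the down-host gap can be simulated in memory. This is a hypothetical sketch (`MetaNode` and `MetaCluster` are illustrative names, and direct method calls stand in for the network): the first node to receive a request applies it and forwards it with a reroute flag, forwarded copies are only applied, and requests for down peers are queued as hints and replayed when the peer comes back, a much simplified form of hinted handoff.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical in-memory node: tracks which keyspaces it has applied.
class MetaNode {
  final String id;
  boolean up = true;
  final Set<String> keyspaces = new LinkedHashSet<>();
  int messagesReceived = 0;
  MetaNode(String id) { this.id = id; }
}

// Hypothetical cluster simulation of "apply locally, re-broadcast once" plus hints.
class MetaCluster {
  final Map<String, MetaNode> nodes = new HashMap<>();
  // target node id -> requests that could not be delivered while it was down
  final Map<String, Queue<String>> hints = new HashMap<>();

  void add(MetaNode n) { nodes.put(n.id, n); }

  // A client sends "create keyspace" to one node; this copy is not rerouted.
  void createKeyspace(String coordinatorId, String keyspace) {
    deliver(nodes.get(coordinatorId), keyspace, false);
  }

  private void deliver(MetaNode target, String keyspace, boolean rerouted) {
    target.messagesReceived++;
    target.keyspaces.add(keyspace);         // always apply locally
    if (rerouted) return;                    // forwarded copy: never fan out again
    for (MetaNode peer : nodes.values()) {
      if (peer == target) continue;
      if (peer.up) {
        deliver(peer, keyspace, true);       // forward with the reroute flag set
      } else {
        hints.computeIfAbsent(peer.id, k -> new ArrayDeque<>()).add(keyspace);
      }
    }
  }

  // When a node comes back, replay its hints (in FIFO order) as rerouted messages.
  void markUp(String id) {
    MetaNode n = nodes.get(id);
    n.up = true;
    Queue<String> q = hints.remove(id);
    if (q == null) return;
    while (!q.isEmpty()) deliver(n, q.poll(), true);
  }
}
```

The reroute flag is what keeps each peer at exactly one copy per request. Hints are replayed in order per host, which keeps one host's updates ordered but says nothing about ordering across different coordinators, so the ordering concern still stands.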

public Response handle(Message message) {
  if (SYSTEM_KEYSPACE.equals(message.getKeyspace())) {
    // meta-data (system keyspace) requests go to the meta-data coordinator
    return metaDataCoordinator.handleSystemMessage(message);
  }
  //other stuff
}

What we are doing here is: the node always applies the change locally, and if the message does not have "reroute" in its payload, it also broadcasts the message to all live members; otherwise the message is only handled locally. (By broadcast I mean sending N unicast messages, not an IP broadcast or multicast.)

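public Response handleSystemMessage(final Message message){
  if (MetaPersonality.CREATE_OR_UPDATE_KEYSPACE.equals(message.getPayload().get("type"))){
    // NOTE: the "keyspace" payload key and metaDataManager are hypothetical stand-ins;
    // the start of the local call was lost from this listing.
    final String keyspace = (String) message.getPayload().get("keyspace");
    final Map<String,Object> properties = (Map<String,Object>) message.getPayload().get("properties");
    metaDataManager.createOrUpdateKeyspace(keyspace, properties); // always apply locally
    if (!message.getPayload().containsKey("reroute")){
      // first hop: mark the message and fan it out to every live member
      message.getPayload().put("reroute", "");
      List<Callable<Void>> calls = new ArrayList<>();
      for (ClusterMember clusterMember : clusterMembership.getLiveMembers()){
        final MetaDataClient c = clientForClusterMember(clusterMember);
        Callable<Void> call = new Callable<Void>(){
          public Void call() throws Exception {
            // hypothetical third argument: forwarded requests must carry the reroute
            // flag so the peer applies them without broadcasting again
            c.createOrUpdateKeyspace(keyspace, properties, true);
            return null;
          }
        };
        calls.add(call);
      }
      try {
        List<Future<Void>> res = metaExecutor.invokeAll(calls, 10, TimeUnit.SECONDS);
        //todo return results to client
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return new Response();
  } else ...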
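The `//todo` above leaves the `invokeAll` results unused. A sketch of turning them into a per-peer error report follows; `FanOut.failedPeers` is a hypothetical helper. It relies on documented `ExecutorService.invokeAll(tasks, timeout, unit)` behavior: the futures come back in the same order as the tasks, and any task still running at the deadline is cancelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: runs one call per peer under a shared deadline and
// reports which peers failed or timed out. peers.get(i) must match calls.get(i).
final class FanOut {
  private FanOut() {}

  static List<String> failedPeers(ExecutorService pool, List<String> peers,
                                  List<Callable<Void>> calls, long timeoutMs)
      throws InterruptedException {
    List<Future<Void>> results = pool.invokeAll(calls, timeoutMs, TimeUnit.MILLISECONDS);
    List<String> failed = new ArrayList<>();
    for (int i = 0; i < results.size(); i++) {
      try {
        results.get(i).get();   // every future is done after invokeAll returns
      } catch (CancellationException e) {
        failed.add(peers.get(i) + " (timed out)");
      } catch (ExecutionException e) {
        failed.add(peers.get(i) + " (" + e.getCause().getMessage() + ")");
      }
    }
    return failed;
  }
}
```

A coordinator could then return success only when the list is empty, or record the failed peers as candidates for the hint queue discussed earlier.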

Obviously this implementation needs better error handling and potentially longer timeouts. To test it, we extend the previous test by sending out a meta-data request. Within a few milliseconds the other nodes should have received and processed that message. In other words: create a keyspace and a column family, then check each node to ensure the change is present on all hosts.

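MetaDataClient c = new MetaDataClient("", s[1].getConfiguration().getTransportPort());
c.createOrUpdateKeyspace("abc", new HashMap<String,Object>());

for (Server server : s){
  // check that keyspace "abc" exists on this server (loop body not preserved in the original listing)
}
Map<String,Object> stuff = new HashMap<String,Object>();
stuff.put(ColumnFamilyMetaData.IMPLEMENTING_CLASS, DefaultColumnFamily.class.getName());
c.createOrUpdateColumnFamily("abc", "def", stuff);
for (Server server : s){
  // check that column family "def" exists in "abc" on this server (loop body not preserved)
}
for (Server server : s){
  // (loop body not preserved in the original listing)
}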

Great. Now that we have cluster membership we can start tackling the really - really - really fun stuff like request routing and quorum reads etc! Stay tuned!

