Poking around with an idea: Ranged Metadata
With compact storage one can create a column family. The metadata was fairly straightforward:
[default@demo] CREATE COLUMN FAMILY users WITH comparator = UTF8Type AND key_validation_class=UTF8Type AND column_metadata = [ {column_name: full_name, validation_class: UTF8Type} {column_name: email, validation_class: UTF8Type} {column_name: state, validation_class: UTF8Type} {column_name: gender, validation_class: UTF8Type} {column_name: birth_year, validation_class: LongType} ];
First, you had a comparator that controlled the sorting of columns. Columns sort by the 'name' field. You also have a default_validator, which is the type for the value. For example, if your comparator was LongType and your default validator was UTF8Type, it meant your column names were Longs and your values were strings.
set[rowkey][5]='stuff';
Additionally, you can supply specific validators, like in the above example.
{column_name: full_name, validation_class: UTF8Type}
{column_name: fav_number, validation_class: LongType}
These meant "If the column was named full_name its type is UTF8Type, and if the column was named fav_number its type is LongType". This is easily the best part about Cassandra: I do not have to know my column names ahead of time.
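The lookup rule described above can be sketched in a few lines. This is a hypothetical model, not Cassandra code: the function name resolve_validator and the dictionary layout are my own, and only the validator names come from the example.

```python
# Hypothetical model of validator lookup for a column family; not Cassandra code.
from typing import Dict


def resolve_validator(column_name: str,
                      column_metadata: Dict[str, str],
                      default_validator: str) -> str:
    """Return the validator for a column: a named entry wins, else the default."""
    return column_metadata.get(column_name, default_validator)


# Per-column validators taken from the example above.
column_metadata = {"full_name": "UTF8Type", "fav_number": "LongType"}

for name in ("full_name", "fav_number", "some_column_i_never_declared"):
    print(name, "->", resolve_validator(name, column_metadata, "UTF8Type"))
```

The point of the sketch is the fallback: a column nobody declared still gets a type, because the default validator covers it.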
CQL3's sparse storage takes a different approach. It has no default validator. Every column must be named. Thus every column must be typed.
CREATE TABLE songs (
id uuid PRIMARY KEY,
title text,
album text,
artist text,
data blob
);
With CQL3's sparse storage you can approximate unknown columns like this:
cqlsh:atest> CREATE TABLE wide ( id text, column text, x int , data blob , primary key(id,column) );
cqlsh:atest> insert into wide (id,column,data) values ('4','5', 'eded');
cqlsh:atest> insert into wide (id,column,data) values ('4','6', 'edea');
cqlsh:atest> insert into wide (id,column,x) values ('4','6', 2);
cqlsh:atest> select * from wide;
id | column | data | x
----+--------+--------+------
4 | 5 | 0xeded | null
4 | 6 | 0xedea | 2
We can view the table from the CLI to see how this lays out on disk.
[default@atest] list wide;
RowKey: 4
=> (column=5:, value=, timestamp=1367552438840000)
=> (column=5:data, value=eded, timestamp=1367552438840000)
=> (column=6:, value=, timestamp=1367552725421000)
=> (column=6:data, value=edea, timestamp=1367552448811000)
=> (column=6:x, value=00000002, timestamp=1367552725421000)
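The mapping from CQL rows to the cells listed above can be sketched roughly as follows. This is a toy model under my own assumptions: composite names are modeled as plain "clustering:column" strings, values are stored unencoded, and the helper names to_cells and insert are hypothetical.

```python
# Toy model of how a CQL3 sparse-storage row becomes storage cells.
# Composite names are simplified to "clustering:column" strings (an assumption).

def to_cells(clustering_value, columns):
    """Build the cells for one CQL row: a row marker plus one cell per set column."""
    cells = {f"{clustering_value}:": ""}  # the empty marker cell, e.g. "5:"
    for name, value in columns.items():
        if value is not None:  # unset columns produce no cell at all
            cells[f"{clustering_value}:{name}"] = value
    return cells


storage = {}  # row key -> {composite column name -> value}


def insert(row_key, clustering_value, columns):
    storage.setdefault(row_key, {}).update(to_cells(clustering_value, columns))


# The three inserts from the cqlsh session above.
insert("4", "5", {"data": "eded"})
insert("4", "6", {"data": "edea"})
insert("4", "6", {"x": 2})

for name in sorted(storage["4"]):
    print(name, "=", repr(storage["4"][name]))
```

Sorting the composite names reproduces the order of the CLI listing: the marker cell for each clustering value comes first, followed by that value's data cells.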
Both approaches have some shortcomings. Here is a scenario that causes them both headaches.
All columns from a-b (a1,a2, aaaaaaaaaa...) are integers
All columns from c-g (c1,f190,...) are varchar
We do this quite often: a single row key supports static columns and multiple sets of (possibly wide) dynamic columns. I.e., password is utf8, age is integer, columns named friends[0] to friends~ are a set of your friends, and columns named likes to likes~ are a set of your likes. This is an alternative to creating separate column families for each of these relations.
I am experimenting with a feature called Ranged Assume. The concept is that it supports the scenario described above.
This can be used in conjunction with standard validators (columns are utf8, columns named x have values of long).
.add( Operations.assumeRangedOp("myks", "columnasscf", "a", "b", "Int32Type") )
.add( Operations.assumeRangedOp("myks", "columnasscf", "c", "g", "UTF8Type") )
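Here is a minimal sketch of how a ranged lookup could resolve a type. It is not IntraVert's implementation. I am assuming inclusive string comparison on both ends of a range, and that an exact column match beats a range, which beats the default. The function name resolve is hypothetical.

```python
# Sketch of a "ranged assume" lookup; the precedence and the inclusive
# string-comparison semantics are assumptions, not IntraVert's actual rules.

def resolve(column_name, exact, ranges, default):
    """Pick a type: exact column match, then first matching range, then default."""
    if column_name in exact:
        return exact[column_name]
    for start, end, validator in ranges:
        if start <= column_name <= end:  # plain lexicographic comparison
            return validator
    return default


ranges = [("a", "b", "Int32Type"), ("c", "g", "UTF8Type")]
exact = {"x": "LongType"}

for name in ("a1", "aaaaaaaaaa", "c1", "f190", "x", "zebra"):
    print(name, "->", resolve(name, exact, ranges, "BytesType"))
```

Note that under these semantics a name like "b1" sorts after "b" and would fall outside the first range, so the choice of end marker matters.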
Posted at 12:11AM May 03, 2013 by edwardcapriolo in General | Comments[0]
The Big Data World Tour continues
I started doing tech talks around 5 years ago. I realized this month that I actually have 3 speaking engagements:
Monday, April 29, 2013 Lecture at NYU (private event)
Monday, May 6, 2013 Cassandra Summit 2013 NYC Edition
Tuesday, May 14, 2013 Cassandra - High Performance big data solution
I have this running gag in my head; Instead of viewing these all as the disparate talks they are, I view them as a concert tour, the Big Data World Tour. I was thinking of printing up t-shirts, possibly getting a tour bus, going everywhere with an entourage.
Back to reality. On a side note, if anyone wants to take the trek to NJ with me that would be cool. I'll drive.
Posted at 01:02PM Apr 28, 2013 by edwardcapriolo in General | Comments[0]
Cassandra 1,000,000 inserts/sec on raspberry pi
You may have heard that Netflix achieved 1,000,000 inserts per second with Cassandra. Datastax was nice enough to give me a raspberry pi loaded with Cassandra. I decided the best possible thing to do with the device would be to run a benchmark. Because the device offers unique capabilities I decided to craft a special benchmark, the "edwardcapriolo.com roflscale, cloud serving, pi linksys, nosql, big data benchmark". I recorded the benchmark for posterity. In the first part, I explain the hardware required to run the test:
After the stage is set, we run the test.
Word of this test has spread quickly. There have been unconfirmed reports that enterprise vendors are scrambling in an attempt to defeat my benchmark with a beagleboard running Oracle NoSQL database connected to a Dell iSCSI SAN with SSD. I will post more details when they become available.
Posted at 12:48AM Apr 10, 2013 by edwardcapriolo in General | Comments[3]
RangeTombstones are coming (even to thrift)
This weekend I was poking through the cassandra source code. I know what you're thinking... Isn't that what you do during the week? :) Well anyway I found this.
[edward@jackintosh cassandra-ed]$ find . -name "*RangeTombstone*"
./src/java/org/apache/cassandra/db/RangeTombstone.java
./test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
Wow wait! What? So I started digging around. Besides the unit test I saw some other issues around this: https://issues.apache.org/jira/browse/CASSANDRA-4409 . In case you do not know what a tombstone is, a tombstone is a delete. A range tombstone is like a slice delete. For example, say your data columns are named a,b,c,d,e. You can delete a range of columns like b-d. This is one tombstone entry in the database, and on read the user is only returned columns a,e.
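The a-through-e example can be modeled in a few lines. This is a toy read path, not Cassandra's: the tuple layout and the function name read_live_columns are my own, and I am assuming an inclusive range and the usual rule that a tombstone only shadows cells written at or before its timestamp.

```python
# Toy model of a range tombstone on the read path; not Cassandra's code.
# A tombstone is (start, end, deletion_timestamp), with an inclusive range (assumed).

def read_live_columns(columns, range_tombstones):
    """Return names of columns not shadowed by any range tombstone."""
    live = []
    for name in sorted(columns):
        _value, written_at = columns[name]
        shadowed = any(
            start <= name <= end and written_at <= deleted_at
            for start, end, deleted_at in range_tombstones
        )
        if not shadowed:
            live.append(name)
    return live


# Five columns written at timestamp 1, then one tombstone covering b..d at timestamp 2.
columns = {name: ("some value", 1) for name in "abcde"}
tombstones = [("b", "d", 2)]
print(read_live_columns(columns, tombstones))

# A column rewritten after the delete is live again.
columns["c"] = ("new value", 3)
print(read_live_columns(columns, tombstones))
```

One tombstone entry hides three columns, which is the appeal. The flip side is that every read has to check each column against every tombstone in play.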
As it turns out thrift's Deletion has accepted a predicate for a while now.
struct Deletion {
1: optional i64 timestamp,
2: optional binary super_column,
3: optional SlicePredicate predicate,
}
Since the unit test was there I decided to give it a go.
InvalidRequestException(why:Deletion does not yet support SliceRange predicates.)
at org.apache.cassandra.thrift.Cassandra$batch_mutate_result.read(Cassandra.java:20833)
Aw...sad...
I decided to get off my arse and do something about it. Being that I have been hacking on intravert lately I am a bit more skilled at navigating cassandra's internals (mostly StorageProxy). I did surprise myself by getting into the code and getting it done in a couple of hours (done meaning done in unit test, not done like ready for commit).
https://issues.apache.org/jira/browse/CASSANDRA-5435
Anyway, until football comes back, Sunday with cassandra and eclipse is good for me!
One thing to look out for, RangeTombstones get heavy on the read side if you have too many of them. Use them sparingly!
Posted at 09:41PM Apr 07, 2013 by edwardcapriolo in General | Comments[0]
I likely will not switch off legacy tables
http://www.datastax.com/docs/1.2/ddl/legacy_table#cql3-and-legacy-table
----------------------------snip-------------------------------------
Creating legacy tables
You can create Thrift/CLI-compatible tables in CQL 3 using the COMPACT STORAGE directive. The compact storage directive used with the CREATE TABLE command provides backward compatibility with older Cassandra applications; new applications should generally avoid it.
Compact storage stores an entire row in a single column on disk instead of storing each non-primary key column in a column that corresponds to one column on disk. Using compact storage prevents you from adding new columns that are not part of the PRIMARY KEY.
----------------------------snip-------------------------------------
I stumbled on this today and find it interesting. First, my beloved sstables have been dubbed "legacy" tables. Second, I do not agree with or understand some of the rationale.
"Compact storage stores an entire row in a single column on disk...." This seems not true. I can not even digest the rest of the sentence.
"Using compact storage prevents you from adding new columns that are not part of the PRIMARY KEY." This goes back to my last blog about what is a "row", or a "primary key" or a "row key" and now disambiguating all these terms. All I know is that you have always been able to add columns to rows with Cassandra, because that is what the ColumnFamily data model is.
Anyway, I reject the statement that you should "generally" avoid legacy tables. There are cases where "legacy" tables are better. When are they better? You might ask...
http://thelastpickle.com/2013/01/11/primary-keys-in-cql/
---------------snip----------------------
[default@dev] list events;
Using default limit of 100
Using default column limit of 100
-------------------
RowKey: 2:201302
=> (column=2013-02-20 10\:58\:40+1300:, value=, timestamp=1357869160739000)
=> (column=2013-02-20 10\:58\:40+1300:is_dam_dirty_apes, value=01, timestamp=1357869160739000)
=> (column=2013-02-20 10\:58\:40+1300:pressure, value=000011d0, timestamp=1357869160739000)
=> (column=2013-02-20 10\:58\:40+1300:temperature, value=00000015, timestamp=1357869160739000)
-------------------
RowKey: 3:201302
=> (column=2013-02-20 10\:58\:45+1300:, value=, timestamp=1357869161380000)
=> (column=2013-02-20 10\:58\:45+1300:is_dam_dirty_apes, value=01, timestamp=1357869161380000)
=> (column=2013-02-20 10\:58\:45+1300:pressure, value=00001ed2, timestamp=1357869161380000)
=> (column=2013-02-20 10\:58\:45+1300:temperature, value=0000001f, timestamp=1357869161380000)
---------------snip----------------------
You notice CQL3 puts this extra column everywhere, it was done so CQL will not show you tombstones (rows that have all the columns deleted). This is nice, although tombstones never actually bothered me. I am not sure I want to give up the disk/ram/memtable space for this.
In a worst-case scenario, imagine a row is one column. To write one column, I have to write two columns. Also remember that sstables are write once. If one upserts over the same column at a later time, what would have been two columns is now four columns until Cassandra gets around to compacting it. Doesn't sound that bad? What if your row is spread across 10 sstables?
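The arithmetic above is simple enough to write down. A back-of-the-envelope sketch, where the function name and parameters are mine and each sstable is assumed to hold one full uncompacted copy of the write:

```python
# Back-of-the-envelope cell counts before compaction. Assumes every sstable
# holds one full copy of the write (marker cell included when present).

def cells_on_disk(data_columns, sstables_with_a_copy, row_marker):
    """Count stored cells for one logical row across uncompacted sstables."""
    cells_per_write = data_columns + (1 if row_marker else 0)
    return cells_per_write * sstables_with_a_copy


for sstables in (1, 2, 10):
    compact = cells_on_disk(1, sstables, row_marker=False)
    sparse = cells_on_disk(1, sstables, row_marker=True)
    print(f"{sstables} sstable(s): compact={compact} cells, sparse={sparse} cells")
```

For a one-column row the marker doubles the cell count at every step, so the gap grows with the number of sstables the row is spread across.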
As always, look at your data and understand how the data will lay out on disk. Do some benchmarking. One extra column per row or a slightly less efficient method for storing data can make a huge performance difference when working with millions or billions of rows, or maybe it doesn't. Either way, I am not on the train with calling compact storage "legacy" and the suggestion of generally avoiding it. I feel the decision is much like choosing MyISAM over InnoDB.
As the Oracle says: "You can't see past a choice you do not understand."
Posted at 12:13AM Apr 04, 2013 by edwardcapriolo in General | Comments[0]
When a row isn't a row
If you are reading my blog and you have not checked out Aaron Morton's site http://thelastpickle.com/ , you should go do that first.
I am stealing this little clip from his blog
--------------------steal-----------------------
PRIMARY KEY's in CQL
The final version of CQL 3 that ships with Cassandra v1.2 adds some new features to the PRIMARY KEY clause. It overloads the concept in ways that differ from the standard SQL definition, and in some places shares ideas with Hive. But from a Cassandra point of view it allows for the same flexibility as the Thrift API.
--------------------end steal-----------------------
As some of you may or may not know I have a long standing beef with CQL's 'primary key' keyword. I know what you're thinking, "Ed for pete's sake not another CQL rant". Well relax, it is not a rant, just a fact and a distinction that you will pick up on eventually.
Say you create a table...
cqlsh> create keyspace atest with replication = {'class':'SimpleStrategy', 'replication_factor':1};
cqlsh> use atest;
cqlsh:atest> create table try ( d_id int, pressure int, year int, primary key( (d_id,year) ) );
Now, in a standard relational database I typically call a compound index a phone book index. I say that because if you index on last_name, first_name, it makes searches fast. However, if your WHERE clause is only the first name it does not help. After all, phone books only make it fast to search by last_name; then, if you know the last name, searching by first name is fast. Technically speaking, I guess this is not always true, the fields in a primary key do not necessarily imply anything about table ordering.
So let's insert some data into this table.
cqlsh:atest> insert into try (d_id, year, pressure) values ( 9, 1981, 99);
cqlsh:atest> insert into try (d_id, year, pressure) values ( 9, 1982, 100);
And let's query it.
cqlsh:atest> select * from try;
d_id | year | pressure
------+------+----------
9 | 1982 | 100
9 | 1981 | 99
::wrong assumption coming:: Great apparently data is sorted by d_id then year. Just like a primary key in mysql ::wrong assumption over::
cqlsh:atest> insert into try (d_id, year, pressure) values ( 9, 1983, 1005);
cqlsh:atest> select * from try;
d_id | year | pressure
------+------+----------
9 | 1982 | 100
9 | 1983 | 1005
9 | 1981 | 99
Wait... How can this be? Well if you read Aaron's blog like I told you to, you would know why...
----------------steal--------------------
The two partitions each for devices 2 and 3 have been placed on different nodes. The partitions for device 1 are on the same node, but with enough nodes they would probably be on different ones.
--------------end steal--------------
What else does this mean... It means that none of this is actually possible.
cqlsh:atest> select * from try where d_id = 9 and year >1981;
Bad Request: Only EQ and IN relation are supported on the partition key for random partitioners (unless you use the token() function)
Perhaps you meant to use CQL 2? Try using the -2 option when starting cqlsh.
cqlsh:atest> select * from try where d_id = 9 ;
Bad Request: Partition key part year must be restricted since preceding part is
Perhaps you meant to use CQL 2? Try using the -2 option when starting cqlsh.
Now why does this bug me? Well there is this long standing saying in Cassandra,
"All the columns in a row live on the same node."
Now in terms of a "CQL row" that is not true. What is more true is to say...
"All columns of a row that have equal primary keys live on the same node."
Also, another saying kinda turned on its head:
"All columns of a row are ordered"
To wrap it up, and restate the obvious: if you use a composite partition key like primary key( (d_id,year) ), your data will not all be on the same host, and you will not be able to page through it knowing only the first part of the primary key.
If you want "phone book" ordering try,
create table try ( d_id int, pressure int, year int, primary key( d_id, year ) );
Then you do end up with one row, that is sorted by d_id, then sorted by year.
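The difference between the two key declarations can be sketched as a toy model. It is not how Cassandra computes tokens: hashing repr() of the key with MD5 is only a stand-in for the partitioner, and the helper names token and layout are hypothetical.

```python
# Toy model: rows are grouped by partition key, partitions are ordered by a
# hash token, and rows inside a partition are sorted by the clustering columns.
# The MD5-of-repr token is a stand-in for the real partitioner (an assumption).
import hashlib
from collections import defaultdict


def token(partition_key):
    return int(hashlib.md5(repr(partition_key).encode("utf-8")).hexdigest(), 16)


def layout(rows, partition_cols, clustering_cols):
    """Return [(partition_key, sorted_rows)] in token order."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[tuple(row[c] for c in partition_cols)].append(row)
    for members in partitions.values():
        members.sort(key=lambda r: tuple(r[c] for c in clustering_cols))
    return sorted(partitions.items(), key=lambda item: token(item[0]))


rows = [
    {"d_id": 9, "year": 1982, "pressure": 100},
    {"d_id": 9, "year": 1983, "pressure": 1005},
    {"d_id": 9, "year": 1981, "pressure": 99},
]

# primary key( (d_id, year) ): every (d_id, year) pair is its own partition,
# so the order across years is whatever the hash says.
print([key for key, _ in layout(rows, ["d_id", "year"], [])])

# primary key( d_id, year ): one partition for d_id 9, sorted by year inside it.
print([r["year"] for _, members in layout(rows, ["d_id"], ["year"]) for r in members])
```

With the composite partition key there are three independent partitions, which is why a range query on year cannot be answered by walking one sorted row.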
Maybe I am being a little unfair to CQL's "primary key" keyword, I mean
primary key( d_id, year )
is different than
primary key( (d_id, year ) )
But then again what it is really saying is (d_id, year ) is a composite column....
So then I really should be able to do this....
cqlsh:atest> create table try2 ( d_id int, pressure (int,string,long), year int, primary key( (d_id,year) );
Bad Request: line 1:51 missing EOF at ','
In reality C* allows composites everywhere: row key, value, and column name. (Technically there are all types of composites hidden behind the scenes in CQL.) So really in CQL a primary key could actually be called a "row key column specifier", except for this case...
CREATE TABLE device ( device_id int, PRIMARY KEY (device_id) );
Where it is only actually the row key. Again I just don't like it being called 'primary key' because the name implies it works like something else, also why I do not like calling column families 'tables' as they are not actually tabular...
Posted at 04:40PM Mar 31, 2013 by edwardcapriolo in General | Comments[0]
IntraVert: Server Side operations in Cassandra and more
I have been away from the blog game for a bit. There are two reasons for this. The first, I am hard at work on the second edition of the High Performance Cassandra Cookbook. The second is I have embarked on a skunkworks project which I think is going to bring a few game changers to Apache Cassandra.
Nate ( https://github.com/zznate ), who I internally think of as the Hector godfather, and a few others were brainstorming on features we would want for Cassandra. I learned a great lesson about improv from Tina Fey. I apply these rules to brainstorming and even technical conversations now. Agree, use 'and' to take the idea to the next level, and there are no mistakes.
Nate had the idea pretty much from the get-go that he wanted to use vertx. The older pre-Tina Fey inspired me might have tossed this idea thinking async-io is just part of some trendy new NodeJS craze, but the new me decided to roll with it. Nate called the project IntraVert and we all just started cracking on cool things we wanted Cassandra to do.
(An older line of thinking that I associated with Cassandra was, "That should be done on the client side". This was an argument against co-processor or triggers, that it all could and should be done client side. This argument is valid, but now you may notice that CQL sets time stamps automatically on the server side, or that the Cassandra server can automatically satisfy a query using a secondary index, or that counters read before write, you can conclude this argument is not an absolute.)
My first long standing gripe that I wanted to tackle was what I consider the keyspace fallacy. In the thrift 0.6.X days keyspace was an argument in each thrift operation.
For example the signature of get used to look something like this:
get(String keyspace, String columnFamily, byte [] rowkey, byte [] column)
But by Cassandra 0.7.X the signature changed to something like:
set_keyspace(String s)
get(String columnFamily, byte [] rowkey, byte [] column)
Setting the keyspace moved to a thread local. I will be the first to admit that the old signature was cumbersome. However I never liked the change, the StorageProxy (the underlying server side cassandra object) can do cross keyspace mutations, but thrift and CQL can not! (CQL has limited support for operations that cross keyspaces). This resulted in a non-optimal situation for me, many of our column families had different replication factors and were stored in different keyspaces, and now we needed either a keyspace aware connection pool or had to have two connection pools. But the thing that bothered me most was wasted RPC operations.
Calling set_keyspace required a network round trip, and there are many cases where you want to batch all types of operations together in one RPC call. This was the biggest thing I set out to conquer. I wanted a format where I could batch together anything into one RPC call: not just writes to a single keyspace, but writes and reads, meta operations, custom operations, etc. Today, as I was hacking on IntraVert, after I wrote a unit test, I took a look:
List<Map> batch = new ArrayList<Map>();
Map row = new HashMap();
row.put("keyspace","ks1");
row.put("columnfamily","cf1");
row.put("rowkey","mykey");
...
IntraReq req = new IntraReq();
req.add( Operations.setAutotimestampOp() )
.add( Operations.createKsOp("ks1", 1) )
.add( Operations.setKeyspaceOp("ks1") )
.add( Operations.createCfOp("cf1") )
.add( Operations.createKsOp("ks2", 1) )
.add( Operations.setKeyspaceOp("ks2") )
.add( Operations.createCfOp("cf2") )
.add( Operations.batchSetOp(batch).set("timeout", 10000) )
.add( Operations.assumeOp("ks1", "cf1", "value", "UTF-8") )
.add( Operations.assumeOp("ks2", "cf2", "value", "UTF-8") )
.add( Operations.getOp("mykey", "mycol")
.set("keyspace", "ks1")
.set("columnfamily", "cf1"));
I really love how this project is shaping up. First, API has builder pattern, awesome. Second, batching multiple operations into one request, nice. Third, setting timeouts per operation, check. Fourth, types are easy, no byte buffer madness. Fifth, that above exchange might have been 10 network round trips/request responses, now it is one.
Passing mention: IntraVert uses JSON over HTTP so end users do not have to worry about thrift or other native libraries. Users can read and write data to cassandra with basic HTTP libraries, or even work with Cassandra from JavaScript.
Even though I mentioned them first, the client API and the transport are not the most innovative parts of IntraVert. The ability to do processing server side is. IntraVert does this by allowing JVM languages like groovy (support for scala/clojure is coming) to upload functions to be compiled on the server. These functions can then be leveraged as part of other operations.
For example, suppose a column has JSON data in it. The user wishes to run a JSONPath query on the server side and only return the results to the client. Users can dynamically create a class that applies the transformation and apply this to a Cassandra slice.
req.add( Operations.sliceOp("jsonkey", "a", "z", 100));//6
req.add( Operations.createProcessorOp("JsonPathEx", "groovy",
"import com.jayway.jsonpath.*; \n" +
"public class JsonPathEx implements org.usergrid.vx.experimental.Processor { \n"+
" public List<Map> process(List<Map> input){" +
" List<Map> results = new ArrayList<HashMap>();"+
" for (Map row: input){" +
" Map newRow = new HashMap(); "+
// groovy requires you to escape $
" Integer match = JsonPath.read(row.get(\"value\").toString(), \"\\$.[1].value\"); \n"+
" newRow.put(\"value\",match.toString()); \n "+
" results.add(newRow); \n"+
" } \n" +
" return results;"+
" }"+
"}\n"
));//7
req.add( Operations.processOp("JsonPathEx", Collections.EMPTY_MAP, 6));//8
There are multiple types of server side processors in IntraVert. They can be used on both the insert side and the read side to transform data. This allows the user to do things not possible before. For example if the user wishes to do a server side union of two slice results or two CQL queries, this can be done (in a single client server exchange like the other examples). Processors can also be used to filter results like a where clause, or possibly return only counter columns that have a value greater than X. Processors also have the ability to implement procedural logic on the server side. (Think PL-SQL meets Cassandra)
IntraVert is still in the brainstorming phase, (but we want to have an official 1.0 release soon). You can check it out on github https://github.com/zznate/intravert-ug and have fun hacking at it. We have a good deal of documentation to get you started. Coming soon hbase style scanners!
Posted at 03:38PM Feb 08, 2013 by edwardcapriolo in General | Comments[3]
Do want some scrubs
[17:53:01] <rcoli> mkjellman: any luck on the some-of-my-sstables-won't-scrub issue?
[17:53:24] <ecapriolo> rcoli: is that a new TLC song
[17:53:26] <mkjellman> rcoli: sigh, don't get me started on that one..
[17:53:50] <mkjellman> god i would LOVE a scrub right now
[17:53:54] <rcoli> ecapriolo: his sstables don't want to scrub.. a scrub is a process that can't access these un-live tables
[17:53:58] <ecapriolo> Dont want no tables that wont scrub,
[17:54:36] <mkjellman> hanging on the back side of my... okay i can't rhyme data directories in there very easily
[17:54:45] <rcoli> LOL
[17:54:49] <ecapriolo> hangin in my active side of my ssd drive taking page cache from me
[17:54:57] <mkjellman> HAHAHAHAHHAHAHAHA
[17:55:00] <rcoli> also known as a cluster!
Posted at 06:00PM Jan 08, 2013 by edwardcapriolo in General | Comments[3]
Cassandra and Yahoo Cloud Serving Benchmark Remix
When we get new servers in I always make a mental note to myself to work twice as hard during the week. You may ask why? The answer is if I can get the machines prepped before they are needed in production it gives me the chance to run Cassandra and the Yahoo Cloud Serving Benchmark on them.
What do you get when you cross, Apache Cassandra, solid state disks, a blade center loaded with 14 blades?.....
In a previous post we ran a benchmark against 8 server class machines with scsi disks. This time we have 14 blades, 32 GB RAM with SSDs! Oh it's on. It's on.
Last time, I did a lot of manual editing of the ycsb load testing scripts. This time I introduced some variables into the scripts that can be set from the command line, which makes it much easier to run the scripts on N machines without having to tweak them individually.
Some early results:
First, let's get this out of the way. This is BigData, not tiny little baby data. We are not inserting 1,000,000 rows. Because if the data fits in one machine's main memory, it's not BigData!
We are inserting 75,000,000 entries, which turns out to be 322 GB of data (~23 GB per node with replication factor 3). Now, our machines do have a good amount of RAM (32 GB), and the JVM is sized to 8 GB for this test. (I need to make another test with about double the data so I can stress out the SSD drives more.)
[OVERALL], RunTime(ms), 656811.0
[OVERALL], Throughput(ops/sec), 16312.58459435058
I actually need to revisit this test. I foobared it a bit and launched 2 or 3 extra instances. In any case, I saw peaks of nearly 20,000 ops/sec. I was impressed by this anyway; it took Netflix 300 machines from Amazon's cloud to hit 1,000,000 inserts a second. My 14 node cluster was running 7 YCSB instances at once. I achieved 140,000 inserts per second without really trying too hard. I could say more about cloud vs bare metal but I would be digressing.
Being able to write data fast is great, but reading it is more interesting. After all, I surely can write data fast using dd, and that is not saying much. The results are here; one node did:
[OVERALL], Throughput(ops/sec), 14835.694681403456
Since we have 7 instances of ycsb the total reads/sec was 98,000 reads/sec or to throw a big number at you 8,467,200,000 reads a day.
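For anyone checking the math, here is the arithmetic spelled out. I am assuming the 98,000 figure comes from rounding each instance down to 14,000 reads/sec before multiplying. The variable names are mine.

```python
# Reproducing the throughput arithmetic. The 14,000/sec per-instance rounding
# is an assumption about how the 98,000 figure was reached.
ycsb_instances = 7
measured_per_instance = 14835.694681403456  # from the [OVERALL] line above
rounded_per_instance = 14_000

total_reads_per_sec = ycsb_instances * rounded_per_instance
seconds_per_day = 24 * 60 * 60
reads_per_day = total_reads_per_sec * seconds_per_day

print(f"{total_reads_per_sec:,} reads/sec")
print(f"{reads_per_day:,} reads/day")
print(f"{ycsb_instances * measured_per_instance:,.0f} reads/sec without rounding")
```

If every instance really matched the measured one, the unrounded total would come out a bit higher than the quoted figure.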
I'm not even done playing with the number of threads, and other tuning.
If you have a clever idea for a benchmark email me before I need to get these servers into production!
Updated:
Just bumped the threads to 60 and got
nohup ssh -l edward cdbeq101 "cd YCSB && sh step2.sh 75000000 7 0 60" > foo.out 2> foo.err < /dev/null &
[OVERALL], RunTime(ms), 35712.0
[OVERALL], Throughput(ops/sec), 28001.79211469534
Sweet!
Went to 100 threads and got!
0 sec: 0 operations;
10 sec: 233789 operations; 23320.6 current ops/sec; [READ AverageLatency(ms)=5.07]
20 sec: 537465 operations; 30141.54 current ops/sec; [READ AverageLatency(ms)=1.6]
30 sec: 864622 operations; 32715.7 current ops/sec; [READ AverageLatency(ms)=1.36]
37 sec: 1000000 operations; 17411.96 current ops/sec;
[OVERALL], RunTime(ms), 37877.0
[OVERALL], Throughput(ops/sec), 26401.24613881775
Crazy peaked at 32K!
Posted at 01:20AM Nov 22, 2012 by edwardcapriolo in General | Comments[5]
Zookeeper pseudo scalability and absolute correctness
A few weeks ago me and a few friends were bouncing back and forth some tweets about how awesome we think Cassandra's multi-datacenter replication is. I am not going to link to the tweet but more or less we got trolled by a statement to the effect of, "It is great when correctness does not matter". This statement drums up an old tired misinformation campaign against eventual consistency that has been debunked time and time again.
I was going to let this entire thing go (which as most of you know I am bad at). However, a situation came up which I believe highlights how "correctness" is not as easy to achieve as many people seem to pretend it is.
We are using Apache Kafka, a distributed messaging system. The way Kafka works, all writes are written directly to disk. Clients are organized into consumer groups. Consumers write entries to zookeeper every N seconds to record the offset. This way when clients die or topics are rebalanced the same data is not consumed twice by the same group.
We started ramping up our kafka usage and we noticed a very high amount of IOWait on our zookeeper servers.
$top
top - 13:40:27 up 157 days, 21:21, 1 user, load average: 0.22, 0.43, 0.64
Tasks: 176 total, 1 running, 175 sleeping, 0 stopped, 0 zombie
Cpu(s): 0.4%us, 0.2%sy, 0.0%ni, 94.7%id, 4.6%wa, 0.0%hi, 0.1%si, 0.0%st
However, if you look at the overall disk traffic the number is very low.
Device: rrqm/s wrqm/s r/s w/s rsec/s wsec/s avgrq-sz avgqu-sz await svctm %util
sda 0.00 20.40 0.00 109.40 0.00 1038.40 9.49 0.43 3.97 3.90 42.62
So now we are in a bit of a pickle. Unlike Cassandra, Zookeeper can NOT have nodes added dynamically. Nodes are defined statically in their configuration file:
server.1=zeq81.eq.pl.pvt:2888:3888
server.2=zeq82.eq.pl.pvt:2888:3888
server.3=zeq83.eq.pl.pvt:2888:3888
Even if we could add nodes, each write goes to every server. Adding more nodes, like mysql master/slave, only scales reads; it does not scale writes.
Since adding nodes will not help, we could upgrade our hardware. Let's investigate this. Adding RAM does not help, because the actual zookeeper database is less than 100 MB so this is not a caching issue. We could go out and purchase memory and get a RAID card with battery backed cache. This will help us to a point. We could purchase servers with large RAID systems.
All those options above kinda suck. Scalable to me does not mean forklift upgrades every time something gets slow. I decided to dig deeper because frankly I was very surprised that periodically writing offsets to ZooKeeper from a few hundred clients would cause this much IO wait.
When I dug in a bit (with the help of some folks from the kafka IRC channel) I found this:
Unsafe Options
The following options can be useful, but be careful when you use
them. The risk of each is explained along with the explanation of what
the variable does.
forceSync
(Java system property: zookeeper.forceSync)
Requires updates to be synced to media of the transaction
log before finishing processing the update. If this option is
set to no, ZooKeeper will not require updates to be synced to
the media.
At this point, you might be getting an idea where my story arc is going. I set forceSync to no.
top - 14:01:59 up 158 days, 22:58, 1 user, load average: 0.03, 0.06, 0.01
Tasks: 176 total, 1 running, 175 sleeping, 0 stopped, 0 zombie
Cpu(s): 0.1%us, 0.1%sy, 0.0%ni, 99.2%id, 0.5%wa, 0.0%hi, 0.0%si, 0.0%st
As you may have guessed the IO/WAIT dropped to near 0. But what does that mean? Am I a zookeeper tuning master? No it does not mean that at all.
IT MEANS THAT I HAVE SACRIFICED ABSOLUTE CORRECTNESS FOR REASONABLE PERFORMANCE!
It means that if I use this setting, and someone yanks the power cord from all my zookeeper servers at approximately the same time, it is possible that I could lose a write!
This type of thing is not uncommon, XEN for example can virtualize an fsync call.
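The trade-off is easy to see in miniature. This sketch is not ZooKeeper code: the function append_entry and its force_sync flag are hypothetical stand-ins for a transaction log append. The only difference between the two modes is whether the writer waits on os.fsync before acknowledging.

```python
# Miniature version of the forceSync trade-off; not ZooKeeper's code.
# append_entry and force_sync are hypothetical names for illustration.
import os
import tempfile


def append_entry(path, entry, force_sync):
    """Append one log entry; optionally wait for the OS to flush it to media."""
    with open(path, "ab") as log:
        log.write(entry)
        log.flush()  # hand the bytes from the process buffer to the OS
        if force_sync:
            os.fsync(log.fileno())  # block until the OS reports it is on disk


log_path = os.path.join(tempfile.mkdtemp(), "txn.log")
append_entry(log_path, b"offset=42\n", force_sync=True)   # durable, pays the IO wait
append_entry(log_path, b"offset=43\n", force_sync=False)  # fast, may sit in page cache

with open(log_path, "rb") as log:
    print(log.read())
```

Both entries read back fine while the machine stays up. The second one is the write you can lose when the power cord gets yanked before the page cache is flushed.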
As always, be on the lookout when someone scoffs at Cassandra's eventual consistency. You might want to ask them if they made the decision to spend $100,000 on a big iron zookeeper cluster, or turned off forceSync and lost durability in their quest for "absolute correctness".
Posted at 02:24PM Nov 11, 2012 by edwardcapriolo in General | Comments[8]
Schema VS Schema-Less
Schema Design is the cornerstone of making awesome databases. If you do not understand your data, do not understand what users need, and do not understand limitations of hardware and software you can not effectively design schema.
To understand schema design in Cassandra I think you should start with what Cassandra is:
http://wiki.apache.org/cassandra/
Cassandra is a highly scalable, eventually consistent, distributed,
structured key-value store. Cassandra brings together the distributed
systems technologies from Dynamo and the data model from Google's BigTable. Like Dynamo, Cassandra is eventually consistent. Like BigTable, Cassandra provides a ColumnFamily-based data model richer than typical key/value systems.
This statement used to be on the front page of http://cassandra.apache.org but the message gets re-crafted periodically. The current boilerplate is:
Cassandra's ColumnFamily data model offers the convenience of column indexes with the performance of log-structured updates, strong support for materialized views, and powerful built-in caching.
Saying the boilerplate changed is a bit of a misnomer. Cassandra has not fundamentally changed, but there is some window dressing on top of the core features.
For me, this all goes back to what "ColumnFamily model" means and how "schemaless" Cassandra should be. Let's dive into the column family model.
From http://research.google.com/archive/bigtable.html:
A Bigtable is a sparse, distributed, persistent multi-
dimensional sorted map. The map is indexed by a row
key, column key, and a timestamp; each value in the map
is an uninterpreted array of bytes.
(row:string, column:string, time:int64) → string
A subtle difference between BigTable and Cassandra is that in BigTable (and in HBase) columns are multi-dimensional by time. That is to say, if one were to insert 'row:1, column:1, time:1' and 'row:1, column:1, time:2', BigTable would keep both versions of column 1, whereas Cassandra keeps only the last version. (In Cassandra we can use UUIDs and composite columns to achieve a similar thing.)
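The difference is easier to see with two toy maps. The sketch below is plain Python, not either system's API, and the class names are made up. One caveat: when two writes carry the same timestamp, the toy Cassandra-style store simply lets the later call win, which is a simplification.

```python
class VersionedStore:
    """BigTable-style: (row, column, timestamp) -> value, every version kept."""

    def __init__(self):
        self.cells = {}

    def put(self, row, column, timestamp, value):
        self.cells[(row, column, timestamp)] = value

    def versions(self, row, column):
        # All versions of one column, newest first.
        return sorted(
            ((ts, v) for (r, c, ts), v in self.cells.items()
             if r == row and c == column),
            reverse=True,
        )


class LastWriteWinsStore:
    """Cassandra-style: (row, column) -> value, highest timestamp wins."""

    def __init__(self):
        self.cells = {}

    def put(self, row, column, timestamp, value):
        current = self.cells.get((row, column))
        # Simplification: on equal timestamps the later call wins.
        if current is None or timestamp >= current[0]:
            self.cells[(row, column)] = (timestamp, value)

    def get(self, row, column):
        return self.cells[(row, column)][1]


if __name__ == "__main__":
    bt, cas = VersionedStore(), LastWriteWinsStore()
    for store in (bt, cas):
        store.put("1", "1", 1, "old")
        store.put("1", "1", 2, "new")
    print(bt.versions("1", "1"))  # both versions survive
    print(cas.get("1", "1"))      # only the latest survives
```

To get the BigTable behaviour out of the second store you would fold the timestamp (or a UUID) into the column name, which is what the composite column trick amounts to.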
Let's take a moment to look at the simple API BigTable provides:
Table *T = OpenOrDie("/bigtable/web/webtable");
RowMutation r1(T, "com.cnn.www");
r1.Set("anchor:www.c-span.org", "CNN");
r1.Delete("anchor:www.abc.com");
Operation op;
Apply(&op, &r1);
The original Thrift API is very close to the BigTable API: a simple yet powerful API to set, get, and search opaque binary columns.
What makes the schema-less ColumnFamily awesome? One thing is that sometimes schema gets in the way. For example, in an RDBMS you might spend a lot of time trying to craft the schema. You have to ask yourself questions like "Is field 3 an int? Is it a long? Is it a tiny int? Is it a varchar? Is it a char? Should field 3 be indexed so users can search on it?"
The irony is that the answers to these questions are only micro-optimizations. Imagine you chose field 3 to be an int. More data will fit on disk than if you chose a long. But in the end, if your database is not horizontally scalable, it will not matter what you chose! Eventually, after 100, 1000, or one billion records, you will run out of disk, or your single-node database will slow down for other reasons.
After I read the BigTable white paper I did what I think everyone should do: try to understand it by doing it yourself. You can try to replicate it using Cassandra or HBase, it does not matter; just try to implement the WebData using the ColumnFamily model.
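A quick back-of-the-envelope calculation shows why this is a micro-optimization. The sketch assumes raw widths of 4 bytes for an int and 8 bytes for a long and ignores any per-row storage overhead; `bytes_saved` is a made-up helper.

```python
INT_BYTES = 4    # 32-bit int
LONG_BYTES = 8   # 64-bit long

def bytes_saved(rows, narrow=INT_BYTES, wide=LONG_BYTES):
    """Raw bytes saved by storing one field in the narrower type."""
    return rows * (wide - narrow)

if __name__ == "__main__":
    for rows in (100, 1000, 10**9):
        print(rows, "rows:", bytes_saved(rows), "bytes saved")
```

Even at one billion records the saving is about 4 GB for that field. That is a constant factor on a single disk, and it does nothing about the day the data set outgrows the node.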
This is where the learning begins. You are probably used to relational database management systems and normalization. Every bone in your body is going to look for auto-ids and ways to break data up into normalized tables.
But now is the time to give up! DENORMALIZE! Watch, it is really easy! Say we model the web crawler database in SQL. Look how much work it is!
create table page (
id bigint auto_increment not null,
page varchar(1024) NOT NULL,
tstamp bigint NOT NULL,
primary key (page(300)),
unique(id),
index time_idx(tstamp)
);
create table image (
page_id bigint NOT NULL,
image_link varchar(1024) NOT NULL,
index img_idx (page_id,image_link(255) )
);
create table link (
page_id bigint NOT NULL,
link_text varchar(255) NOT NULL,
link_target varchar(1024) NOT NULL,
index link_idx (page_id,link_text)
);
create table raw_data (
page_id bigint NOT NULL,
raw_data text NOT NULL,
primary key(page_id)
);
Now watch how easy the de-normalized approach is!
create 'webdata', {NAME => 'image'},{NAME => 'anchor'},{NAME => 'raw_data'}
In fact it might even be easier!
create 'webdata' --or --
create column family webdata
WOW! All that schema does not help you scale if your database does not. When trying to store 1,000,000,000,000 URLs, choosing a tinyint instead of a bigint is not going to be the deciding factor in whether the schema meets your application's need for low-latency reads!
Also, you can't just add indexes after the fact! If it takes 40 minutes of blocked-write downtime to add an index to a MySQL table, how long is it going to take to add an index to a table with a trillion rows?
If you want to see my very simple head-to-head Relational DB vs BigTable proof of concept from about 3 years ago, look here: http://www.jointhegrid.com/svn/hbench/trunk/src/com/jointhegrid/hbench/. It is a pretty neat application because you can just pump N URLs with M random content into the system and see where each data store falls over.
Here we see how schema gets in the way. But then why is schema being added to Cassandra, and is schema useful at all? My answers are "a lot of reasons" and "yes". I will explain the second answer first. Schema is useful for validation: it is a simple way to prevent yourself from packing the wrong bytes into a column. If we know that column 3 should always be a number, there is no downside to enforcing this. This knowledge helps us display data reasonably in a CLI or client application, and if our data store wishes to create some type of reverse index on the column, knowing the type will help it do that.
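Here is a rough sketch of what "validation" buys you, written in plain Python rather than against any Cassandra client. The column names come from the users column family shown earlier; the functions and the `VALIDATORS` table are made up for illustration, and columns without a validator stay schema-less.

```python
import struct

def validate_long(raw):
    """Accept only values that are exactly 8 bytes (a big-endian long)."""
    if len(raw) != 8:
        raise ValueError("expected 8 bytes for a long, got %d" % len(raw))
    return struct.unpack(">q", raw)[0]

def validate_utf8(raw):
    """Accept only bytes that decode as UTF-8."""
    return raw.decode("utf-8")  # raises UnicodeDecodeError on bad bytes

# Per-column validators, in the spirit of column_metadata.
VALIDATORS = {"birth_year": validate_long, "full_name": validate_utf8}

def checked_put(row, column, raw):
    """Store raw bytes in the row, validating columns we know the type of."""
    validator = VALIDATORS.get(column)
    if validator is not None:
        validator(raw)   # raises before anything is stored
    row[column] = raw    # unknown columns are stored as opaque bytes

if __name__ == "__main__":
    row = {}
    checked_put(row, "birth_year", struct.pack(">q", 1980))
    checked_put(row, "anything_else", b"\x00\x01")  # no validator, no check
    try:
        checked_put(row, "birth_year", b"1980")     # 4 bytes, not a long
    except ValueError as err:
        print("rejected:", err)
```

The check costs almost nothing, and the same type information is what lets a CLI print 1980 instead of eight unreadable bytes.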
What is CQL3 and why does it have so much schema? Well, there is this Matrix quote that sums up some of how I feel about CQL3 and Cassandra schemas. (see image below)
Schemas, relational databases, and built-in indexes are the "System" to me. So are tools like Hibernate, ORMs, or Enterprise Java Beans. I will even take a shot at "the cloud" here. They are the 'system' because they seek not to help you understand how your data is organized on disk; rather, they seek to abstract you from this.
This is not going to be a CQL rant. In fact I like a lot of CQL. I like how CQL will pack bytes for you, and I like how CQL helps you insert and slice rather complex composite columns. What I do not particularly like is its huge departure from the BigTable white paper. CQL3 does not support SET (from the BigTable white paper); instead you are writing INSERT queries. I do not always desire the ColumnFamily model to be abstracted or hidden from me.
I do not like how CQL uses the terms 'PRIMARY KEY' and 'TABLE'. These are terms from the 'System'. If you do not understand what a Column Family is, you are blind to the truth, and you will not be able to design an effective Cassandra data store.
'Add index' is a tool of the 'system' as well. To de-normalize, you should be building your own indexes to understand what an index really means. You can learn more about Cassandra schema design from this SQLite document than you can by asking a tool to help you and remaining blind.
http://www.sqlite.org/queryplanner.html
An index is another table similar to the original "fruitsforsale" table
but with the content (the fruit column in this case) stored in front of the
rowid and with all rows in content order.
This is just another way of saying the most basic Cassandra schema advice, "Design your data as you wish to read it, eliminate seeks, use wide rows". An index is just another ordering of data. You lay down the data on disk in the way in which you wish to read it. If that means you have to lay it down on disk N times to make some searches faster, do that. (That is really what you have been doing all along with relational databases and indexes.) Understanding how the data lays out on disk is more important than it being easy like mongo, or mysql, or whatever.
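If "an index is just another ordering of data" sounds abstract, here is a tiny hand-built one in Python, loosely following the "fruitsforsale" example from the SQLite document. The state and price columns and the sample rows are assumptions made up for the sketch.

```python
import bisect

table = {}        # rowid -> (fruit, state, price): the data in rowid order
fruit_index = []  # sorted (fruit, rowid) pairs: the same data in fruit order

def insert(rowid, fruit, state, price):
    """Write the row, then write it again in the order we want to read it."""
    table[rowid] = (fruit, state, price)
    bisect.insort(fruit_index, (fruit, rowid))  # the second write, done by hand

def find_by_fruit(fruit):
    """Jump to the first entry for this fruit and scan until the fruit changes."""
    start = bisect.bisect_left(fruit_index, (fruit,))
    rows = []
    for name, rowid in fruit_index[start:]:
        if name != fruit:
            break
        rows.append(table[rowid])
    return rows

if __name__ == "__main__":
    insert(1, "Orange", "FL", 0.85)
    insert(2, "Apple", "NC", 0.45)
    insert(3, "Orange", "CA", 0.51)
    print(find_by_fruit("Orange"))
```

Every insert pays for two writes so that a read by fruit is one jump followed by a sequential scan. Swap the sorted list for a wide row whose column names start with the fruit and you have the Cassandra version of the same idea.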
Posted at 11:52AM Oct 28, 2012 by edwardcapriolo in General | Comments[3]
Star wars: Attack of the Dremel Clones
One of the interesting subplots of "Hadoop Strata World" aka "Cassandra Denial World" was the battle for mindshare over who the HDCIC (Head Dremel Clone In Charge) is.
The two contenders are https://github.com/cloudera/impal and Apache Drill (of course, it's hard to say Drill is a contender since it is nothing but a proposal). Both are attempting to produce the best Dremel clone.
If I had to sum up Dremel in one sentence, it would be "Make protobufs query-able". Dremel does some clever stuff to break row-oriented protobufs into a columnar form and store it in BigTable (for optimization on the read side as a more real-time query system).
I had not read the Dremel paper in a while, but after all the hoopla it was getting I decided to read it again. I came to a funny conclusion:
There is another open source Dremel implementation.
Like many of my "taco bell" exploits, my system does not require anything other than Hadoop and Hive. I know, I'm sorry, my system does not have 17 components and it's not 5,000,000 lines of source code. So it's not sexy, and I won't have 50 people in my company retweeting and calling it the most innovative thing in the last thousand years. I will not be unveiling it at a super huge conference with rap stars or anything like that.
Anyway here it is:
1) You write protobuf to a sequence file
2) You make a hive table declaring what protobuf objects are in the file
3) The protobuf is converted to nested struct types
4) You query the hive table using familiar hive syntax and lateral views
Let us see how this works by making some protobuf objects. Cars make great examples.
message TireMaker {
  optional string maker = 1;
  optional int32 price = 2;
}
message Tire {
  optional int32 tirePressure = 1;
  optional TireMaker tireMaker = 2;
}
message Accessory {
  optional string name = 1;
  optional string value = 2;
}
message Car {
  repeated Tire tires = 1;
  repeated Accessory accessories = 2;
}
Now write those to a sequence file.
Then we create a hive table on top of this sequence file.
client.execute("load data local inpath '" + p.toString() + "' into table "+table+" ");
Then we query it.
Wow look at that! A system to query protobuf objects directly!
Why is this nice? Well. It works. You can query it. You do not need a stack of 40 components to make it work. It's not a work in progress.
Dremel and its clones "normalize" the object on insert by writing it to multiple tables/column families/what have you. Hive-protobuf takes the other approach. We write the entire row as is to Hadoop. This is a trade-off we are all familiar with. We are trading insertion speed for query speed. Both approaches have merit, and possibly both can be used together.
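To give a feel for step 4, this is roughly what a lateral view over the repeated tires field produces: one flat row per car and tire. The sketch is plain Python over dictionaries shaped like the Car message above, not hive-protobuf code; `explode_tires` and the sample values are made up.

```python
def explode_tires(cars):
    """Yield one flat row per (car, tire) pair, like a lateral view over tires."""
    for car_id, car in enumerate(cars):
        for tire in car.get("tires", []):
            maker = tire.get("tireMaker", {})
            yield (car_id,
                   tire.get("tirePressure"),
                   maker.get("maker"),
                   maker.get("price"))

if __name__ == "__main__":
    # A nested record, stored whole, the way the entire row is written to Hadoop.
    car = {
        "tires": [
            {"tirePressure": 32, "tireMaker": {"maker": "acme", "price": 100}},
            {"tirePressure": 30, "tireMaker": {"maker": "bigwheel", "price": 120}},
        ],
        "accessories": [{"name": "radio", "value": "fm"}],
    }
    for row in explode_tires([car]):
        print(row)
```

The flattening happens at query time on every read, which is the insertion-speed-for-query-speed trade described above. A Dremel-style system does the equivalent work once, at write time.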
The inspiration for hive-protobuf came from the hive-avro work recently added to Hive trunk, which one could argue is another Dremel clone since it lets you query complex structured Avro docs.
Posted at 11:56PM Oct 27, 2012 by edwardcapriolo in General | Comments[4]
Programming Hive published!
Programming Hive has been published!
I wanted to take some time to talk about this book and my last Cassandra book. When I write about open source I have more than a tinge of guilt that many people are involved in building this software and I get a disproportionate amount of credit for writing about it. I take great pride in the compliments over this book and the last, but make no mistake, I stand on the shoulders of giants.
Many of my ideas are hatched from conversations with others, a blog, or something I may have never worked on had I not had the position I have now.
So Programming Hive, how did it happen? I am listed as the top author but most of the up front work was done by Dean Wampler. He started the contract with the publisher and got me on board. Towards the beginning of the process I wanted to capture some of the more complex and hidden features of hive. I wanted to talk about the thrift service, and writing User defined functions, and other things like getting your compression settings correct.
Dean and Jason went from the other direction. They hammered out the first few chapters (and a bunch in the middle), which essentially serve as a very complete version of the Hive language manual that appears on the wiki. Dean is way better at making a narrative than I am. He did a great job fitting all the sections together as I dug into Hive's change log trying to pull out every undocumented feature that I could uncover!
The book was originally supposed to be about 200 pages. But this is BIG DATA, so I thought GO BIG or go $HOME! So we went big. Then, to boot, towards the end case studies started pouring in! So that was the story.
Five case studies, three chapters submitted by others, three authors: this makes me feel great. I kept calling it a book "for the people by the people". Thanks all.
Posted at 10:21PM Sep 29, 2012 by edwardcapriolo in General | Comments[3]
Making the case for open source at your organization and in your life
I started formulating this post a few weeks back. The draft for how I would argue my point was all over the place. It was not until last night that I realized why. It is because open source is not just something I do, it is something that I am. Being involved in open source is a powerful transforming force in both your career and your life.
Last night, I was at a wedding and Cliff, someone I had met a while back, came up to me and said something like "I really enjoy reading your blog", and I remembered a couple people telling me the same thing at Cassandra Summit this year as well. As we were talking I took a quick trip down memory lane.
The wedding I was at was my co-worker Jay's. Jay was not always my co-worker; we knew each other through the Hadoop project, because Jay and I both spoke at Hadoop World in 2009, and from various NYC meetups. Congrats Jay and Maggie! (Cliff and Jay used to work together.)
The bitching time we had at the wedding is a great example of how open source brings people together. Without open software, mailing lists, and user groups we might not all know each other. While our personal lives and professional careers would not be devastated without open source, this community has helped us to expand in these areas.
There is much more to open source than just social well-being and increasing your blog's readership. One of the incredible things open source gave me was a type of code mentoring. An instrumental person in my open source development was Jeff Hammerbacher. At about the time I was learning about Hadoop and Hive, Jeff came to NYC to give a talk on the freshly open sourced Hive. In his presentation he talked about the web interface component to Hive.
After the talk I asked him about that, since I did not see this component. He told me that Facebook had an internal piece that was never open sourced and suggested that I could contribute that to Hive. That suggestion was most responsible for tipping the dominoes of my open source career. By working with Hive I got code review from some of the top programmers in the world and increased my skill set tremendously. I grew from making a dinky web interface, to a slightly less dinky web interface, to writing user defined functions, to writing SerDes, to writing Hive Storage Handlers, to getting the opportunity to write a book on Hive, to becoming a Hive committer.
As a committer this comes full circle as I review others' code and help them join the open source community. When we do the NYC Cassandra meetup and I speak to someone green to Cassandra, Big Data, or open source, it is great to know I might be helping to bring people into the next generation of growth in open source.
The above are great personal reasons to get involved in open source, but what does a company stand to gain from open source? I feel that many companies may be hesitant as they feel open source is 'giving away' code. It is certainly true that by open sourcing something a competitor could use it. I am also not making an argument that everything you do should be a candidate for open source, but in many cases there is much more to be gained than to lose.
The first thing to be gained is recognition. My company m6d.com has allowed me to continue my work with Hive and also give back to the community. On several occasions, I have read over a slide deck given at a major conference and on the Hive contributor slide the thank you/credit has Facebook, Cloudera, Hortonworks, M6d, Intuit. There is a good deal of pride in seeing our names with all those big companies. It is a nice acknowledgement. It makes us look good and it makes us feel good, both on the business and the tech side.
But the main reason to open source code is that other people help you make it better. This is truly a win/win situation! For example, we open sourced our rank UDF in Hive. Someone using it noticed that it had a bug: it could not rank NULL columns. They sent me the fix, and our code was made better. This really turns the "afraid to open source code" argument on its head. By open sourcing you gained a free contract programmer. It gave you a bug fix before you specifically ran into that problem. Many have been putting open source code on GitHub and getting this type of snowball effect where a piece of software takes on a life of its own. That would never have happened if it was kept under lock and key.
Where do you go from here if you want to be more involved with open source? If you do not have a github account with code in it, get one. It is the new tech resume. Get involved in open source, and start conversations at your company about open sourcing software.
Posted at 10:51AM Sep 09, 2012 by edwardcapriolo in General | Comments[4]
Trying to find a fit for Yarn and Mesos
I have been following the development of Yarn and Mesos and have done some tinkering over the past few months. If you have never heard of these projects, get some information here:
http://nosql.mypopescu.com/post/27840903966/hadoop-yarn-beyond-mapreduce
https://github.com/mesos/mesos/wiki/
I have a good conceptual understanding of what can be done with these projects, but I have some trouble fitting them into my current infrastructure. There is no one specific reason but more of a question of 'What can this thing do that puppet can not?'.
I look at Yarn and I see a tool with one use: a tool to stand up 3 revs of Hadoop on the same hardware, mainly because the migration path off a release is ugly. This is something I can already do with configuration management.
I look at the mesos examples and I see a container that 'starts a jail, installs http and installs ha-proxy'. Again, something I can already do with configuration management.
Maybe I have just been standing up clusters for too long so everything looks the same to me, but in my own head I have trouble sorting these things out. The big questions are:
- Can a technology like yarn or mesos be used together with puppet or chef?
- What are the best practices when using these two things together?
- In YARN's case, how many current software packages can yarn manage outside hadoop?
- MPI?
- Then what?
- Aren't yarn/mesos just sneaky forms of devops/noops?
- With clusters spinning up and falling on command how do we monitor this environment and guarantee quality of service?
- Couldn't AWS/open stack do this on a more general scale?
- Shouldn't we just all be using solaris zones?
Thinking deeper on #6. Really one of the things about solaris is they spent a lot of time making a virtualized environment. They spent time making resource controls. Controlling RAM, sockets, open files per process. Currently AFAIK there is no support in the mainline linux kernel for sharing/limiting disk IO like solaris has. When I look at yarn the only resource constraint I see is units of memory.
How are these platforms supposed to be successful when Quality of Service is an afterthought? Let's say you use YARN to spin up HBase or Cassandra and want low latency. Then randomly a map task lands on the machine and crushes the node. Just putting a cap on memory is not going to help, as the map task is crushing your IO subsystem and degrading your service. This is like bringing the noisy-neighbors problem home to your private cloud.
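A toy admission check makes the complaint concrete. This is not YARN's or Mesos's scheduler; the `fits` function, the field names, and the numbers are all hypothetical. It only contrasts a placement decision that looks at memory alone with one that also looks at disk bandwidth.

```python
def fits(node, task, check_io=False):
    """Return True if the task may be placed on the node."""
    if node["free_mem_mb"] < task["mem_mb"]:
        return False
    if check_io and node["free_io_mbps"] < task["io_mbps"]:
        return False
    return True

if __name__ == "__main__":
    # A node whose low-latency store already uses most of the disk bandwidth.
    node = {"free_mem_mb": 4096, "free_io_mbps": 20}
    map_task = {"mem_mb": 1024, "io_mbps": 150}

    print(fits(node, map_task))                 # memory-only view: placed
    print(fits(node, map_task, check_io=True))  # IO-aware view: refused
```

With memory as the only unit of accounting, the map task is a perfectly good fit for the node, right up until it starts reading.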
Posted at 11:52AM Aug 10, 2012 by edwardcapriolo in General | Comments[7]