Cassandra

Bulk Loading Options for Cassandra

Cassandra's bulk loading interfaces are most useful in two use cases: initial migration or restore from another datastore/cluster and regular ETL. Bulk loading assumes that it's important to you to avoid loading via the thrift interface, for example because you haven't integrated a client library yet or because throughput is critical. 

There are two alternative techniques used for bulk loading into Cassandra: "copy-the-sstables" and sstableloader. Copying the sstables is a filesystem level operation, while sstableloader utilizes Cassandra's internal streaming system. Neither is without disadvantages; the best choice depends on your specific use case. If you are using Counter columnfamilies, neither method has been extensively tested and you are safer writing via thrift.

The key to understanding bulk-loading throughput is that it depends heavily on the nature of the operation, on the configuration of the source and target clusters, and on factors such as the number of sstables, sstable size, and your tolerance for potentially duplicate data. As a minor note, sstableloader in 1.1 is slightly improved over the (freshly rewritten) version in 1.0. [1]

Below are good cases for and notable aspects of each strategy.

Copy-the-sstables/"nodetool refresh" can be useful if:

  1. Your target cluster is not running, or if it is running, is not sensitive to latency from bulk loading at "top speed" and associated operations.
  2. You are willing to manually, or have a tool to, de-duplicate sstable names and are willing to figure out where to copy them to in any non copy-all-to-all case. You are willing to run cleanup and/or major compaction, and understand that some disk space is wasted until you do. [2]
  3. You don't want to deal with the potential failure modes of streaming, which are especially bad in non-LAN deploys including EC2.
  4. You are restoring in a case where RF=N, because you can just copy one node's data to all nodes in the new RF=N cluster and start the cluster without bootstrapping (auto_bootstrap: false in  cassandra.yaml).
  5. The sstables you want to import are a different version than the target cluster currently creates. Example: trying to sstableload -hc- (1.0) sstables into a -hd- (1.1) cluster is reported not to work. [3]
  6. You have your source sstables in something like s3 which can easily parallelize copies to all target nodes. s3<>ec2 is fast and free, close to best case for the inefficiency during copy stage.
  7. You want to increase RF on a running cluster, and are ok with running cleanup and/or major compaction after you do.
  8. You want to restore from a cluster with RF=[x] to a cluster whose RF is the same or smaller and whose size is a multiple of [x]. Example: restoring a 9 node RF=3 cluster to a 3 node RF=3 cluster, you copy 3 source nodes worth of sstables to each target node.

sstableloader/JMX "bulkload" can be useful if:

  1. You have a running target cluster, and want the bulk loading to respect for example streaming throttle limits.
  2. You don't have access to the data directory on your target cluster, and/or JMX to call "refresh" on it.
  3. Your replica placement strategy on the target cluster is so different from the source that the overhead of understanding where to copy sstables to is unacceptable, and/or you don't want to call cleanup on a superset of sstables.
  4. You have limited network bandwidth between the source of sstables and the target(s). In this case, copying a superset of sstables around is especially inefficient.
  5. Your infrastructure makes it easy to temporarily copy sstables to a set of sstableloader nodes or nodes on which you call "bulkLoad" via JMX. These nodes are either non-cluster-member hosts which are otherwise able to participate in the cluster as a pseudo-member from an access perspective or cluster members with sufficient headroom to bulkload. 
  6. You can tolerate the potential data duplication and/or operational complexity which results from the fragility of streaming. LAN is best case here. A notable difference between "bulkLoad" and sstableloader is that "bulkLoad" does not have sstableloader's "--ignores" option, which means you can't tell it to ignore replica targets on failure. [4]
  7. You understand that, because it uses streaming, streams on a per-sstable basis, and streaming respects a throughput cap, your performance is bounded in terms of ability to parallelize or burst, despite "bulk" loading.
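
To make point 7 concrete, here is a rough back-of-the-envelope sketch in Python. It assumes the throughput cap applies per streaming node and that nothing else is the bottleneck. The function name and the 500 GB / 400 megabit figures are made-up inputs for illustration, not measurements.

```python
def min_load_hours(data_gigabytes, cap_megabits_per_sec, streaming_nodes=1):
    """Lower bound on load time when every streaming node is held to the cap.

    Assumption: the cap is per streaming node and is the only bottleneck.
    """
    megabits = data_gigabytes * 8 * 1000.0  # 1 GB = 8000 megabits (decimal units)
    seconds = megabits / (cap_megabits_per_sec * streaming_nodes)
    return seconds / 3600.0


if __name__ == "__main__":
    # Hypothetical inputs: 500 GB of sstables, a 400 megabit/s cap.
    print("1 node : %.2f hours" % min_load_hours(500, 400))
    print("2 nodes: %.2f hours" % min_load_hours(500, 400, streaming_nodes=2))
```

With those inputs a single streaming node needs just under three hours at best, however many sstables it has queued up; doubling the streaming nodes halves the bound.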

Exploring Configuration Management with Ansible

 

What is Ansible?

Ansible is a configuration management and deployment system, like Puppet, Capistrano, Fabric, and Chef. Its aim is to be radically simple and let you use your existing scripts to help with cluster configuration and software deployment whenever possible. Here are the ways that Ansible differentiates itself.

Simplicity

Ansible does not include a client/server architecture with pull-based clients (although in more recent versions, it does include pull-based configuration and deployment). Rather, it uses pre-existing network infrastructure: SSH. Every company has SSH installed on their cluster servers, and Ansible simply rides on top of this infrastructure to get the code and configuration out to the nodes.

Language Agnostic

You can write modules for Ansible in whatever language you desire. You simply tie into its API and go. If you have wanted to use configuration management tools like Chef, but felt put off by the need to learn Ruby, this would be an ideal choice for you.

Configuration Management and Deployment

Ansible playbooks can run steps in a defined order between roles, allowing you to be sure that, for example, the database is set up before the WWW servers start and attempt to create their database schemas. 

Why Recommend Ansible?

Ansible fills a very useful niche. It is good for companies that have grown to the point where configuration management and deploy tools would save time and help manage complexity, but not large enough to hire specialised Ruby-speaking configuration management experts.

It is good for companies that already have invested effort into an SSH infrastructure for inter-node communication. Because Ansible piggybacks onto your already-existing SSH communications infrastructure, there is no need to build large, complex server infrastructure and schedule client runs.

Finally, Ansible is good for companies that don't have a lot of time to invest into learning a new tool. There is no need to spend weeks reading tutorials and watching videos from conferences; the tool is radically simple. A few hours with the documentation, and you're off and running.

If you'd like to hear more about how Palomino can help you with installing and configuring Ansible, feel free to contact us! If you have experience with Ansible, feel free to comment, and let's start a dialogue.

Quick script to get list of all jmx beans

Recently I've needed to get information from running Cassandra processes, in order to determine which parameters to monitor. jconsole can be used for this, however if you're using a host on AWS, or in a bandwidth-limited environment, you might find that it takes way too long, especially if all you want to do for the moment is get a list of all beans and their attributes. jmxterm is a good solution for this - a command line interface to your jmx port. So here's a couple of utility scripts that you can use to get all the beans and each of their attributes using jmxterm. You may need to change $jar, $host, and $port in the perl script to fit your environment, and also maybe change the domains in the bash script. Then you should be able to simply run the bash script to get a list of each bean and its attributes. I found this useful when looking for the correct syntax to set up some nagios monitoring for cassandra. For example, here is a snippet of the output:
org.apache.cassandra.internal
.
.
.
Checking bean org.apache.cassandra.internal:type=FlushWriter
  %0   - ActiveCount (int, r)
  %1   - CompletedTasks (long, r)
  %2   - CurrentlyBlockedTasks (int, r)
  %3   - PendingTasks (long, r)
  %4   - TotalBlockedTasks (int, r)
.
.
.
And I was able to figure out the syntax for a nagios check:
check_jmx!$HOSTADDRESS$!7199!-O org.apache.cassandra.internal:type=FlushWriter -A CurrentlyBlockedTasks
Hopefully, these scripts will be useful to someone else trying to query Cassandra (or any Java process that uses JMX).
get_all_bean_info.sh:
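#!/bin/bash
# Print every bean, and each bean's attributes, for the JMX domains below.

DOMAINS=(
org.apache.cassandra.db
org.apache.cassandra.internal
org.apache.cassandra.net
org.apache.cassandra.request
)

for domain in "${DOMAINS[@]}"
do
    echo "-------------------"
    echo "$domain"
    # Bean names can contain spaces, so swap them for '+' to survive word splitting.
    output=$(./query_jmx.pl "$domain" 2>/dev/null | tr ' ' '+' | grep '=')
    for line in $output
    do
      bean=$(echo "$line" | tr '+' ' ')
      echo "Checking bean $bean"
      # $bean is left unquoted on purpose: query_jmx.pl rejoins its arguments with spaces.
      ./query_jmx.pl "$domain" $bean 2>/dev/null | grep -v "#" | grep -v "Got domain"
    done
done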
query_jmx.pl:
#!/usr/bin/env perl
use strict;
my $jar = "/home/ubuntu/jmxterm-1.0-alpha-4-uber.jar";
my $host = "10.176.66.219";
my $port = 7199;
 
my $domain = shift @ARGV;
my @beans = ();
my $bean;
my $size;
for my $arg (@ARGV) {
  if ($arg =~ /^\w/) {
    push (@beans, $arg);
  } else {
    last;
  }   
}     
$size = @beans;
$bean = join(' ',@beans) if ($size > 0);
open JMX, "| java -jar $jar -n" or die "Cannot start jmxterm: $!\n";
print JMX "open $host:$port\n";
print JMX "domain $domain \n";
if (defined $bean && length $bean > 0) {
  print JMX "bean $bean \n";
  print JMX "info \n";
} else {
  print JMX "beans \n";
}
 
print JMX "close\n";
close JMX;
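
If you want to go one step further, the attribute listing is regular enough to parse. The sketch below is an illustration in Python, not part of the original scripts: it assumes attribute lines look exactly like the sample output above, and the helper names are hypothetical. It turns each attribute into a check string of the same shape as the nagios example.

```python
import re

# Matches attribute lines in the form shown in the sample output,
# e.g. "  %2   - CurrentlyBlockedTasks (int, r)".
ATTRIBUTE_LINE = re.compile(r"^\s*%\d+\s+-\s+(\w+)\s+\((.+),\s*(\w+)\)\s*$")


def parse_attributes(lines):
    """Return (name, type, access) tuples for every attribute line found."""
    attributes = []
    for line in lines:
        match = ATTRIBUTE_LINE.match(line)
        if match:
            attributes.append(match.groups())
    return attributes


def nagios_check(bean, attribute, port=7199):
    """Build a check string shaped like the nagios example in the post."""
    return "check_jmx!$HOSTADDRESS$!%d!-O %s -A %s" % (port, bean, attribute)


if __name__ == "__main__":
    sample = [
        "Checking bean org.apache.cassandra.internal:type=FlushWriter",
        "  %0   - ActiveCount (int, r)",
        "  %2   - CurrentlyBlockedTasks (int, r)",
    ]
    bean = "org.apache.cassandra.internal:type=FlushWriter"
    for name, _type, _access in parse_attributes(sample):
        print(nagios_check(bean, name))
```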

New England Database Summit

The New England Database Summit is an all-day, conference-style event where participants from the research community and industry in the New England area can come together to present ideas and discuss their research and experiences working on data-related problems. It is an academic conference with applications to real life, and covers any type of database.

The 5th annual NEDB will be held at MIT in Cambridge, MA (in 32-123) on Friday, February 3, 2012. Anyone who would like to is welcome to present a poster (registration required) or submit a short paper for review. We plan to accept 8-10 papers for presentation (15 minutes) at the meeting. All posters will be accepted.

For more details, and to register and / or upload a paper, see:

http://db.csail.mit.edu/nedbday12/

Liveblogging: Cassandra Internals

Cassandra Internals by Gary Dusbabek of Rackspace
Questions?
What's the best way to access data if you're running a program in the same JVM as Cassandra? -- will talk about it during StorageProxy section of the talk
Performance characteristics of using MMAP vs. not using it? - won't cover it.
When does repair happen? -- will talk about it during repair part of the talk
How do Snitch and replication strategy work together? -- will discuss though there is no slide on it.
Ring services - services that go throughout the ring.  These are in a class called StorageService.
Storage services - things that happen locally.  In a class called StorageProxy.
The cassandra executable in /bin executes cassandra.in.sh, which does:
- sets $CLASSPATH
- looks for the .jar files
- sets $CASSANDRA_CONF (mandatory, where yaml file lives)
- sets $CASSANDRA_HOME (not mandatory)
then it looks for another file [didn't get what it was] which:
- determines heap size
- sets max heap size by default to 1/2 available memory
- sets the size for the young generation for Java GC
- sets "a whole bunch of other -X options for Java"
... then it goes to the main() class, org.apache.cassandra.thrift.CassandraDaemon, which:
extends AbstractCassandraDaemon, the guts of the startup sequence.  It has a method called setup(), which reads the config file via the DatabaseDescriptor class.
"Database Descriptor is an awful class."
- loads yaml file, reads into a config object, gets all the settings.
- then calls DatabaseDescriptor.loadSchemas() and loads the schema based on the last versionID, and sets them up to store them in the system column families (in the system datadir, schema column family).
- scrubs the data directories, takes out the trash (e.g. leftovers from compaction, bits and pieces from other SS tables)
- initializes the storage (keyspaces + CFs)
- Commit log recovery: CommitLog.recover() (row mutations)
- StorageService.initServer() and StorageService.joinTokenRing -- this is where the magic of joining the ring happens
  -- starts gossip
  -- starts MessagingService
  -- Negotiates bootstrap
  -- knowledge of ring topology is in StorageService.tokenMetadata_ (btw underscore at end of a member variable means it's old facebook stuff, b/c that's their naming convention)
  -- partitioner is also here.
Configuration
- in DatabaseDescriptor, really a side effect of AbstractCassandraDaemon.setup
- reads config settings from yaml
- defines system tables
- changes regularly
It uses a static initializer, so we might end up making a change that happens when we're not ready for it.
MessagingService
- Verb handlers live here (initialized from StorageService)
 -- main event handlers, haven't changed much
- Socket listener
 -- 2 threads per ring node
- Message gateway
 -- MessagingService.sendRequestResponse()
 -- MessagingService.sendOneWay()
 -- MessagingService.receive() -- when another node contacts you, this is the method that's used to pass the message to a verb handler
- Messages are versioned starting in 0.8
 -- with IncomingTCPConnection
StageManager - fancy java ThreadPoolExecutor
- SEDA design: http://www.eecs.harvard.edu/~mdw/papers/seda-sosp01.pdf
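
A toy illustration of the staged idea, in Python rather than Java: each stage owns its own thread pool, and a request hops from one stage to the next. This is not Cassandra's StageManager; the Stage class, the stage sizes and the two handler functions are all hypothetical, and Python's ThreadPoolExecutor is standing in for the Java one.

```python
from concurrent.futures import ThreadPoolExecutor


class Stage:
    """A named stage with its own fixed-size thread pool."""

    def __init__(self, name, workers):
        self.name = name
        self.pool = ThreadPoolExecutor(max_workers=workers, thread_name_prefix=name)

    def submit(self, task, *args):
        return self.pool.submit(task, *args)

    def shutdown(self):
        self.pool.shutdown(wait=True)


# Hypothetical stages; READ echoes the stage name used in these notes.
read_stage = Stage("READ", workers=4)
response_stage = Stage("RESPONSE", workers=2)


def local_read(key):
    # Stand-in for the real work a read stage would do.
    return {"key": key, "value": "row-for-" + key}


def build_response(row):
    return "%s=%s" % (row["key"], row["value"])


def handle_request(key):
    """Run the request through the READ stage, then the RESPONSE stage."""
    row = read_stage.submit(local_read, key).result()
    return response_stage.submit(build_response, row).result()


if __name__ == "__main__":
    print(handle_request("k1"))
    read_stage.shutdown()
    response_stage.shutdown()
```

The point of the design is that each stage can be sized and monitored on its own, so a slow stage shows up as a backlog in its own queue.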
Adding the API Methods
- open up cassandra.thrift file in the interface directory, this is where you describe methods and new data structures
- regenerate files with ant gen-thrift-java gen-thrift-py
- implement stubs: o.a.c.thrift.CassandraServer
StorageProxy - where local reads and writes happen.
- Called from o.a.c.thrift.CassandraServer
- write path changed in new version b/c of counters
 -- notion of WritePerformer
- eventually to Table and others
- for reads, there's a local read path and remote read path
 -- Socket->CassandraServer.  Looks at permissions, request validation, and marshalling.  
ReadCommands created in CS.multigetSliceInternal, passed to StorageProxy -- 1 per key.
StorageProxy iterates over the ReadCommands, then runs StorageProxy.read(), .fetchRows(), determines endpoints.
Locally, StorageProxy:
- READ stage executes a LocalReadRunnable
- True read vs. digest
- Table, ColumnFamilyStore
Remotely, StorageProxy:
- serializes read command
- Response handler
- Send to remote nodes
ReadRepair happens in StorageProxy.fetchRows()
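
A very simplified sketch of the "true read vs. digest" idea, in Python: one replica returns the data, the others return only a hash of it, and a mismatch is the signal that a repair is needed. The dict-based replicas and the function names are hypothetical, MD5 is just a convenient hash for the sketch, and the real code resolves conflicts by timestamp, which is left out here.

```python
import hashlib


def digest(value):
    """Hash of a value, standing in for a replica's digest response."""
    return hashlib.md5(value.encode("utf-8")).hexdigest()


def read_with_digests(replicas, key):
    """Do a true read on the first replica and digest reads on the rest.

    replicas is a list of dicts, one per replica (a toy stand-in for nodes).
    Returns the data from the true read plus the indexes of the replicas
    whose digest disagrees with it.
    """
    data = replicas[0].get(key, "")
    expected = digest(data)
    mismatched = [
        index
        for index, replica in enumerate(replicas[1:], start=1)
        if digest(replica.get(key, "")) != expected
    ]
    return data, mismatched


if __name__ == "__main__":
    replicas = [{"k": "v2"}, {"k": "v2"}, {"k": "v1"}]
    print(read_with_digests(replicas, "k"))
```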
Writing -- follows similar pattern to reads -- there is a local path and remote path.
- The marshalling turns into row mutations in CS.doInsert()
- StorageProxy.sendToHintedEndpoints
- RowMutation - one key per row (several CFs), so it calls ColumnFamilyStores.apply() to update the memtables.
RowMutation is serialized into a Message.
Then he goes into the challenges of working with the code, which I won't reproduce here.

Liveblogging: Migration from 100s to 100s of Millions

Migration From 100s to 100s of Millions – Messaging to Mobile Devices with Cassandra at Urban Airship
Urban Airship - provides hosting for mobile services that devs should not build themselves (e.g. push notifications, content delivery services, etc). Provides a unified API for all these services across all platforms (Android, iOS, RIMM, etc). Starting to have SLAs for throughput and latency.
Transactional intake system at Urban Airship:
- API - Apache/Python/django+piston+pycassa
- Device Negotiation layer - Java NIO + Hector
- Message Delivery layer - Python, Java NIO + Hector
- Device Data checkins - Java HTTPS endpoint
- Persistence - sharded PostgreSQL, Cassandra 0.7, "diminishing footprint" of MongoDB 1.7
What do they use?
- Cassandra
- HBase
- redis (analytics internal measurements)
- MongoDB (phasing out)
They don't use Riak. Converging on Cassandra + PostgreSQL for transactions, HBase for data warehousing.
They started with PostgreSQL on EC2, but had so many writes that after 6 months they couldn't scale, so they went to MongoDB. MongoDB had heavy disk I/O problems, and its unsophisticated locking caused locking, deadlocking and replication slave lag that was just not working out for them. So they moved to Cassandra. Why?
- Well-suited to data model - simple DAGs
- lots of uuids and hashes, which partition well
- retrievals don't need ordering beyond row keys or time-series data (e.g. doesn't matter what order 10 million devices are retrieved, just need to retrieve them!)
- Rolling minor version upgrades are easy in Cassandra, no downtime.
- Column TTLs were huge for them (and resulting expiration)
- Particularly well-suited to working around EC2 availability problems
- They needed to partition across multiple availability zones; they didn't trust fault containment within one zone.
- Read repair and handoff generally did the right thing when a node would flap (Ubuntu #708920)
- No single point of failure
- Ability to alter consistency levels (CL) on a per-operation basis - some things aren't important to be consistent right away, others are very important.
Cassandra tips:
- Know your data model - creating indexes after the fact is a PITA
- design around wide rows (but be careful of I/O, Thrift, Count problems)
- Favor JSON over packed binaries if possible (unless you're Twitter)
- Be careful using Thrift in the stack - having other services that use Thrift that have to talk to Cassandra has some painful versioning limitations.
- Don't fear the StorageProxy.
- Looking at the Cassandra source code and getting your hands dirty with the Java code is a MUST.
- Assume the client will fail (difference between read timeout and connection refused)
- When maintaining your own indexes, try and clean up after failure (i.e. have a good rollback strategy)
- Be ready to clean up inconsistencies anyway
- Verify client library assumptions and exception handling, make sure that you know what's going on when the client communicates that it couldn't do a write. Understand what the client is doing so you can figure out whether to retry now or later or what.
- Embedding Cassandra for testing really helped
Cassandra in EC2:
- Ensure Dynamic Snitch is enabled (also make sure you check your config files during upgrades... they had Dynamic Snitch off in 0.6 due to bugs, and when they upgraded to 0.7 they didn't turn it on)
- Disk I/O - avoid EBS except for snapshot backups ... or use S3. Stripe ephemerals, not EBS volumes, because Cassandra is network I/O heavy and EBS is a networked disk.
- Avoid smaller instances altogether -- i.e. avoid virtualization if you can
- Don't assume that traversing a close-proximity availability zone is more expensive than staying in the same availability zone -- it sometimes is, but often isn't. (No comment on different regions, haven't tested yet)
- Balance RAM costs vs. the costs of additional hosts. Spend time with the GC logs.
Java best practices:
- ALL Java services, including Cassandra, are managed via the same set of scripts. For them, in most cases, operators don't treat Cassandra differently from HBase: there is one mechanism to take a thread or heap dump, all logging is consistent for GC, application, stdx for HBase and Cassandra, and even init scripts use the same scripts that the operators do.
- Bare metal will rock your world
- Configuring +UseLargePages will be good too (on bare metal)
- Get familiar with GC logs (-XX:+PrintGCDetails), understand what degenerate CMS collection looks like, and what promotion failures look like. Urban Airship settled at -XX:CMSInitiatingOccupancyFraction=60, lowered from the default of 75, to do CMS collection before there's a problem, to avoid promotion failures.
Operations:
- Understand when to compact
- Understand upgrade implications for data files
- Watch hinted handoff closely
- Monitor JMX religiously
Looking forward:
- Cassandra is a great hammer, but not everything is a nail
- Co-processors would be awesome (hint hint!)
- They still spend too much time worrying about GC
- Glad to see the ecosystem around the product evolving: CQL, Pig, Brisk

Liveblogging: Indexing in Cassandra by Ed Anuff

Indexing in Cassandra

First, a brief history:

Cassandra 0.6

- No built-in secondary indexes

- all indexes were custom-built, e.g. using supercolumns

Cassandra 0.7

- new users flocked to the built-in secondary indexes

pros - easy to use out of the box

cons - not the same as SQL indexes but they look similar

- reinforce data modeling that plays *against* cassandra's strengths

Present Day

- new users can get started with Cassandra w/out understanding internals, using CQL

- Veteran users are using advanced techniques like composites that aren't really documented anywhere.

- New user panic mode when they try to use the next level and find themselves in the deep end.

 

Quick review of Indexing

2 ways of finding rows - primary index and alternate indexes

primary index (row keys):

- sometimes it's meaningful (natural key)

- usually not, like a uuid

 

Get vs. find:

using row key is best to retrieve info if you've got precise and immutable 1:1 mapping

if you plan to iterate over keys, you're probably doing something wrong. (that's finding, not getting)

So search shouldn't really use primary keys, just 'get'.

 

 

alternate indexes (everything else)

Native Secondary indexes:
- easy to use, looks like SQL
- every index is stored as its own "hidden" column family (CF)
- nodes index the rows they store
- when you issue a query it gets sent to ALL nodes (no partition pushdown)
- Currently only does equality ops; range operations get performed in memory by the coordinator node.
This behavior contributes to these limitations:
- Not recommended for high cardinality values (timestamps, birthdays, keywords, etc)
- Requires AT LEAST one equality comparison in a query, not efficient for less than, greater than or range queries
- Unsorted - results are in token order, not query value order
- Limited to search on data types Cassandra *natively* understands.
wide rows as lookup and grouping tables
"Why would a row need 2B columns?"
- It's the basis of all indexing, organizing and relationships in Cassandra.
- if your data model has no rows with >100 columns, you're probably doing it wrong (you're thinking in relational terms!)
Inherently, wide rows work as a simple index -- 
indexes = {
"User_Keys_by_last_name": {
 "aaaa"
"aaaab"
etc...
CF as indexes
-cf column ops very fast
-column slices can be retrieved by range, are always sorted, can be reversed, etc.
-if a target key is a TimeUUID you get both grouping AND sorting by timestamp.  Good for inboxes, feeds, logs, etc
- This is the best option when you need to combine groups, sort and search, such as a friends list, inbox, etc.
But...what about 2 people with the same last name?  (non-unique keys)
In the docs, the first answer you might find is SuperColumns - lets you have your col name have >1 col value.  So, 2 row keys for people with 1 last name
Use SuperColumns with caution.
- Not officially deprecated, but they're not highly recommended either.
- sorts only on the supercolumn, not the subcolumn
- some performance issues
- Cannot do more nesting, can only have 1 level of subcolumns
- Many projects have moved away from supercolumns b/c of these limitations.
So, let's revisit regular CF's -- what happens with >1 person with the same last name?  you can't have 2 cols with the same column name.  You can do a composite column name:
"User_Keys_by_last_name": {
("alden", 1): "e5d",
("adams", 1): "et3",
("anderson", 1): "e5f",
("anderson", 2): "e71..",
("doe", 1): "a4f",
("franks", 1): "f4e",
Composite column names
Comparator = "CompositeType" or "DynamicCompositeType"
 - you don't lose the sort capability - sorts by component values using each component type's sort order
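
A small in-memory sketch of why composite names keep the sort and allow slicing, using Python tuples as a stand-in for composite column names. This is only an illustration of the ordering, not Cassandra or Hector code, and slice_by_last_name is a hypothetical helper. The entries mirror the example above, including its truncated "e71.." value.

```python
import bisect

# Column names are (last_name, sequence) tuples; values are user keys.
columns = sorted([
    (("alden", 1), "e5d"),
    (("adams", 1), "et3"),
    (("anderson", 1), "e5f"),
    (("anderson", 2), "e71.."),
    (("doe", 1), "a4f"),
    (("franks", 1), "f4e"),
])


def slice_by_last_name(columns, last_name):
    """Return every column whose first name component equals last_name."""
    names = [name for name, _ in columns]
    # (last_name,) sorts before every (last_name, n); infinity sorts after them.
    start = bisect.bisect_left(names, (last_name,))
    end = bisect.bisect_right(names, (last_name, float("inf")))
    return columns[start:end]


if __name__ == "__main__":
    for name, user_key in slice_by_last_name(columns, "anderson"):
        print(name, user_key)
```

Because the tuples sort component by component, both "anderson" entries sit next to each other and come back from a single range lookup.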
2 types of composites, static and dynamic
column_families:
name: My_Composite_Index_CF
compare_with: CompositeType(UTF8Type, UUIDType)
-- note in static composite types, fixed # and order of columns in the definition
name: My_Dynamic_Composite_Index_CF
compare_with: DynamicCompositeType(s=>UTF8Type, u=>UUIDType)
-- Any # and order of types at runtime, the definitions are just for convenience and smaller serialized component names.
The main difference is whether you need to create one CF per index, or one CF for all indexes with one row per index
How does this work?
Queries are easy - just regular column slice ops
Updates are harder - need to remove old value and insert the new value -- this is why they recommend starting with the built-in native secondary indexes. You have to know how to remove old values then insert new values, which involves a read before write.
Example - Users by Location
Use 3 CFs, not 2 for safe concurrent access
First 2 CF's are natural:
Users
Indexes
We also need a 3rd:
Users_Index_Entries
Users = {
"username": "..."
"location": <location>
}
Indexes = {
"Users_By_Location" : {
  {<location>, <user_key>, <ts>} : ..., ...: ..., 
  }
}
Users_Index_Entries = {
<user_key>: {
 {"location", <ts 1>}: <location 1>,
 {"location", <ts 2>}: <location 2>,
 {"location", <ts N>}: <location N>,
Allows you to read the previous index value from Users_Index_Entries CF and delete the previous one. 
Read from Users_I_E WHERE KEY=<user_key>;
DELETE FROM Users_Index_Entries
DELETE FROM Users_By_Location
UPDATE Users_Index_Entries
UPDATE Users_By_Location
B/c there's a timestamp, doing it >1 time has no consequence, so eventually consistent.
What if something goes wrong?  Repeat batch operation until it completes.
False positive?  possible, so if it's a problem, filter on the reads.
This approach is VERY common -- with some variations.  So use this *idea*, but not necessary to be an exact copy of this example.
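
Here is a minimal in-memory sketch of that read-then-delete-then-write sequence, with plain Python dicts standing in for the three CFs. It only illustrates the bookkeeping: there is no batching, no concurrency and no real Cassandra client here, and the function names are hypothetical.

```python
# Plain dicts standing in for the three column families.
users = {}                 # {user_key: {"location": location}}
users_by_location = {}     # the index row: {(location, user_key, ts): user_key}
users_index_entries = {}   # {user_key: {("location", ts): location}}


def set_location(user_key, location, ts):
    """Update a user's location and keep the index in step."""
    # 1. Read the previous index entries for this user.
    previous = dict(users_index_entries.get(user_key, {}))
    # 2. Delete them from Users_Index_Entries and from Users_By_Location.
    for (field, old_ts), old_location in previous.items():
        del users_index_entries[user_key][(field, old_ts)]
        users_by_location.pop((old_location, user_key, old_ts), None)
    # 3. Write the new entries.
    users_index_entries.setdefault(user_key, {})[("location", ts)] = location
    users_by_location[(location, user_key, ts)] = user_key
    users.setdefault(user_key, {})["location"] = location


def find_users(location):
    """Look up user keys by location using only the index row."""
    return sorted(key[1] for key in users_by_location if key[0] == location)


if __name__ == "__main__":
    set_location("u1", "Boston", 1)
    set_location("u1", "Austin", 2)
    set_location("u1", "Austin", 2)  # repeating the same update changes nothing
    print(find_users("Boston"), find_users("Austin"))
```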
At least composite indexing is now standard.
Can do derived indexes -- create a "last_name, first_name" index from a "fullname" column.    Can also unroll a JSON object to construct deep indexes of serialized JSON structures.
- Include additional denormalized values in the index for faster lookups
- use composites for column values, too -- not just column names.
custom secondary indexes
Note: no official alternate index "way".  Everything talked about here is using an official Cassandra feature/property.
How can I learn more?
Sample using Hector:
JPA implementation for this using Hector:

 

Jira entry on this:

http://issues.apache.org/jira/browse/CASSANDRA-2231

Liveblogging: A dozen databases in 45 minutes

 

Actually, the slide is 12 databases in 25 minutes (and 20 minutes of theory) by Eric Redmond (@inviite).  

Complex data:  A lot of data isn't really complex, it's just modeled in a complex way.

"Complexity is a symptom of confusion, not a cause" Jeff Hawkins.

 

NoSQL

  Linear Scalability

  Ability to be Distributed

  Low Latency

 

SQL 

  Not NoSQL

 

ACID (transaction-based)

  Atomic - Transactions are "all or nothing"

  Consistent - system data will have integrity

  Isolated - Transactions can't see each other

  Durability - changes aren't lost

 

BASE (request based)

  Basically Available

  Soft state

  Eventual consistency

Redmond says: ACID is negative (avoid negative things), BASE is mostly positive, things will be good, not great.

CAP theorem - it's a fact :D

Consistent, Available, Partition-tolerant web services.  "It is impossible to reliably provide atomic, consistent data when there are partitions in the network.  It is feasible, however, to achieve

Note that "consistent" is not the same as "consistent" in ACID, it's more like Atomicity.

Strong consistency: when an update completes, subsequent access returns the new result.  [to my mind this is actually durability]

Weak consistency - eventual consistency

"Correct consistency" - is the most important part.  DNS, for example, is eventual consistency.

Common patterns:

Replication

 CouchDB has an amazing ability to do this, Mongo is also good but not as good.

- copying data amongst nodes in a distributed database.  Lazy (optimistic) replication, gossip (nodes communicate to stay in sync).

- master/slave (mongo)

- master/master (riak, couch)

  - vector clocks (keep track of write order per client)

  - mvcc (mysql) 

N/R/W

  N - Nodes to write to (per bucket)

  R - Nodes read from before success

  W - Nodes written to before success

Amazon Dynamo does this (Cassandra and Riak do this) - supports both CP and AP in one db (from the CAP theorem)
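
The usual rule of thumb with these three knobs is that a read is guaranteed to overlap the latest successful write when R + W > N, and a quorum is a majority of N. A tiny Python sketch of that arithmetic (the function names are mine, not from the talk):

```python
def reads_overlap_writes(n, r, w):
    """True when every read set must share at least one node with every write set."""
    return r + w > n


def quorum(n):
    """Smallest majority of n replicas."""
    return n // 2 + 1


if __name__ == "__main__":
    n = 3
    print("quorum of", n, "is", quorum(n))
    print("R=2, W=2:", reads_overlap_writes(n, 2, 2))  # consistent-leaning choice
    print("R=1, W=1:", reads_overlap_writes(n, 1, 1))  # availability-leaning choice
```

Picking R and W per request is how one database can lean towards consistency for some operations and towards availability for others.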

Consistent Hashing

Balance your servers, and when you hash your keys, if a server goes down or is added you don't have to rebalance ALL nodes, just some % of them.
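
A minimal sketch of a hash ring in Python, to show the "only some of the keys move" property. The HashRing class, the 64 points per node and the node names are assumptions for the example, not any particular database's implementation.

```python
import bisect
import hashlib


def _hash(value):
    """Map a string to a position on the ring."""
    return int(hashlib.md5(value.encode("utf-8")).hexdigest(), 16)


class HashRing:
    def __init__(self, nodes, points_per_node=64):
        # Each node is placed at several points to spread its share of the ring.
        self._ring = sorted(
            (_hash("%s#%d" % (node, i)), node)
            for node in nodes
            for i in range(points_per_node)
        )
        self._positions = [position for position, _ in self._ring]

    def owner(self, key):
        """The first point clockwise from the key's position owns the key."""
        index = bisect.bisect_right(self._positions, _hash(key)) % len(self._ring)
        return self._ring[index][1]


def moved_keys(before, after, keys):
    """Keys whose owner differs between two rings."""
    return [key for key in keys if before.owner(key) != after.owner(key)]


if __name__ == "__main__":
    keys = ["key-%d" % i for i in range(1000)]
    before = HashRing(["node-a", "node-b", "node-c"])
    after = HashRing(["node-a", "node-b", "node-c", "node-d"])
    moved = moved_keys(before, after, keys)
    print("%d of %d keys moved" % (len(moved), len(keys)))
```

Every key that moves ends up on the new node; keys never shuffle between the nodes that were already there.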

Mapreduce

 

Relational Models:

"Nothing beats relational databases for raw queryability."  The tradeoff -- you have to structure your data and tell the system how it is structured.

PostgreSQL (full featured) - http://bitbucket.org/ged/ruby-pg, http://github.com/Casecommons/pg_search, http://github.com/tenderlove/texticle

MySQL (lighter) - http://github.com/oldmoe/mysqlplus, http://github.com/brianmario/mysql2

Drizzle (lightest) - http://www.drizzle.org

 

Bigtable/Columnar Style

 

What makes it columnar?  well, a primary key is really a row key, and then you have column families, which are columns, stored together.  (each column's values are stored together as opposed to the row being stored together.)  You can set expiry for a column family too, after which the data expires (which is why it's great for Google).

HBase - http://hbase.apache.org - an implementation of Google's BigTable design, which was born of Hadoop (Java mapreduce engine).  If you want to use HBase in production, use Thrift (http://thrift.apache.org), which Cassandra also uses.  This is CP, but configurable to AP.  Does sequential reads and column versioning, strong but flexible columnar schema.

Cassandra - hybrid.  Node architecture like dynamo - data structure like BigTable w/column families - http://cassandra.apache.org - Good for hundreds of nodes in the same data center, if there is more than that or different data centers, use HBase (that's what Digg and Facebook are running into).  In cassandra you set up your schemas with an XML file, not with DDL.  Benefits - sequential reads of ordered keys, also has versioning.  It's AP, configurable to CP.

 

Document Datastores:

MongoDB (AP focused - master/slave)

http://www.mongodb.org - created to be huge (huMONGous).  Made to be partitioned, distributed, needed ad hoc queries.  Wasn't built to be durable.

 

CouchDB

Not made to be distributed, originally, was meant to be very durable.  AP focused (master/master)

http://couchdb.apache.org

http://tilgovi.github.com/couchdb-lounge (clustering)

MapReduce in Mongo is an ad hoc query, comfortable for relational db ppl.  In CouchDB, you make views and then request data from those views.  

Riak - The most "architecturally cool" database out there.  It's a dynamo implementation that is purely REST based.  It's a key-value store, but it's not descriptive enough -- it has map-reduce built in, metadata and links you can walk.  You can store ANYTHING in riak -- not just text.  example: getting a JPG file from the web and putting it as the value for the key "firefox.jpg".  Neat demo.

Riak has a ring, eventual consistency, can pull nodes in and take nodes out, without having to invalidate all the ids.  It has quorum consistency, which blows Eric's mind, but we didn't have 

 

Key/value stores

memcached - don't use it

Kyoto Cabinet - don't use it

Redis - use it - http://redis.io - it can handle lists, hashes, can intersect the value of 2 keys (such as person and pet, to find out who owns which set).

 

Graph datastores - you walk the graph instead of querying or doing mapreduce.

Neo4j

FlockDB  - distributed, "unless you're twitter, you don't need to use it".  It's not really possible to distribute a graph database, you can't walk it and do node traversals, you can just walk edges (you can do friends, but not friends of friends, etc).

Slides are available at https://github.com/coderoshi/holy-grail-dbs

 

 

Cassandra RandomPartitioner Tokenizing

 

So I'm creating a new cluster, and after setting up I needed to get my tokens.  As we're told in http://wiki.apache.org/cassandra/Operations:

 

Token selection:

Using a strong hash function means RandomPartitioner keys will, on average, be evenly spread across the Token space, but you can still have imbalances if your Tokens do not divide up the range evenly, so you should specify InitialToken to your first nodes as i * (2**127 / N) for i = 0 .. N-1. In Cassandra 0.7, you should specify initial_token in cassandra.yaml.  

Here's a nice simple code snippet to figure out your RandomPartitioner tokens based on the size of your cluster:

 

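#! /usr/bin/python

# Print evenly spaced RandomPartitioner tokens for a cluster of the given size.
# Usage: ./tokenizer.py <number of nodes>

#nodes = int(raw_input( "How many nodes?" ))

import sys

nodes = int(sys.argv[1])

def tokens(nodes):
    # Integer division keeps the tokens exact on both Python 2 and Python 3.
    for i in range(1, nodes + 1):
        print (i * (2 ** 127 - 1) // nodes)

tokens(nodes)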

 

 

This should give something like this:

 

[root@cassandra06 conf]# ./tokenizer.py 6

28356863910078205288614550619314017621

56713727820156410577229101238628035242

85070591730234615865843651857942052863

113427455640312821154458202477256070484

141784319550391026443072753096570088105

170141183460469231731687303715884105727
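
For comparison, the wiki formula quoted earlier, i * (2**127 / N) for i = 0 .. N-1, starts at zero rather than at the first step. A minimal sketch of that variant, with a hypothetical function name, using integer division so it behaves the same on Python 2 and 3:

```python
def initial_tokens(node_count):
    """Tokens from the wiki formula: i * (2**127 / N) for i = 0 .. N-1."""
    step = 2 ** 127 // node_count
    return [i * step for i in range(node_count)]


if __name__ == "__main__":
    for token in initial_tokens(6):
        print(token)
```

The tokens are practically the same distance apart as in the output above, just shifted so that the first node sits at 0.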

Using SSH Proxying with Jconsole to remote Cassandra instances

This guide leans heavily on the work of http://simplygenius.com/2010/08/jconsole-via-socks-ssh-tunnel.html — what I’ve done is collect his work into something a little more manageable for our environment.

Prerequisites

This guide assumes a few things are set up:

  1. That you’ve got ssh keys pushed around to do passwordless logins between your machine and your intermediate client machine
  2. That you’ve got cassandra up and running remotely
  3. That cassandra is listening on 8080 for its JMX service port

The Meat

This hunk of bash script is the meat of making this work.  Put the following in your .bashrc.  Make sure to edit proxy_host= to match your environment.

function jc {
    # set this to the host you'll proxy through.
    proxy_host="remoteuser@remotehost -p 22"
    host=$1

    jmxport=8080
    proxy_port=${2:-8123}

    if [ "x$host" = "x" ]; then
        echo "Usage: jc <remote server> [proxy port]"
        return 1
    fi

    # start up a background ssh tunnel on the desired port
    ssh -N -f -D$proxy_port $proxy_host

    # if the tunnel failed to come up, fail gracefully.
    if [ $? -ne 0 ]; then
        echo "Ssh tunnel failed"
        return 1
    fi

    ssh_pid=`ps awwwx | grep "[s]sh -N -f -D$proxy_port" \
        | awk '{print $1}'`
    echo "ssh pid = $ssh_pid"

    # Fire up jconsole to your remote host
    jconsole -J-DsocksProxyHost=localhost -J-DsocksProxyPort=$proxy_port \
        service:jmx:rmi:///jndi/rmi://${host}:${jmxport}/jmxrmi

    # tear down the tunnel
    kill $ssh_pid
}

Then, either close your shell, or source your .bashrc.  Then you should be able simply to call your function like so:

    host$ jc cassandra-host01

Jconsole will pop up, and log you in.

-Gabriel

Originally available at http://gabrielcain.com/blog/
