Episode #1: http://gebski.tumblr.com/post/62260870845/bakin-bakin-bakin-logging-for-enterprise-scale
Episode #2: http://gebski.tumblr.com/post/63404293910/bakin-bakin-bakin-logging-for-enterprise-scale

Let’s get back to our enterprise logging scenario. We’ve kinda already sketched the overall topology: we’re gonna collect events using a MOM (RabbitMQ), process them in Twitter Storm (aka Apache Storm) and finally put the processed events into a distributed database (Cassandra) and file storage (HDFS).
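
Just to make that topology concrete, here’s a minimal wiring sketch in Storm’s Java API. The spout and bolt class names (RabbitMqEventSpout, EnrichEventBolt, CassandraWriterBolt, HdfsWriterBolt) are made up for illustration - they stand in for whatever components you’d actually write:

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class LoggingTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Hypothetical spout consuming raw log events from RabbitMQ queues.
        builder.setSpout("rabbitmq-events", new RabbitMqEventSpout(), 4);

        // Hypothetical bolt that parses / enriches the raw events.
        builder.setBolt("enrich", new EnrichEventBolt(), 8)
               .shuffleGrouping("rabbitmq-events");

        // Processed events fan out to two sinks: Cassandra (SVoT) and HDFS (secondary).
        builder.setBolt("cassandra-writer", new CassandraWriterBolt(), 8)
               .shuffleGrouping("enrich");
        builder.setBolt("hdfs-writer", new HdfsWriterBolt(), 4)
               .shuffleGrouping("enrich");

        Config conf = new Config();
        conf.setNumWorkers(4);
        StormSubmitter.submitTopology("enterprise-logging", conf, builder.createTopology());
    }
}
```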

All of that to get rid of good ol’ SQL. Niiiiice, and surely we can enumerate many pros (I believe I did that in the prior posts), but aren’t we missing something? Let’s challenge ourselves:

Challenge #1: We don’t need backup! Erm, do we?

We don’t want to archive data: archived data is not immediately available for processing, and cherry-picking the data to archive is usually time-consuming. That’s why we decided to go for a scale-out solution that can grow (in theory) indefinitely. Keeping in mind the redundancy we’ve introduced, we don’t need backups. BUT. What about disaster recovery? (http://en.wikipedia.org/wiki/Disaster_recovery)

Sure, we can get a nice replication factor, but what happens if something takes down (irreversibly) the whole data center? Do we have to start over? I don’t think any of the business decision makers would find that acceptable… Fortunately, we’re not as doomed as we may seem to be:

  1. Let’s start with our SVoT - Cassandra: what we can do is split the cluster into two groups (http://www.datastax.com/docs/0.7/operations/datacenter), each with the required redundancy - one of them the actual production environment (that serves all the reads) and the other one a hot-swappable backup site. Each group can operate on its own (if the other one goes down) and has a suitable replication factor (if one or more of its own nodes go down): http://stackoverflow.com/questions/13647921/configuring-apache-cassandra-for-disaster-recovery - there’s a keyspace sketch right after this list.
  2. What about the other data store - HDFS? In theory this is far more tricky, as we’re talking about data streamed into files that isn’t immediately flushed. But the most important question is: do we need to set up a DRP for the secondary data store at all? Not in our case, because data lost from HDFS can be recovered from Cassandra (it’s the same data, just in a different format) and data in HDFS isn’t critical in terms of availability (Cassandra is).
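
As promised above, here’s a minimal sketch of the Cassandra side of challenge #1: a keyspace that keeps a full replica set in each of the two groups, using NetworkTopologyStrategy via the DataStax Java driver. The contact point, keyspace name, data center names and replication factors are all made up - they obviously depend on your own redundancy requirements:

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class CreateLoggingKeyspace {
    public static void main(String[] args) {
        // "cassandra-seed-1", "DC-PRIMARY" and "DC-BACKUP" are placeholder names.
        Cluster cluster = Cluster.builder().addContactPoint("cassandra-seed-1").build();
        try {
            Session session = cluster.connect();
            // NetworkTopologyStrategy keeps a full replica set in each data center,
            // so either group can serve reads on its own if the other one is lost.
            session.execute(
                "CREATE KEYSPACE IF NOT EXISTS logging WITH replication = {"
                + " 'class': 'NetworkTopologyStrategy',"
                + " 'DC-PRIMARY': 3,"
                + " 'DC-BACKUP': 3 }");
        } finally {
            cluster.close();
        }
    }
}
```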

Challenge #2: Distributed Transactions, hooooo!

So, there’s an event in RabbitMQ, Storm picks it up and all the mayhem starts - the actual processing of the event *forks* across several bolts and their child bolts, etc. - in the end, the final bolts write / send the processed events to storage or to some other services. The question is: what happens when some of them fail?

Should we go for distributed transactions (http://en.wikipedia.org/wiki/Distributed_transaction)? Hell no.

Fortunately, Storm saves the day here with its guaranteed message processing (http://www.slideshare.net/nathanmarz/storm-11164672) - if an event is not fully processed within the whole bolt sub-tree, processing gets replayed (there’s a bolt sketch after the list below). Surely, there are two problems with that:

  1. Some child-events may be processed more than once (a record may appear more than once in the storage) - but as long as you have an unambiguous event identifier, you can filter the duplicates out later.
  2. If the error originates outside of your Storm cluster (for instance, Hadoop’s NameNode is not accessible) you have to bail out anyway (after a given number of retries) - but as long as you mark such events in a clear way (DEAD_LETTER?) you can deal with them later (for instance, using a reconciliation procedure). Of course, you should apply a slightly different strategy to your SVoT than to the secondary storage.
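
And here’s the bolt sketch mentioned above: a writer bolt that anchors what it emits and acks the input tuple only after the write succeeds, so Storm can replay the whole sub-tree on failure. The class name, field names and the stubbed-out Cassandra write are illustration only, and the dead-letter / retry bookkeeping is just hinted at in the comments:

```java
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

// Hypothetical writer bolt - the final stage that persists processed events.
public class CassandraWriterBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Open the Cassandra session here (omitted in this sketch).
    }

    @Override
    public void execute(Tuple input) {
        String eventId = input.getStringByField("eventId");
        String payload = input.getStringByField("payload");
        try {
            writeToCassandra(eventId, payload);
            // Anchoring: emit the child tuple tied to its input, so the whole
            // tuple tree is tracked by Storm's acker tasks...
            collector.emit(input, new Values(eventId));
            // ...and ack the input only once the write has succeeded.
            collector.ack(input);
        } catch (Exception e) {
            // fail() makes the spout replay the event - which is why the same
            // record may land in storage more than once (point 1 above).
            // After a given number of replays you'd mark the event DEAD_LETTER
            // instead of failing it again (point 2 above).
            collector.fail(input);
        }
    }

    private void writeToCassandra(String eventId, String payload) throws Exception {
        // The actual Cassandra insert goes here (omitted in this sketch).
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("eventId"));
    }
}
```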

Challenge #3: Out-scaling multi-tier processing

Yupp, it’s all about several stages of dependent processing: event producers put events into RabbitMQ queues, several Storm supervisors process the events picked up from RabbitMQ, and those end up written to two kinds of storage. How should we manage the M:N relationship between the node counts of subsequent tiers? How should M RabbitMQ nodes get read by N Storm spouts?

To be honest, we went for the easiest approach: wherever we could, we went for M=N. We’ve created a number of RabbitMQ queues that matches the number of spouts and we’ve prepared a pre-set spout-to-queue assignment. An event is assigned to a queue based on some trivial criterion (the last digit of the event ID or sthng) - there’s a quick sketch below. Works like a charm - the easiest solutions are always the best.
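
Here’s the promised sketch of that trivial routing criterion: a producer that picks the target queue from the last digit of the event ID, using the plain RabbitMQ Java client. The host name, the "events-0" … "events-9" queue naming convention and M=N=10 are assumptions made up for the example:

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EventPublisher {
    // Assumption: one queue per spout, named "events-0" .. "events-9" (M = N = 10).
    private static final int QUEUE_COUNT = 10;

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("rabbitmq-host"); // placeholder host name
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        long eventId = 1234567890L;        // hypothetical event
        String payload = "{\"eventId\": 1234567890, \"msg\": \"something happened\"}";

        // The last digit of the event ID picks the queue, so each spout
        // always reads "its own" pre-assigned queue.
        String queueName = "events-" + (eventId % QUEUE_COUNT);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.basicPublish("", queueName, null, payload.getBytes("UTF-8"));

        channel.close();
        connection.close();
    }
}
```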

[ To be continued - next time I’ll write a bit about event ordering & high availability ]
