Producing a lot of data doesn't bring value by itself, even if the data actually makes sense.
Storing a lot of data doesn't bring value by itself either.
The ability to process all this data is the key to getting some actual value out of it.
What is more, the processing has to:
- be efficient
- be ready for continuous adjustments, enhancements, improvements & further development
Honestly, if you're dealing with relatively simple data & straightforward analysis, you don't really need sophisticated tools: we've found that in the case of Hadoop (or, to be more precise, Hadoop over HDFS), a simple abstraction layer like Hive gives you at least 80% of what you need. But Hive has its limitations: in its chase to be as close to SQL as possible, it lost the extensibility of traditional Map/Reduce jobs - you can't conveniently plug your own code into the processing if it isn't HQL.
That's why I was actively looking for different ways to write efficient & flexible data processing jobs, preferably not restricted to Java. And that's how I found ...
Cascading & Scalding
A short clarification first: both libraries represent the same approach - Cascading was the initial implementation, with a Java API, & Scalding is a Scala API on top of Cascading made by Twitter (AFAIR). There's no point in writing about the two separately, so I'll just focus on Scalding, because I prefer Scala over Java (by freakin' far).
O'righty, to the point then: Scalding is a library that lets you stream the content of files & transform it using the functional paradigm. If you've just thought about 'monads', you're freaking right. What's so cool about that then? Apart from the hype around functional programming, of course ...
Well,
- you're not locked into a SQL-like DSL as in Hive, so you're able to plug your code in directly
- transforming streams using Java/Scala code is more convenient than manually mapping everything to a series of M/R jobs (even if that's what actually happens under the hood of Cascading ...)
- Scalding doesn't care whether it's working on local files or HDFS: it works in exactly the same way - that, plus the use of Java/Scala, makes it really easy to test
"Talk is cheap, show me the code"
First, inputs & outputs - these work pretty much like what you may know from Pig: you have a source and a destination, but you have slightly better control over the format. For instance:
val source1 = TextLine(fileName1) // you can read/write whole lines from file ...
val source2 = Tsv(fileName2) // ... or read lines with fields separated by tabs ...
val source3 = Csv(fileName3) // ... or commas, etc.
You can directly specify the schema of fields you expect:
val schema = ('exchange, 'stock_symbol, 'date, 'stock_price_open, 'stock_price_high, 'stock_price_low, 'stock_price_close, 'stock_volume, 'stock_price_adj_close)
val input = Tsv("/input.tsv", schema)
As you can see, the API doesn't care where the file is taken from (this can be overridden) - typically you just switch between two parameters of the Scalding run script. For instance (scald.rb is the default script for running Scalding jobs & it ships with Scalding):
./scripts/scald.rb --local /local_dir/Playground.scala
./scripts/scald.rb --hdfs /hdfs_dir/Playground.scala
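The snippets in this post are fragments; to be runnable they have to live inside a job class. Here's a minimal sketch of what Playground.scala could look like, assuming the fields-based API from com.twitter.scalding. The output path is a made-up placeholder, not something from my actual setup:

```scala
import com.twitter.scalding._

// Minimal sketch of a complete job file (Playground.scala).
class Playground(args: Args) extends Job(args) {
  // Field names expected in each row of the input file.
  val schema = ('exchange, 'stock_symbol, 'date,
    'stock_price_open, 'stock_price_high, 'stock_price_low,
    'stock_price_close, 'stock_volume, 'stock_price_adj_close)

  val input = Tsv("/input.tsv", schema)
  val output = Tsv("/output.tsv") // hypothetical destination

  // Read every row and write it back unchanged.
  input.read.write(output)
}
```

Whether the paths resolve against the local file system or HDFS is decided by the --local / --hdfs switch, not by the code.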
Enough of reading, let's process
The simplest option is ... to do nothing (but still: in a nice, fluent way ;D):
input
.read
.write(output)
That doesn't seem very helpful, though. We have a few more interesting options, like creating an additional field based on other fields in the same row:
input
.read
.map(('stock_price_high, 'stock_price_low) -> 'stock_price_avg) { prices: (Double, Double) => (prices._1 + prices._2) / 2 }
.write(output)
Other options include filtering, flat mapping (a row can be mapped to several items, which are flattened into one level of output rows) or grouping:
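input
.read
// 'avg is not part of the schema, so derive it first
.map(('stock_price_high, 'stock_price_low) -> 'avg) { prices: (Double, Double) => (prices._1 + prices._2) / 2 }
.groupBy(('exchange, 'stock_symbol)) { group => group.size.max('avg -> 'max_avg).min('avg -> 'min_avg) }
.write(output)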
You may have noticed the odd syntax in the lambda: there are 3 aggregations on the group level (size, max, min), but they are chained one after another (even though they happen in parallel on the same dataset). Well, that's how it works & you can apply that syntax to even more sophisticated scenarios (size.min(...).max(...).average(...), etc.).
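Since the functions passed to map, filter & flatMap are ordinary Scala, "plugging your own code in" just means calling it. Below is a rough sketch of that idea, again assuming the fields-based API. RowOps, its three methods, the 'spread and 'price fields & the input/output arguments are all hypothetical names I made up for illustration:

```scala
import com.twitter.scalding._

// Hypothetical helpers: plain Scala functions, testable without Hadoop.
object RowOps {
  // Intraday spread as a fraction of the low price (0.0 when low is 0).
  def spread(high: Double, low: Double): Double =
    if (low == 0.0) 0.0 else (high - low) / low

  // True for rows that saw any trading at all.
  def wasTraded(volume: Long): Boolean = volume > 0L

  // One item per price, so one input row becomes two output rows.
  def openAndClose(open: Double, close: Double): List[Double] =
    List(open, close)
}

class RowOpsJob(args: Args) extends Job(args) {
  val schema = ('exchange, 'stock_symbol, 'date,
    'stock_price_open, 'stock_price_high, 'stock_price_low,
    'stock_price_close, 'stock_volume, 'stock_price_adj_close)

  Tsv(args("input"), schema)
    .read
    // Drop rows without any volume.
    .filter('stock_volume) { volume: Long => RowOps.wasTraded(volume) }
    // Add a new field computed by our own function.
    .map(('stock_price_high, 'stock_price_low) -> 'spread) {
      prices: (Double, Double) => RowOps.spread(prices._1, prices._2)
    }
    // Emit one row per price (open, close) for every input row.
    .flatMap(('stock_price_open, 'stock_price_close) -> 'price) {
      prices: (Double, Double) => RowOps.openAndClose(prices._1, prices._2)
    }
    .write(Tsv(args("output")))
}
```

Keeping the logic in a separate object is a design choice, not a Scalding requirement: it lets you unit-test the functions without spinning up any job at all.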
Btw., doesn't the code above look like typical functional, fluent collection processing in Scala? If so, what's the gain? It's just processing collections - seems like basic stuff.
Well, there's a point: MEMORY. In a typical small-scale scenario you're dealing with hundreds or thousands of elements, while Cascading/Scalding (C/S) is supposed to deal with data streamed from Big Data storage, where keeping everything in memory is not an option. And that's what C/S covers for you.
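To make the contrast concrete, here is the same kind of "derive, group, aggregate" logic on plain Scala collections. This is only an illustration with made-up sample rows (Quote & InMemoryExample are hypothetical names); the point is that here the whole list, and every group built from it, has to fit in memory:

```scala
// Plain Scala collections, no Scalding involved.
object InMemoryExample {
  case class Quote(exchange: String, symbol: String, high: Double, low: Double)

  // Made-up sample rows, only for illustration.
  val quotes = List(
    Quote("NYSE", "AA", 10.0, 8.0),
    Quote("NYSE", "AA", 12.0, 10.0),
    Quote("NYSE", "AAI", 5.0, 3.0))

  // Derive an average per row, group by (exchange, symbol), keep the max.
  val maxAvg: Map[(String, String), Double] =
    quotes
      .groupBy(q => (q.exchange, q.symbol))
      .map { case (key, rows) =>
        key -> rows.map(q => (q.high + q.low) / 2).max
      }
}
```

The code has nearly the same shape as the Scalding version, which is exactly why the difference is easy to miss.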
When 1 data stream is not enough ...
... the fun part begins ;D
We're not talking about an RDBMS here, but you may still need to merge related data (using joins, aided by foreign keys) - in Scalding it works in quite a similar way. Here comes a short example:
First, let me create a tiny dictionary-like pipe, straight from a list in memory:
val descs = IterableSource(List(("AA", "AA something"),
("AAI", "AAI something"),
("AAN", "AAN something")), ('dict_stock_symbol, 'dict_desc))
IterableSource takes two parameters:
- the 1st one is the content of the pipe (here: a list of tuples)
- the 2nd one is the list of field names for those tuples
Joining is trivial, but you have to use a specialized function that fits the relative sizes of the pipes you join. For instance, in my case (a tiny dictionary on the right-hand side) it'd be:
input
.read
.leftJoinWithTiny('stock_symbol -> 'dict_stock_symbol, descs)
.write(output)
The code is pretty much self-explanatory: just the field mapping & the pipe that gets sucked in. Voila.
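When the other side is not tiny, the fields-based API offers other join functions, for example joinWithSmaller (an inner join by default, where the pipe passed as the argument is expected to be the smaller one). A sketch under the assumption that both inputs are TSV files; the argument names (quotes, descs, output) & the reduced two-column schemas are hypothetical:

```scala
import com.twitter.scalding._

class JoinJob(args: Args) extends Job(args) {
  // Hypothetical inputs: quotes on the left, descriptions on the right.
  val quotes = Tsv(args("quotes"), ('stock_symbol, 'stock_price_close)).read
  val descs = Tsv(args("descs"), ('dict_stock_symbol, 'dict_desc)).read

  quotes
    // Inner join: rows without a matching description are dropped.
    .joinWithSmaller('stock_symbol -> 'dict_stock_symbol, descs)
    // Keep only the fields we care about (drops the duplicated key).
    .project(('stock_symbol, 'stock_price_close, 'dict_desc))
    .write(Tsv(args("output")))
}
```

Note that the key fields have different names on both sides, just like in the leftJoinWithTiny example; for inner joins the two pipes must not share field names.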
Fasten your seatbelt
These atomic operations are just cogwheels in your new machine. The simplest ones. Scalding also lets you (not a complete list, just examples):
- freely convert your field lists to objects and back again using pack & unpack
- reduce the rows using custom functions you write (reduce, foldLeft)
- change the data orientation using pivot & unpivot
- filter, sort, concatenate & project pipes
- use the so-called type-safe API (tuple based) if you don't want (regardless of the reason) a pre-determined named schema
- use the Matrix library to treat pipes as a source of sparse matrix data (!) (row index, column index, cell value) & run various computations on huge matrices
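Two of the items above, sketched very roughly: a custom fold inside groupBy (fields-based API) & the same flavour of aggregation in the type-safe API. I'm assuming a Scalding version that ships TypedPipe & TypedTsv; Folds, both job classes, the 'total_volume field & the three-column typed input are hypothetical:

```scala
import com.twitter.scalding._

// Hypothetical helper holding a custom fold function.
object Folds {
  val sumVolume: (Long, Long) => Long = (total, volume) => total + volume
}

// Fields-based API: reduce each group with our own function.
class TotalVolumeJob(args: Args) extends Job(args) {
  Tsv(args("input"), ('stock_symbol, 'stock_volume))
    .read
    .groupBy('stock_symbol) {
      _.foldLeft[Long, Long]('stock_volume -> 'total_volume)(0L)(Folds.sumVolume)
    }
    .write(Tsv(args("output")))
}

// Type-safe API: rows are Scala tuples, no named schema needed.
class TypedMaxCloseJob(args: Args) extends Job(args) {
  // Hypothetical input columns: exchange, symbol, closing price.
  TypedPipe.from(TypedTsv[(String, String, Double)](args("input")))
    .map { case (exchange, symbol, close) => ((exchange, symbol), close) }
    .group // key: (exchange, symbol), value: close
    .max   // highest closing price per key
    .toTypedPipe
    .map { case ((exchange, symbol), maxClose) => (exchange, symbol, maxClose) }
    .write(TypedTsv[(String, String, Double)](args("output")))
}
```

In the typed version a typo in a field name simply can't happen, because there are no field names; the price is that you have to carry the tuple shape around yourself.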
Summary
I've been playing with Scalding for a few days now, so I'll try to form an initial opinion:
- if you expect Cascading/Scalding to bring you completely different possibilities than M/R - nah, it doesn't
- if you expect Cascading/Scalding to let you do something more (a superset) than traditional M/R - nah, it doesn't either
C/S is just a layer of abstraction over M/R - its main added value is that it composes naturally with the functional programs you write: it just feels more natural, it's concise & it doesn't add much overhead. Plus, you're not giving up the extensibility Hive rejects. Personally, I feel encouraged & I think I'll be giving Scalding a go.