Thursday, October 22, 2009

Analysis of the "MapReduce Online" paper

I recently came across a paper entitled “MapReduce Online”, written by Tyson Condie, Neil Conway, Peter Alvaro, Joe Hellerstein, Khaled Elmeleegy, and Russell Sears at Berkeley (University of California). Since I’m very interested in Hadoop-related research (see my group’s work on HadoopDB) and this Berkeley group has historically produced reliably good research papers, I immediately downloaded the paper and read it carefully. The paper demonstrates the impact of several improvements the group made to Hadoop for interactive queries, and since Hadoop is becoming increasingly popular, I expect this paper to attract wide interest. Therefore, I think it might be useful to post a summary of the paper and some analysis on this blog. If you have also read this paper, I encourage discussion in the comment thread of this post.


The authors argue that since MapReduce’s (and therefore Hadoop’s) roots are in batch processing, design decisions were made that cause problems as Hadoop gets used more and more for interactive query processing. The main problem the paper addresses is how data is transferred between operators --- both between ‘Map’ and ‘Reduce’ operators within a job, and also across multiple MapReduce jobs. This main problem has two subproblems:

  1. Data is pulled by downstream operators from upstream operators (e.g., a Reduce task requests all the data it needs from predecessor Map tasks) instead of pushed from upstream operators to downstream operators.
  2. Every MapReduce operator is a blocking operator. Reduce tasks cannot begin until Map tasks are finished, and Map tasks for the next job cannot get started until the Reduce tasks from the previous job have completed.

The paper does not really distinguish between these two subproblems, but for the purposes of this discussion, I think it is best to separate them.

Subproblem 2 causes issues for interactive queries for four reasons:

  • It eliminates opportunities for pipelined parallelism (where, e.g., Map and Reduce tasks from the same job can be running simultaneously). There are some cases (such as the example query in the paper --- more on that later) where pipelined parallelism can significantly reduce query latency.
  • It causes spikes in network utilization (there is no network traffic until a Map task finishes, and then the entire output must be sent at once to the Reduce nodes). This is particularly problematic if Map tasks across nodes in a cluster are synchronized.
  • When every operator is a blocking operator, it is impossible to get early estimations of a query result until the very end of query execution. If data instead can be pipelined through query operators as it is processed, then we can start receiving query results almost immediately, and these early query results might be sufficient for the end user, who can then stop the query before it runs to completion.
  • If the Map part of a MapReduce job reads from a continuous (and infinite) data source instead of from a file, no operator ever “completes”, so a blocking operator that doesn’t produce results until it completes is entirely useless.
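To make the third point concrete, here is a minimal sketch (my own illustration, not code from the paper) of how a running estimate can be refined as each Map task’s partial output arrives, instead of blocking until all of them finish:

```python
# Sketch: online aggregation over streaming partial Map outputs.
# Each "partial" is a (sum, count) pair produced by one Map task;
# the running average is refined as each partial arrives.

def running_average(partials):
    total, count = 0.0, 0
    for part_sum, part_count in partials:
        total += part_sum
        count += part_count
        yield total / count  # early estimate after each partial

estimates = list(running_average([(10.0, 2), (30.0, 3), (20.0, 5)]))
# estimates[0] is available as soon as the first Map task reports,
# long before the job completes.
```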

Subproblem 1 (pull vs push) is problematic mostly due to performance reasons. Data that is pushed can be shipped as soon as it is produced, while it is still in cache. If it is pulled at a later time, accessing the data often incurs an index lookup and a disk read, which can reduce performance. There is also slightly more coordination (and therefore network traffic) across nodes in the pull model relative to the push model.
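The difference between the two models can be sketched as follows (my own toy illustration; stock Hadoop’s shuffle is of course more involved): in the push model, the producer hands each record off as soon as it is produced, so the consumer works on data while it is still hot rather than reading it back from disk later:

```python
import queue
import threading

# Push-model sketch: the producer enqueues each record as soon as it
# is produced, and the consumer processes records while they are still
# in memory -- no later index lookup or disk read is needed.
buf = queue.Queue()

def producer():
    for i in range(5):
        buf.put(i * i)      # ship immediately upon production
    buf.put(None)           # end-of-stream marker

def consumer(out):
    while True:
        rec = buf.get()
        if rec is None:
            break
        out.append(rec)

results = []
t = threading.Thread(target=producer)
t.start()
consumer(results)
t.join()
```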

The goal of the paper is to fix these subproblems that arise on interactive query workloads, while maintaining Hadoop’s famous query-level fault tolerance (very little query progress is lost upon a node failure). Essentially, this means that we want to continually push data from producer operators to consumer operators on the fly as the producer operator produces output. However, in order to maintain Hadoop’s fault tolerance, data that is pushed is also checkpointed locally to disk before being sent, so that if a downstream node fails, the task assigned to this node can be rescheduled on a different node, which will begin by pulling data from this upstream checkpoint just as standard Hadoop does.
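A minimal sketch of this checkpoint-then-push idea (my own illustration; the file layout and function names are invented, not the authors’ implementation):

```python
import json
import os
import tempfile

spill_dir = tempfile.mkdtemp()

def push_with_checkpoint(batch, task_id, send):
    # 1. Checkpoint the batch to local disk first (fault tolerance).
    path = os.path.join(spill_dir, f"map_{task_id}.spill")
    with open(path, "a") as f:
        f.write(json.dumps(batch) + "\n")
    # 2. Then push it downstream immediately (pipelining).
    send(batch)

def recover(task_id):
    # A rescheduled Reduce task pulls from the upstream checkpoint,
    # just as standard Hadoop does.
    path = os.path.join(spill_dir, f"map_{task_id}.spill")
    with open(path) as f:
        return [json.loads(line) for line in f]

received = []
push_with_checkpoint([("a", 1), ("b", 2)], 0, received.append)
push_with_checkpoint([("a", 3)], 0, received.append)
# If the downstream node failed, recover(0) would replay both batches.
```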

The previous paragraph describes the basic idea suggested in the paper. There are some additional details that are also described (1. to avoid having too many TCP connections between Map and Reduce nodes, not all data can be immediately pushed; 2. data that is pushed should be sent in batches and “combined” before being shipped; 3. in order to handle potential Map node failures, Reduce tasks that receive data before Map tasks have completed have to treat this data as “tentative” until the Map task “commits”), but these details do not change the basic idea: push data downstream as it is produced, but still write all data locally to disk before it is shipped.
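Detail 2 can be sketched like this (my own illustration, using a word-count-style combiner): buffer Map output and pre-aggregate each buffer before it goes on the wire:

```python
from collections import defaultdict

def combine(batch):
    # Word-count-style combiner: pre-aggregate by key before shipping.
    acc = defaultdict(int)
    for key, val in batch:
        acc[key] += val
    return sorted(acc.items())

def ship_in_batches(records, batch_size, send):
    # Buffer Map output; combine each full buffer before sending,
    # so fewer bytes cross the network.
    buf = []
    for rec in records:
        buf.append(rec)
        if len(buf) >= batch_size:
            send(combine(buf))
            buf = []
    if buf:
        send(combine(buf))

shipped = []
ship_in_batches([("a", 1), ("a", 1), ("b", 1), ("a", 1)], 3, shipped.append)
```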

The authors implement this idea in Hadoop, and then run some experiments to demonstrate how these changes (1) improve query latency, (2) enable early approximations of query results, and (3) enable the use of Hadoop for continuous streams of data. In other words, all of the problems that Hadoop has for interactive query workloads described above are fixed.


The paper evaluates mainly subproblem (2): every operator in MapReduce is a blocking operator. The main focus of the paper is, in particular, on turning the Map operator from a blocking operator into one that can ship query results as they are produced. This technique gets more and more useful the fewer Map tasks there are. If there are more nodes than Map tasks (as is the case for every experiment in this paper), then there are entire nodes that have only Reduce tasks assigned to them, and without pipelining, these nodes have to sit around and do nothing until the Map tasks start to finish.

Even if there are as many nodes as Map tasks (so every node can work on a Map task during the Map phase), each node can run a Map task and a Reduce task in parallel, and the multicore resources of the node can be more fully utilized if the Reduce task has something to do before the Map task finishes. However, as more Map tasks are assigned per node (Google reports 200,000 Map tasks for 2,000 nodes in the original MapReduce paper), I would predict that the performance improvement from being able to start the Reduce tasks early gets steadily smaller. I also wonder whether the need to send data from a Map task before it completes for the purposes of online aggregation also gets steadily smaller, since the first Map tasks finish at a relatively earlier point in query processing when nodes have to process many Map tasks.

In general, the performance studies that are presented show the best-case scenario for the ideas presented in the paper: (1) fewer than one Map task assigned per node, (2) the Map task does not filter any data, so there is a lot of data that must be shipped between the Map and Reduce phases, (3) there is no combiner function, and (4) there are no concurrent MapReduce jobs. Although it is useful to see performance improvement in this best-case scenario, it would also be interesting to see how performance is affected in less ideal settings. I’m particularly interested in seeing what happens when there are many more Map tasks per node.

Even in a less ideal setting I would still expect to see performance improvement from the ideas presented in this paper due to the authors' solution to subproblem 1 (switching from a pull-model to a push-model). It would be great if the experiments could be extended so that the reader can understand how much of the performance improvement is from the authors' solution to subproblem 2 (the Reduce nodes can start early) and how much of the performance improvement is from the disk seeks/reads saved by switching from a pull-model to a push-model.

Another performance-related conclusion one can draw from this paper (not mentioned by the authors, but still interesting) is that Hadoop still has plenty of room for improvement from a raw performance perspective. The main task in this paper involved tokenizing and sorting a 5.5 GB file. Hadoop had to use 60 (seriously, 60!) nodes to do this in 900 seconds. The techniques introduced in the paper allowed Hadoop to do it in 600 seconds. But does anyone else think they could write a C program, running on a single node (1/60th of the machines), that finishes in half the time?
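For a sense of scale, the single-node version of the benchmark task is essentially this (sketched in Python rather than the C I suggest above; a real implementation for a 5.5 GB input would use an external sort):

```python
# Sketch of the benchmark task on a single node: tokenize the input
# and sort the tokens. There is no shuffle, no coordination, and no
# network transfer to pay for -- just one tokenize pass and one sort.

def tokenize_and_sort(text):
    return sorted(text.split())

tokens = tokenize_and_sort("the quick brown fox the lazy dog")
```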


The ideas presented in this paper provide clear advantages for interactive query workloads on Hadoop. For this reason, I look forward to the authors’ modifications making it into the standard Hadoop distribution. Since HadoopDB is able to leverage improvements made to Hadoop, these techniques should be great for HadoopDB as well. While I think the performance advantages in the paper may be a little overstated, I do believe that switching from pull to push will improve performance by at least a small amount for many queries, and the ability to support online aggregation and continuous queries is a nice contribution. My suggestion to the authors would be to spend more time focusing on online aggregation and continuous queries, and less time extolling the performance advantages of their techniques. For example, one question I had about online aggregation is the following: the running aggregation is done using the Map tasks that finish first. But the problem is that Map tasks run on contiguous chunks of an HDFS file --- data in the same partition are often more similar to each other than to data in other partitions. Hence, might data skew become a problem?
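To illustrate the skew concern (my own toy example, not from the paper): if the file happens to be ordered on the aggregated attribute, the first Map tasks see only one end of the distribution, and the early estimate is badly biased:

```python
# Data sorted by value across contiguous HDFS chunks: the first Map
# task sees only the low values, so the first online estimate of the
# mean is far from the true mean.
data = list(range(100))                            # true mean is 49.5
chunks = [data[i:i + 25] for i in range(0, 100, 25)]

first_estimate = sum(chunks[0]) / len(chunks[0])   # from chunk 0 only
true_mean = sum(data) / len(data)
```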

Wednesday, October 14, 2009

Greenplum announces column-oriented storage option

I checked Curt Monash’s blog today (as I do on a somewhat daily basis) and saw a new post announcing Greenplum’s new column-oriented storage option. In my opinion, this is pretty big news. I was amused to see that Curt, in his post, correctly predicted pretty much everything I was going to say, but nonetheless, I feel obligated to post my reactions to this news:

Quick hit reactions:

(1) Congrats to Greenplum and their strong technical team for adding column-oriented storage. They have essentially created a hybrid storage layer --- now you can store data in rows (write- or read-optimized) or in columns. I have previously written a long (and surprisingly popular --- 1,638 unique visits) blog post on hybrid column/row-oriented storage schemes. I would put the Greenplum scheme under the “Approach 3: fine-grained hybrids” classification. This puts them in the same category as the Vertica FlexStore scheme (more on this in a second).

(2) I strongly agree with the Greenplum philosophy that religious wars of columns vs rows will not get you anywhere. The fact of the matter is that for some workloads, row-oriented storage can get you an order of magnitude performance improvement relative to column-stores, and for other workloads, column-oriented storage can get you an order of magnitude performance improvement relative to row-stores. Hybrid schemes can theoretically get you the best of both worlds, and that can be a big win.

(3) For a few years now, I've lumped Greenplum and Aster Data together in my data warehouse market world view, even though Greenplum is slightly older and has more customers. Both are software-only solutions, and are big proponents of commodity hardware. Both partner with hardware vendors and will sell you bundled hardware with their software in a pre-optimized appliance if you prefer that to the software-only solution. Both started with vanilla PostgreSQL and turned it into a shared-nothing, MPP analytical DBMS. Both are players in the "big data" game, and heavily market their scalability. Both are west coast companies. Both support in-database MapReduce capabilities (they even announced this on the same day!). Both had research papers in the major DBMS research conferences this year. They both have embraced "the cloud". They even share some of the same customers (e.g. Fox Interactive Media).

The word on the street is that the main difference between the two companies is that Greenplum has made significant modifications to the PostgreSQL code-base and the integration of the DBMS with the distribution (parallelization) layer is more "monolithic", whereas Aster Data has kept more of the original PostgreSQL code, and treats the database more like a black box. This announcement by Greenplum is further evidence that they are more than willing to edit the PostgreSQL code, as significant modifications to the storage layer were required.

(4) Kudos to Greenplum for being open and honest about the limitations of their column-store option. It is a column-store at the storage layer only. This is certainly still a big win for queries that access few attributes from wide tables. However, before I had my own blog, I wrote a guest article for Vertica’s blog explaining that column-oriented storage only gets you part of the way towards column-store performance --- writing a query executor optimized for column-oriented storage can get you an additional order of magnitude performance improvement. I explain this at length in two academic papers (see here and here). I believe these papers coined the terms "early materialization" and "late materialization" to describe whether columns are put back together into tuples (rows) at the beginning or at the end of the execution of a query plan. In retrospect, I have regretted this terminology ("tuple construction" is a little more descriptive and easier to understand than "tuple materialization"). The fact that Greenplum uses the term “early materialization” to describe their column-oriented scheme is evidence that they’ve read these papers (more kudos!) and will start to implement the low-hanging fruit in the query executor to get further performance improvement from their column-oriented storage. Hence, I expect that their column-oriented feature (while probably already useful now) will continue to get better in future releases. In the short term, one should not expect performance approaching that of regular column-stores.
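To sketch the distinction (a toy example of mine, not Greenplum’s or Vertica’s code): with early materialization, the executor stitches rows together immediately and processes tuples; with late materialization, it operates on whole columns and only fetches the needed values at the end:

```python
# Toy columnar table: two attributes stored as separate columns.
col_a = [1, 2, 3, 4]
col_b = [10, 20, 30, 40]

def early_materialization():
    # Construct full tuples up front, then filter tuple by tuple.
    rows = list(zip(col_a, col_b))
    return [b for a, b in rows if a > 2]

def late_materialization():
    # Operate on the column itself: compute a position list for the
    # predicate, then fetch only the matching values from col_b.
    positions = [i for i, a in enumerate(col_a) if a > 2]
    return [col_b[i] for i in positions]
```

Both produce the same answer, but the late-materialized plan touches only the columns the query needs, which is where much of the extra order of magnitude comes from.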

(5) In my previous blog post on hybrid column/row-oriented storage schemes, I mentioned that it is far easier for a column-store to implement the “Approach 3: fine-grained hybrids” scheme than a row-store, since the row-store would have to make modifications at the storage, query executor, and query optimizer layers, while the column-store (that implements both early and late materialization) would only have to make modifications at the storage layer (since the query executor and optimizer are already capable of working with rows). This would lead me to believe that the Vertica FlexStore scheme is more immediately useful than the Greenplum hybrid storage scheme. But, as I have written before, my history with Vertica gives me some bias, so be careful what you do with that statement.

(6) This has been a pretty big month for column-oriented storage layers. First Oracle announced their new column-oriented compression scheme (classified in the “Approach 1: PAX” category in my previous blog post) and now Greenplum adds column-oriented storage. This should put more pressure on the other vendors in the data warehousing space (especially Microsoft and IBM) to come up with some sort of column-oriented storage option in the near future.