Thursday, October 22, 2009

Analysis of the "MapReduce Online" paper

I recently came across a paper entitled MapReduce Online” written by Tyson Condie, Neil Conway, Peter Alvaro, Joe Hellerstein, Khaled Elmeleegy, and Russell Sears at Berkeley (University of California). Since I’m very interested in Hadoop-related research (see my group’s work on HadoopDB) and this Berkeley group have historically produced reliably good research papers, I immediately downloaded the paper and read it carefully. The paper demonstrates the impact of several improvements the group made to Hadoop for interactive queries, and since Hadoop is becoming increasingly popular, I expect this paper to have wide interest. Therefore, I think it might be useful to post a summary of the paper and some analysis on this blog. If you have also read this paper, I encourage discussion in the comment thread of this post.


The authors argue that since MapReduce’s (and therefore Hadoop’s) roots are in batch processing, design decisions were made that are cause problems as Hadoop gets used more and more for interactive query processing. The main problem the paper addresses is how data is transferred between operators --- both between ‘Map’ and ‘Reduce’ operators within a job, and also across multiple MapReduce jobs. This main problem has two subproblems:

  1. Data is pulled by downstream operators from upstream operators (e.g., a Reduce task requests all the data it needs from predecessor Map tasks) instead of pushed from upstream operators to downstream operators.
  2. Every MapReduce operator is a blocking operator. Reduce tasks cannot begin until Map tasks are finished, and Map tasks for the next job cannot get started until the Reduce tasks from the previous job have completed.

The paper does not really distinguish between these two subproblems, but for the purposes of this discussion, I think it is best to separate them.

Subproblem 2 causes issues for interactive queries for four reasons:

  • It eliminates opportunities for pipelined parallelism (where, e.g., Map and Reduce tasks from the same job can be running simultaneously). There are some cases (such as the example query in the paper --- more on that later) where pipelined parallelism can significantly reduce query latency.
  • It causes spikes in network utilization (there is no network traffic until a Map task finishes, and then the entire output must be sent at once to the Reduce nodes). This is particularly problematic if Map tasks across nodes in a cluster are synchronized
  • When every operator is a blocking operator, it is impossible to get early estimations of a query result until the very end of query execution. If data instead can be pipelined through query operators as it is processed, then we can start receiving query results almost immediately, and these early query results might be sufficient for the end user, who can then stop the query before it runs to completion.
  • If the Map part of a MapReduce job reads from a continuous (and infinite) data source instead of from a file, no operator ever “completes”, so a blocking operator that doesn’t produce results until it completes is entirely useless.

Subproblem 1 (pull vs push) is problematic mostly due to performance reasons. Data that is pushed can be shipped as soon as it is produced, while it is still in cache. If it is pulled at a later time, accessing the data often incurs an index lookup and a disk read, which can reduce performance. There is also slightly more coordination (and therefore network traffic) across nodes in the pull model relative to the push model.

The goal of the paper is to fix these subproblems that arise on interactive query workloads, while maintaining Hadoop’s famous query-level fault tolerance (very little query progress is lost upon a node failure). Essentially, this means that we want to continually push data from producer operators to consumer operators on the fly as the producer operator produces output. However, in order to maintain Hadoop’s fault tolerance, data that is pushed is also checkpointed locally to disk before being sent, so that if a downstream node fails, the task assigned to this node can be rescheduled on a different node, which will begin by pulling data from this upstream checkpoint just as standard Hadoop does.

The previous two sentences is the basic idea suggested in the paper. There are some additional details that are also described (1. to avoid having too many TCP connections between Map and Reduce nodes, not all data can be immediately pushed; 2. data that is pushed should be sent in batches and “combined” before being shipped; 3. in order to handle potential Map node failures, Reduce tasks that receive data before Map tasks have completed have to treat this data as “tentative” until the Map task “commits”), but these details do not change the basic idea: push data downstream as it is produced, but still write all data locally to disk before it is shipped.

The authors implement this idea in Hadoop, and then run some experiments to demonstrate how these changes (1) improve query latency (2) enable early approximations of query results (3) enable the use of Hadoop for continuous data stream of data. In other words, all of the problems that Hadoop has for interactive query workloads described above are fixed.


The paper evaluates mainly subproblem (2): every operator in MapReduce is a blocking operator. The main focus of the paper is, in particular, on turning the Map operator from a blocking operator to an operator that can ship query results as they are produced. This technique gets more and more useful the fewer map tasks there are. If there are more nodes than Map tasks (as is the case for every experiment in this paper) then there are entire nodes that have only Reduce tasks assigned to them, and without pipelining, these nodes have to sit around and do nothing until the Map tasks start to finish.

Even if there are as many nodes as Map tasks (so every node can work on a Map task during the Map phase), each node can run a Map task and a Reduce task in parallel and the multicore resources of the node can be more fully utilized if the Reduce task has something to do before the Map task finishes. However, as there are more Map tasks assigned per node (Google reports 200,000 Map tasks for 2,000 nodes in the original MapReduce paper), I would predict that the performance improvement from being able to start the Reduce nodes early gets steadily smaller. I also wonder if the need to send data from a Map task before it completes for the purposes of online aggregation also gets steadily smaller, since the first Map tasks finish at a relatively earlier time in query processing when nodes have to process many Map tasks.

In general, the performance studies that are presented show the best case scenario for the ideas presented in the paper: (1) fewer than 1 Map task assigned per node (2) the Map task does not filter any data so there is a lot of data that must be shipped between the Map and Reduce phases (3) there is no combiner function (4) there are no concurrent MapReduce jobs. Although it is useful to see performance improvement in this best-case scenario, it would also be interesting to see how performance is affected in less ideal settings. I’m particularly interested in seeing what happens when there are many more Map tasks per node.

Even in a less ideal setting I would still expect to see performance improvement from the ideas presented in this paper due to the authors' solution to subproblem 1 (switching from a pull-model to a push-model). It would be great if the experiments could be extended so that the reader can understand how much of the performance improvement is from the authors' solution to subproblem 2 (the Reduce nodes can start early) and how much of the performance improvement is from the disk seeks/reads saved by switching from a pull-model to a push-model.

Another performance related conclusion one can draw from this paper (not mentioned by the authors, but still interesting) is that Hadoop still has plenty of room for improvement from a raw performance perspective. The main task in this paper involved tokenizing and sorting a 5.5 GB file. Hadoop had to use 60 (seriously, 60!) nodes to do this in 900 seconds. The techniques introduced in the paper allowed Hadoop to do this in 600 seconds. But does anyone else think they can write a C program that runs on a single node that runs in half the time on 1/60th of the machines?


The ideas presented in this paper provide clear advantages for interactive query workloads on Hadoop. For this reason, I look forward to the modifications made by the authors making it into the standard Hadoop distribution. Since HadoopDB is able to leverage improvements made to Hadoop, these techniques should be great for HadoopDB as well. While I think the performance advantages in the paper may be a little overstated, I do believe that switching from pull to push will improve performance by at least a small amount for many queries, and the ability to support online aggregation and continuous queries is a nice contribution. My suggestion to the authors would be to spend more time focusing on online aggregation and continuous queries, and less time extolling the performance advantages of their techniques. For example, one question I had about online aggregation is the following: the running aggregation is done using the Map tasks that finish first. But the problem is that Map tasks run on congruous chunks of an HDFS file --- data in the same partition are often more similar to each other than data on other partitions. Hence, might data skew become a problem?


  1. Hi Daniel,
    Nice post!

    We had a chance to talk in-depth with Tyson and Neil (see ) and they have other related improvements in mind. Among other things, they are working on algorithms that will allow them to identify the "optimal degree of parallelism" -- akin to a continuous/adaptive optimizer.

    Any new developments in the HadoopDB front?

  2. Ben,

    They also alluded to some future work on adaptive optimization in their MapReduce Online paper. I'm glad that they're working on this --- I have no doubt that their results will have immediate impact.

    Your writeup of your chat with Tyson and Neil is high quality (as usual). I appreciate that you make an effort to write about some of the work that's being done in academic settings (including your writeup of HadoopDB).

    BTW: the O'Reilly books you sent me on "Beautiful Data" and "Hadoop: The Definitive Guide" are very popular around the office.


  3. Oh, and wrt HadoopDB updates, there's nothing we're ready to talk about (research-wise) publicly. Development-wise, we're working on a a roadmap that will uploaded to SourceForce soon.

  4. Dan:

    I like the close analysis! Your good work may haunt you -- program committee chairs are taking notice, this could increase your workload :-)

    There are a bunch of technical issues people could debate about maximizing batch performance via a mix of pipeline and partition parallelism. That's fun. But we explicitly didn't dive into that study. The potential batch-processing benefits of pipelining here are sort of a "freebie" or side-benefit. The batch performance experiment in the paper is just there to illustrate that added potential.

    The real message of the paper is architectural: pipelining can be made to coexist peacefully with Dean and Ghemawat's checkpoint/restart model. This is big, because it enables significant new user-facing features within MapReduce. You can have a crystal ball into the final results of long-running jobs (a la online aggregation). You can do continuous stream processing. And you keep the clean FT/performance-availability features that made the original Google work on MapReduce interesting.

    So I hope the paper gets our big picture across, in addition to sparking good technical followup discussion on performance.

    I do encourage interested folks to dive in and write the SIGMETRICS paper you're hinting at here. M/R scheduling has a bunch more degrees of freedom than the Exchange-based scheduling studied in the DB literature, due to the decoupling of pipeline stages. (DBs traditionally spread every task like peanut butter across all nodes in a cluster, and iterators synchronize individual nodes in a stylized manner.) Hadoop decouples all that, and our pipelining adds yet another dimension to that scheduling problem.

    One challenge in pursuing a performance-oriented research agenda is that Hadoop is still young, and there are a bunch of simple things that could be tuned that might move the needle on performance (esp resource utilization) more than anything that requires serious science. We'll live through an era of papers like that -- it's forward progress for sure and the beauty of open-source-meets-academia. But I'm not sure I'd encourage you to spend time on it yourself.

    - Joe Hellerstein

    PS: One important correction: Khaled and Rusty are both at Yahoo Research, and we're very happy to have them on board on this!

  5. FYI, we've released the source code for the Hadoop Online Prototype (HOP). It can be found at

  6. Can it be used for interactive query processing? I mean, can we run map/reduce tasks continuously to eliminate start up job/tasks overhead?

    -Vladimir Rodionov

  7. Vladimir: Yes, MapReduce Online can be used for that. We discuss this idea in Section 5 of the technical report.