Overview
The authors argue that since MapReduce’s (and therefore Hadoop’s) roots are in batch processing, design decisions were made that cause problems as Hadoop is used more and more for interactive query processing. The main problem the paper addresses is how data is transferred between operators --- both between ‘Map’ and ‘Reduce’ operators within a job, and also across multiple MapReduce jobs. This main problem has two subproblems:
- Data is pulled by downstream operators from upstream operators (e.g., a Reduce task requests all the data it needs from predecessor Map tasks) instead of pushed from upstream operators to downstream operators.
- Every MapReduce operator is a blocking operator. Reduce tasks cannot begin until Map tasks are finished, and Map tasks for the next job cannot get started until the Reduce tasks from the previous job have completed.
The paper does not really distinguish between these two subproblems, but for the purposes of this discussion, I think it is best to separate them.
Subproblem 2 causes issues for interactive queries for four reasons:
- It eliminates opportunities for pipelined parallelism (where, e.g., Map and Reduce tasks from the same job can be running simultaneously). There are some cases (such as the example query in the paper --- more on that later) where pipelined parallelism can significantly reduce query latency.
- It causes spikes in network utilization (there is no network traffic until a Map task finishes, and then the entire output must be sent at once to the Reduce nodes). This is particularly problematic if Map tasks across nodes in a cluster are synchronized.
- When every operator is a blocking operator, it is impossible to get even an early estimate of the query result; nothing is available until the very end of query execution. If data can instead be pipelined through query operators as it is processed, then we can start receiving query results almost immediately, and these early results might be sufficient for the end user, who can then stop the query before it runs to completion (a toy sketch of such a running estimate appears after this list).
- If the Map part of a MapReduce job reads from a continuous (and infinite) data source instead of from a file, no operator ever “completes”, so a blocking operator that doesn’t produce results until it completes is entirely useless.
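To make the online aggregation point concrete, here is a toy sketch of the kind of running estimate a consumer of pipelined output could maintain. It is written in Java purely for illustration; the class and method names are invented here and are not the paper's (or Hadoop's) actual API.

    // Toy sketch of an online-aggregation-style running estimate.
    // The names (RunningAverage, accept, estimate) are illustrative only.
    public class RunningAverage {
        private double sum = 0;   // sum of values seen so far
        private long count = 0;   // number of values seen so far

        // Called each time a pipelined record reaches the consumer.
        public void accept(double value) {
            sum += value;
            count++;
        }

        // Early estimate of the final average, available long before all
        // Map tasks have completed; accuracy improves as more data arrives.
        public double estimate() {
            return count == 0 ? 0 : sum / count;
        }

        public static void main(String[] args) {
            RunningAverage avg = new RunningAverage();
            for (double v : new double[] {3.0, 5.0, 7.0}) {
                avg.accept(v);
                System.out.println("current estimate: " + avg.estimate());
            }
        }
    }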
Subproblem 1 (pull vs. push) is problematic mostly for performance reasons. Data that is pushed can be shipped as soon as it is produced, while it is still in cache. If it is instead pulled at a later time, accessing the data often incurs an index lookup and a disk read, which hurts performance. There is also slightly more coordination (and therefore network traffic) across nodes in the pull model relative to the push model.
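To see where the extra cost comes from, here is a simplified sketch (my own, not Hadoop's actual implementation) of what serving a single pull request involves: the map-side server consults an index to find the requested partition, then seeks and reads it from disk, by which time the data has likely fallen out of cache. In a push model the same bytes could have been shipped straight out of the in-memory buffer.

    import java.io.RandomAccessFile;

    // Simplified sketch (not Hadoop's actual code) of serving one reducer's
    // pull request for its partition of a map task's spilled output.
    public class PullPartition {
        // One (offset, length) pair per partition of the spill file; a real
        // implementation would read these from an index file on disk.
        private final long[][] index;
        private final String spillFile;

        public PullPartition(String spillFile, long[][] index) {
            this.spillFile = spillFile;
            this.index = index;
        }

        // Index lookup + seek + disk read, all on the critical path of the
        // reducer's request.
        public byte[] servePullRequest(int partition) throws Exception {
            long offset = index[partition][0];             // index lookup
            int length = (int) index[partition][1];
            try (RandomAccessFile f = new RandomAccessFile(spillFile, "r")) {
                f.seek(offset);                            // disk seek
                byte[] buf = new byte[length];
                f.readFully(buf);                          // disk read, likely cold by now
                return buf;                                // then shipped over the network
            }
        }
    }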
The goal of the paper is to fix these subproblems for interactive query workloads, while maintaining Hadoop’s famous query-level fault tolerance (very little query progress is lost upon a node failure). Essentially, this means that we want to continually push data from producer operators to consumer operators on the fly, as the producer operator produces output. However, in order to maintain Hadoop’s fault tolerance, data that is pushed is also checkpointed locally to disk before being sent, so that if a downstream node fails, the task assigned to that node can be rescheduled on a different node, which will begin by pulling data from this upstream checkpoint just as standard Hadoop does.
The previous two sentences describe the basic idea suggested in the paper. There are some additional details described as well (1. to avoid having too many TCP connections between Map and Reduce nodes, not all data can be immediately pushed; 2. data that is pushed should be sent in batches and “combined” before being shipped; 3. in order to handle potential Map node failures, Reduce tasks that receive data before Map tasks have completed have to treat this data as “tentative” until the Map task “commits”), but these details do not change the basic idea: push data downstream as it is produced, but still write all data locally to disk before it is shipped.
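Putting those pieces together, here is a minimal sketch of what the map side of this idea might look like. The class and method names below are my own invention for illustration, not the authors' actual Hadoop modifications: output is buffered and combined, checkpointed to local disk for fault tolerance, and only then pushed to the reducer, marked as tentative until the Map task commits.

    import java.util.HashMap;
    import java.util.Map;

    // Minimal sketch of the basic idea: buffer map output, pre-aggregate
    // ("combine") it, write it to local disk for fault tolerance, and only
    // then push it to the reducer, marked tentative until the task commits.
    public class PipeliningMapOutput {
        private final Map<String, Integer> buffer = new HashMap<>();
        private static final int SPILL_THRESHOLD = 10_000;

        // Called for every record the Map function emits.
        public void emit(String key, int value) {
            buffer.merge(key, value, Integer::sum);   // combine eagerly (e.g., word counts)
            if (buffer.size() >= SPILL_THRESHOLD) {
                spillAndPush(false);
            }
        }

        // Called when the Map task finishes; the final batch carries the commit.
        public void close() {
            spillAndPush(true);
        }

        private void spillAndPush(boolean commit) {
            writeToLocalDisk(buffer);                 // checkpoint first, so a failed or
                                                      // restarted reducer can re-pull this data
            pushToReducer(buffer, /* tentative= */ !commit);
            buffer.clear();
        }

        private void writeToLocalDisk(Map<String, Integer> batch) { /* elided */ }
        private void pushToReducer(Map<String, Integer> batch, boolean tentative) { /* elided */ }
    }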
The authors implement this idea in Hadoop, and then run some experiments to demonstrate how these changes (1) improve query latency, (2) enable early approximations of query results, and (3) enable the use of Hadoop on continuous streams of data. In other words, all of the problems described above that Hadoop has for interactive query workloads are fixed.
Analysis
The paper evaluates mainly subproblem (2): every operator in MapReduce is a blocking operator. In particular, the main focus is on turning the Map operator from a blocking operator into one that can ship its output as it is produced. This technique gets more and more useful the fewer Map tasks there are. If there are more nodes than Map tasks (as is the case for every experiment in this paper), then there are entire nodes that have only Reduce tasks assigned to them, and without pipelining, these nodes have to sit around and do nothing until the Map tasks start to finish.
Even if there are as many nodes as Map tasks (so every node can work on a Map task during the Map phase), each node can run a Map task and a Reduce task in parallel, and the multicore resources of the node can be more fully utilized if the Reduce task has something to do before the Map task finishes. However, as more Map tasks are assigned per node (Google reports 200,000 Map tasks for 2,000 nodes in the original MapReduce paper), I would predict that the performance improvement from being able to start the Reduce tasks early gets steadily smaller. I also wonder whether the benefit of sending data from a Map task before it completes, for the purposes of online aggregation, also shrinks, since when nodes have to process many Map tasks, the first Map tasks finish at a relatively earlier point in query processing.
In general, the performance studies that are presented show the best-case scenario for the ideas presented in the paper: (1) fewer than one Map task assigned per node, (2) a Map task that does not filter any data, so there is a lot of data that must be shipped between the Map and Reduce phases, (3) no combiner function, and (4) no concurrent MapReduce jobs. Although it is useful to see the performance improvement in this best-case scenario, it would also be interesting to see how performance is affected in less ideal settings. I’m particularly interested in seeing what happens when there are many more Map tasks per node.
Even in a less ideal setting I would still expect to see a performance improvement from the ideas presented in this paper, due to the authors' solution to subproblem 1 (switching from a pull model to a push model). It would be great if the experiments could be extended so that the reader can understand how much of the performance improvement comes from the authors' solution to subproblem 2 (the Reduce nodes can start early) and how much comes from the disk seeks/reads saved by switching from pull to push.
Another performance-related conclusion one can draw from this paper (not mentioned by the authors, but still interesting) is that Hadoop still has plenty of room for improvement from a raw performance perspective. The main task in this paper involved tokenizing and sorting a 5.5 GB file. Hadoop had to use 60 (seriously, 60!) nodes to do this in 900 seconds. The techniques introduced in the paper allowed Hadoop to do it in 600 seconds. But does anyone else think they could write a C program running on a single node (1/60th of the machines) that finishes in half the time?
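For a sense of scale, here is a toy single-node version of that workload (tokenize a file and then sort the tokens). It is in Java rather than C, it loads everything into memory, and I make no claim about how fast it would actually run on a 5.5 GB input; the point is only how little code the non-distributed version takes.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    // Toy single-node tokenize-and-sort: read a file, split it into tokens,
    // sort them, and write the result. Illustrative only; no performance claims.
    public class SingleNodeTokenSort {
        public static void main(String[] args) throws IOException {
            List<String> tokens = Files.lines(Paths.get(args[0]))
                    .flatMap(line -> Arrays.stream(line.split("\\s+")))
                    .filter(t -> !t.isEmpty())
                    .sorted()
                    .collect(Collectors.toList());
            Files.write(Paths.get(args[1]), tokens);
        }
    }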
Conclusion
The ideas presented in this paper provide clear advantages for interactive query workloads on Hadoop. For this reason, I look forward to the authors' modifications making it into the standard Hadoop distribution. Since HadoopDB is able to leverage improvements made to Hadoop, these techniques should be great for HadoopDB as well. While I think the performance advantages in the paper may be a little overstated, I do believe that switching from pull to push will improve performance by at least a small amount for many queries, and the ability to support online aggregation and continuous queries is a nice contribution. My suggestion to the authors would be to spend more time focusing on online aggregation and continuous queries, and less time extolling the performance advantages of their techniques. For example, one question I had about online aggregation is the following: the running aggregation is computed using the Map tasks that finish first. But the problem is that Map tasks run on contiguous chunks of an HDFS file --- data in the same partition are often more similar to each other than data in other partitions. Hence, might data skew become a problem?