Wednesday, January 20, 2010

New England Database Summit 2010 Program

I just finished putting together the program for New England Database Summit, 2010 (thanks to the PC: Yanlei Diao, Olga Papaemmanouil, and Elke Rundensteiner). The schedule is really packed this year, with three keynotes/invited talks, eight technical session talks, and approximately thirty posters from students and researchers. The featured talks are from Raghu Ramakrishnan (Chief Scientist for Audience & Cloud Computing at Yahoo!), Curt Monash (President, Monash Research), and C. Mohan (IBM Fellow and IBM India Chief Scientist).

Registration for New England Database Summit is free (thanks Netezza!), but we need to know how much food, coffee, appetizers, and drinks to order (breakfast and lunch is included in the free admission, and there will be free beer and wine during the poster session), so registration will be closed after 5PM on Friday, January 22nd.

In the past we've had 200+ attendees, so it's a great opportunity to come network with researchers, academics, and database professionals from all over New England.

A summary of the current program is listed below. However, the NEDBSummit Website has more details (including talk abstracts).


Time Event
9:00 AM Welcoming remarks
9:10-10:10 Raghu Ramakrishnan (Chief
Scientist for Audience & Cloud Computing at Yahoo!) Cloud Data Serving

10:10-10:35 Coffee Break
Technical Session 1
10:35-10:55 Carlo Curino, Evan Jones, Yang
Zhang, Eugene Wu, Sam Madden RelationalCloud: The case for a database
service

10:55-11:15 Mike Dirolf An Introduction to MongoDB
11:15-11:35 Elke Rundensteiner, R. Nehme,
and E. Bertino The Query Mesh Project: A
Powerful Multi-Route Query Processing Paradigm

11:35-11:55 Andy Pavlo MapReduce and Parallel
DBMSs: Together At Last

11:55-12:15 Gregory Malecha, Greg
Morrisett, Avraham Shinnar, and Ryan Wisnesky Toward a Verified
Relational Database Management System

12:15 PM Lunch
1:10-2:10 Curt Monash (President, Monash Research). Database and analytic technology: The state of the union

Technical Session 2
2:10-2:30 Paul Brown SciDB: Massively
Parallel Array Data Storage, Processing and Analysis

2:30-2:50 Coffee Break
2:50-3:10 Julia Stoyanovich, William Mee,
Kenneth A. Ross Semantic Ranking and Result Visualization for Life
Sciences Publications

3:10-3:30 Mirek Riedewald, Alper Okcan,
Daniel Fink Scalable Search and Ranking for Scientific Data
3:30-4:30 C. Mohan (IBM Fellow and
IBM India Chief Scientist). Implications of Storage Class
Memories (SCMs) on Software Architectures

4:30 PM Poster Session and Appetizers / Drinks (Building 32, R&D Area, 4th Floor)
6:00 PM Adjourn

Thursday, January 7, 2010

Exadata's columnar compression

I recently came across a nice whitepaper from Oracle that describes the Exadata columnar compression scheme. I wrote up a brief overview of Oracle's columnar compression in the past (in my hybrid storage layout post), but I was pleased to see the whitepaper give some additional details. Before discussing the main content of the whitepaper, I want to correct a few technical inaccuracies in the article:

"Storing column data together, with the same data type and similar characteristics, drastically increases the storage savings achieved from compression. However, storing data in this manner can negatively impact database performance when application queries access more than one or two columns, perform even a modest number of updates, or insert small numbers of rows per transaction."

The first part of the above statement is usually true; columns tend to be self-similar since they contain data from the same attribute domain, and therefore laying out columns contiguously on storage typically yields lower entropy and higher compression ratios than storing rows contiguously. However, the second part of the statement is quite misleading and only true for the most naïve of column-oriented layout schemes. True, the benefit of storing data in columns decreases as a query accesses a higher percentage of columns from a table. However, the number of columns that need to be accessed such that storing data in columns impacts performance negatively relative to a storing data in rows is quite a bit larger than one or two columns. There’s a VLDB 2006 paper by Stavros Harizopoulos et. al. that runs a bunch of benchmarks to understand this tradeoff in more detail. There are a bunch of factors that affect whether column-storage or row-storage is best beyond simply the number of columns accessed (e.g. prefetch size, query selectivity, compression types), most notably the fact that column-stores struggle on needle-in-a-haystack queries since the multiple I/Os needed to get data from different columns are not amortized across the retrieval of multiple rows, but the bottom line is that the query space where pure column-oriented layout outperforms pure row-oriented layout is quite a bit larger than what the Oracle whitepaper claims. Regarding updates and inserts, the VLDB 2005 C-Store paper and OSDI 2006 Google Bigtable paper discuss how to alleviate this column-store disadvantage by temporarily storing the updates in a write-optimized store.

"Oracle’s Exadata Hybrid Columnar Compression technology is a new method for organizing data within a database block."

I feel that this is not giving enough credit to the VLDB 2001 paper by Ailamaki et. al. on PAX. Even though the PAX paper didn’t apply compression to this layout, and PAX didn’t include multiple blocks inside a single compression unit, the basic idea is the same.

One of the key benefits of the Exadata Hybrid Columnar approach is that it provides both the compression and performance benefits of columnar storage without sacrificing the robust feature set of the Oracle Database. For example, while optimized for scan-level access, because row data is self-contained within compression units, Oracle is still able to provide efficient row-level access, with entire rows typically being retrieved with a single I/O.

To people who have even a rudimentary knowledge of column-stores, the juxtaposed statements “entire rows being retrieved with a single I/O” and “provides the performance benefits of columnar storage” is clearly inaccurate. The most often cited advantage of column-stores (indeed cited even more often than the compression advantages) is that if you have a query that has to scan two out of twenty columns in a table (i.e. “tell me the sum of revenue per store”), a column-store only has to read in those two columns while a row-store has to scan the entire table (because multiple entire rows are picked up with each I/O). Hence, a column-store is very I/O efficient, an increasingly important bottleneck in database performance (albeit somewhat reduced with good compression algorithms). Again, Oracle’s statement is true for needle-in-a-haystack queries, but false for the more frequent scan-based queries one finds in data warehouse workloads (which is where Exadata is primarily marketed).


Despite the above overstatements, it is clear that Exadata’s Hybrid Columnar scheme is a big win (relative to vanilla Exadata) for most Exadata datasets and workloads, for the following reasons:

  1. Storage costs are lowered by having a smaller data footprint.
  2. I/O performance is improved by having to read less data from storage.
  3. Data is kept compressed as it is scanned in the storage layer. Selections and projections performed in the storage layer can be performed directly on the compressed data. (Those of you who are familiar with my academic work will not be surprised I like this, given that I wrote a paper on integrating compression and execution for column-oriented storage layouts in a SIGMOD 2006 paper). This yields all the storage costs savings and runtime I/O savings of compression without the performance disadvantage of runtime decompression.

    The whitepaper is vague when it comes to the question of whether
    data remains in compressed form after it is scanned in the Exadata Storage Servers as it is shipped over the Infiniband network to Oracle RAC for further processing. It appears from the way I read the paper (I could be wrong about this) that data is decompressed before it is shipped over the network, which means that Oracle is not gaining all of the potential performance from operating directly on compressed data, as data does have to be decompressed eventually, though the amount of data that needs to be decompressed is smaller, since selections and projections have occurred. If Oracle RAC could operate directly on the columnar compressed format, then decompression would never have to occur, which would further increase performance (query results would obviously have to be decompressed, but usually the magnitude of query results is much smaller than the magnitude of data processed to produce them). Not only is Oracle missing out on potential performance by decompressing data before sending it over the network, but also network bottlenecks could be introduced.
  4. If storage costs are the most important consideration (e.g. for historical data that would otherwise be archived to tape or other offline device), there’s a knob that allows you to further increase compression at the cost of having to decompress data at runtime (and this decompression is heavyweight), thereby reducing runtime performance. Furthermore, the same table can be compressed using the lightweight (high-performance) compression and heavyweight (low-performance) compression using Oracle’s partitioning feature (e.g. you could partition by date and have the historical partitions use the heavyweight compression scheme and the active partitions use the lightweight compression scheme). Note that lightweight still yields good compression --- Oracle claims up to 10X, but this is obviously going to be very dependent on your data.

The most interesting part of the whitepaper was the figure of a compression unit on page four. It is clear that one complication that arises from compressing each column separately is that for a fixed number of rows (e.g. tuples numbered 1-1000), the column sizes are going to vary wildly (depending on how compressible the data from each column is). Figuring out how to fit all columns from a set of rows into a fixed-size block (or set of blocks inside a compression unit) without leaving large chunks of the block empty is a nontrivial optimization problem, and something that Oracle probably had to think about in detail in their implementation.

Anyway, if you’re interested in Oracle’s approach to hybrid columnar storage, the whitepaper is worth a read. With rumors of additional developments in Oracle’s hybrid columnar storage starting to surface, it is likely that this will not be my last blog post on this subject .


Wednesday, December 30, 2009

2009's top blog posts

Below are my top six blog postings of 2009, in order of the number of page views as calculated by Google Analytics:

  1. Announcing release of HadoopDB (longer version), and (shorter version). Combined visits: 31,228 (26,650 and 4,638 for the longer and shorter versions respectively).

    The post gave an overview of the HadoopDB project that was released from my research group at Yale over the summer. The basic idea is to combine the scalability and ease-of-use of Hadoop with the performance on structured data of relational database systems.

  2. A tour through hybrid column/row-oriented DBMS schemes. 2,922 visits.

    This might be the post I'm most proud of, from the ones on this list. It went through different ways one can combine row-store and column-store technology in a single DBMS. I believe that hybrid database systems along the lines described in the post are going to take off in 2010, and the predictions made in the post will soon come to fruition.

  3. ParAccel and their puzzling TPC-H results. 2,499 visits.

    Of the posts on this list, this is the one I like the least, and the one that needs to be rewritten the most. I don't recommend reading it, but if you do, make sure you read the corrections in the comment thread in addition to the main article. However, the post is very stale at this point, since it discusses some TPC-H results from ParAccel that were challenged by a competitor and ParAccel later withdrew in September. ParAccel has not yet rerun these 30TB TPC-H benchmark results.

  4. Watch out for VectorWise. 1,595 visits.

    This post discussed the technology behind Ingres' new column-store storage engine designed for analytical workloads, based on some research out of CWI. I am very high on this technology and my research group has had a chance to play around with it a little this winter.

  5. Analysis of the "MapReduce Online" paper. 1,108 visits.

    I'm surprised this post got so many visits, since it was really geared only for readers in the research community. This post reviewed some research performed at the University of California, Berkeley, which explored how to pipeline results from different phases in MapReduce jobs to improve performance and enable early estimations of results. Rumor has it that this paper was accepted to NSDI 2010. I think the model of releasing papers as technical reports and independently reviewing and recommending them in public on venues like blogs is an interesting model to consider for the next decade, rather than the private 'accept' or 'reject' reviewing process currently used today.

  6. Kickfire’s approach to parallelism. 1,042 visits.

    This post takes a deeper dive into Kickfire's technology than you'll find in most other places. I find the way that they use FPGA technology to maximize the parallelism that can be achieved on analytical queries in a single-box machine to be quite impressive.


The post from 2009 that I feel is the most underrated (meaning that the number of visits did not match up with what I felt was the quality of the post) was:
  • What is the right way to measure scale?

    I really feel that people thought about scale in the wrong way in 2009. People assume that if a database can fit a lot of data inside of it (i.e. many petabytes), it must be really scalable. But if the data is not very accessible (i.e. it is stored on slow media, or it takes forever to scan through it all because there are not enough disk spindles or CPUs to process it), then the system is not nearly as scalable as it would seem.


Bottom line, if you only have time to read three of my postings from 2009, I would like them to be:

  1. The longer HadoopDB post (this is the only post about my own research)
  2. The hybrid column/row-store post
  3. The post on measuring scale
Overall, I'm pleased with the impact my blog seems to have had, and I intend to continue write posts for it in 2010.

Tuesday, November 24, 2009

Deadlines approaching for two upcoming summits

There are two upcoming events that I suspect will be of interest to readers of this blog.


  • First, the first annual ACM Symposium on Cloud Computing 2010 (ACM SOCC 2010) will be held June 10th and 11th in Indianapolis, IN (co-located with SIGMOD 2010). This symposium will focus on systems and data management issues within the context of cloud computing. If the quality of the papers comes close to matching the quality of the people listed as organizers and program committee members of the event, then this will be a can't-miss highlight of 2010. If you are doing research in software as a service, virtualization, or scalable cloud data services, this venue will likely be a nice, high-profile place to publish your findings (the paper submission deadline is January 15th, 2010).

  • Second, on January 28th, 2010, the third annual New England Database Summit will be held at MIT in Cambridge, Massachusetts. This will be an all day conference-style event where participants from the research community and industry in the New England area can come together to present ideas and discuss their research and experiences. Registration for the event is free (thank you Netezza), and anyone is welcome to attend. The event will feature a keynote talk from Raghu Ramakrishnan (Chief Scientist for Audience & Cloud Computing at Yahoo!) and an invited talk from Curt Monash (President, Monash Research).

    I'm serving as PC Chair for the event this year, but I don't expect to make any radical changes to how the summit has been run in previous years, with the day beginning with a keynote talk, followed by a series of 15-25 minute talks from different summit participants (submit a talk abstract here if you want to give a talk --- the program committee will select the set of talks from this pool of abstracts based on what we expect will appeal most to the summit audience), followed by another invited talk and then a poster session over appetizers and drinks at the end of the day.

    We had over 300 registered participants at last year's event, reflecting the vibrancy of database systems activity in the New England area. We expect similar numbers at this year's event. Lunch, drinks, and appetizers are all included for free.

    Poster and talk proposal submissions must be made by January 11, 2010. All reasonable posters will be accepted; talk invitations will be made by January 20, 2010.

Thursday, October 22, 2009

Analysis of the "MapReduce Online" paper

I recently came across a paper entitled MapReduce Online” written by Tyson Condie, Neil Conway, Peter Alvaro, Joe Hellerstein, Khaled Elmeleegy, and Russell Sears at Berkeley (University of California). Since I’m very interested in Hadoop-related research (see my group’s work on HadoopDB) and this Berkeley group have historically produced reliably good research papers, I immediately downloaded the paper and read it carefully. The paper demonstrates the impact of several improvements the group made to Hadoop for interactive queries, and since Hadoop is becoming increasingly popular, I expect this paper to have wide interest. Therefore, I think it might be useful to post a summary of the paper and some analysis on this blog. If you have also read this paper, I encourage discussion in the comment thread of this post.

Overview

The authors argue that since MapReduce’s (and therefore Hadoop’s) roots are in batch processing, design decisions were made that are cause problems as Hadoop gets used more and more for interactive query processing. The main problem the paper addresses is how data is transferred between operators --- both between ‘Map’ and ‘Reduce’ operators within a job, and also across multiple MapReduce jobs. This main problem has two subproblems:

  1. Data is pulled by downstream operators from upstream operators (e.g., a Reduce task requests all the data it needs from predecessor Map tasks) instead of pushed from upstream operators to downstream operators.
  2. Every MapReduce operator is a blocking operator. Reduce tasks cannot begin until Map tasks are finished, and Map tasks for the next job cannot get started until the Reduce tasks from the previous job have completed.

The paper does not really distinguish between these two subproblems, but for the purposes of this discussion, I think it is best to separate them.

Subproblem 2 causes issues for interactive queries for four reasons:

  • It eliminates opportunities for pipelined parallelism (where, e.g., Map and Reduce tasks from the same job can be running simultaneously). There are some cases (such as the example query in the paper --- more on that later) where pipelined parallelism can significantly reduce query latency.
  • It causes spikes in network utilization (there is no network traffic until a Map task finishes, and then the entire output must be sent at once to the Reduce nodes). This is particularly problematic if Map tasks across nodes in a cluster are synchronized
  • When every operator is a blocking operator, it is impossible to get early estimations of a query result until the very end of query execution. If data instead can be pipelined through query operators as it is processed, then we can start receiving query results almost immediately, and these early query results might be sufficient for the end user, who can then stop the query before it runs to completion.
  • If the Map part of a MapReduce job reads from a continuous (and infinite) data source instead of from a file, no operator ever “completes”, so a blocking operator that doesn’t produce results until it completes is entirely useless.

Subproblem 1 (pull vs push) is problematic mostly due to performance reasons. Data that is pushed can be shipped as soon as it is produced, while it is still in cache. If it is pulled at a later time, accessing the data often incurs an index lookup and a disk read, which can reduce performance. There is also slightly more coordination (and therefore network traffic) across nodes in the pull model relative to the push model.

The goal of the paper is to fix these subproblems that arise on interactive query workloads, while maintaining Hadoop’s famous query-level fault tolerance (very little query progress is lost upon a node failure). Essentially, this means that we want to continually push data from producer operators to consumer operators on the fly as the producer operator produces output. However, in order to maintain Hadoop’s fault tolerance, data that is pushed is also checkpointed locally to disk before being sent, so that if a downstream node fails, the task assigned to this node can be rescheduled on a different node, which will begin by pulling data from this upstream checkpoint just as standard Hadoop does.

The previous two sentences is the basic idea suggested in the paper. There are some additional details that are also described (1. to avoid having too many TCP connections between Map and Reduce nodes, not all data can be immediately pushed; 2. data that is pushed should be sent in batches and “combined” before being shipped; 3. in order to handle potential Map node failures, Reduce tasks that receive data before Map tasks have completed have to treat this data as “tentative” until the Map task “commits”), but these details do not change the basic idea: push data downstream as it is produced, but still write all data locally to disk before it is shipped.

The authors implement this idea in Hadoop, and then run some experiments to demonstrate how these changes (1) improve query latency (2) enable early approximations of query results (3) enable the use of Hadoop for continuous data stream of data. In other words, all of the problems that Hadoop has for interactive query workloads described above are fixed.

Analysis

The paper evaluates mainly subproblem (2): every operator in MapReduce is a blocking operator. The main focus of the paper is, in particular, on turning the Map operator from a blocking operator to an operator that can ship query results as they are produced. This technique gets more and more useful the fewer map tasks there are. If there are more nodes than Map tasks (as is the case for every experiment in this paper) then there are entire nodes that have only Reduce tasks assigned to them, and without pipelining, these nodes have to sit around and do nothing until the Map tasks start to finish.

Even if there are as many nodes as Map tasks (so every node can work on a Map task during the Map phase), each node can run a Map task and a Reduce task in parallel and the multicore resources of the node can be more fully utilized if the Reduce task has something to do before the Map task finishes. However, as there are more Map tasks assigned per node (Google reports 200,000 Map tasks for 2,000 nodes in the original MapReduce paper), I would predict that the performance improvement from being able to start the Reduce nodes early gets steadily smaller. I also wonder if the need to send data from a Map task before it completes for the purposes of online aggregation also gets steadily smaller, since the first Map tasks finish at a relatively earlier time in query processing when nodes have to process many Map tasks.

In general, the performance studies that are presented show the best case scenario for the ideas presented in the paper: (1) fewer than 1 Map task assigned per node (2) the Map task does not filter any data so there is a lot of data that must be shipped between the Map and Reduce phases (3) there is no combiner function (4) there are no concurrent MapReduce jobs. Although it is useful to see performance improvement in this best-case scenario, it would also be interesting to see how performance is affected in less ideal settings. I’m particularly interested in seeing what happens when there are many more Map tasks per node.

Even in a less ideal setting I would still expect to see performance improvement from the ideas presented in this paper due to the authors' solution to subproblem 1 (switching from a pull-model to a push-model). It would be great if the experiments could be extended so that the reader can understand how much of the performance improvement is from the authors' solution to subproblem 2 (the Reduce nodes can start early) and how much of the performance improvement is from the disk seeks/reads saved by switching from a pull-model to a push-model.

Another performance related conclusion one can draw from this paper (not mentioned by the authors, but still interesting) is that Hadoop still has plenty of room for improvement from a raw performance perspective. The main task in this paper involved tokenizing and sorting a 5.5 GB file. Hadoop had to use 60 (seriously, 60!) nodes to do this in 900 seconds. The techniques introduced in the paper allowed Hadoop to do this in 600 seconds. But does anyone else think they can write a C program that runs on a single node that runs in half the time on 1/60th of the machines?

Conclusion

The ideas presented in this paper provide clear advantages for interactive query workloads on Hadoop. For this reason, I look forward to the modifications made by the authors making it into the standard Hadoop distribution. Since HadoopDB is able to leverage improvements made to Hadoop, these techniques should be great for HadoopDB as well. While I think the performance advantages in the paper may be a little overstated, I do believe that switching from pull to push will improve performance by at least a small amount for many queries, and the ability to support online aggregation and continuous queries is a nice contribution. My suggestion to the authors would be to spend more time focusing on online aggregation and continuous queries, and less time extolling the performance advantages of their techniques. For example, one question I had about online aggregation is the following: the running aggregation is done using the Map tasks that finish first. But the problem is that Map tasks run on congruous chunks of an HDFS file --- data in the same partition are often more similar to each other than data on other partitions. Hence, might data skew become a problem?

Wednesday, October 14, 2009

Greenplum announces column-oriented storage option

I checked Curt Monash’s blog today (as I do on a somewhat daily basis) and saw a new post announcing Greenplum’s new column-oriented storage option. In my opinion, this is pretty big news. I was amused to see that Curt, in his post, correctly predicted pretty much everything I was going to say, but nonetheless, I feel obligated to post my reactions to this news:

Quick hit reactions:

(1) Congrats to Greenplum and their strong technical team for adding
column-oriented storage. They have essentially created a hybrid storage layer --- now you can store data in rows (write or read optimized) or in columns. I have previously written a long (and surprisingly popular --- 1,638 unique visits) blog post on hybrid column/row-oriented storage schemes. I would put the Greenplum scheme under the “Approach 3: fine-grained hybrids” classification. This puts them in the same category as the Vertica Flexstore scheme (more on this in a second).

(2) I strongly agree with the Greenplum philosophy that religious wars of columns vs rows will not get you anywhere. The fact of the matter is that for some workloads, row-oriented storage can get you an order of magnitude performance improvement relative to column-stores, and for other workloads, column-oriented storage can get you an order of magnitude performance improvement relative to row-stores. Hybrid schemes can theoretically get you the best of both worlds, and that can be a big win.

(3) For a few years now, I've lumped Greenplum and Aster Data together in my data warehouse market world view, even though Greenplum is slightly older and has more customers. Both are software-only solutions, and are big proponents of commodity hardware. Both partner with hardware vendors and will sell you bundled hardware with their software in a pre-optimized appliance if you prefer that to the software-only solution. Both started with vanilla PostgreSQL and turned it into a shared-nothing, MPP analytical DBMS. Both are players in the "big data" game, and heavily market their scalability. Both are west coast companies. Both support in-database MapReduce capabilities (they even announced this on the same day!). Both had research papers in the major DBMS research conferences this year. They both have embraced "the cloud". They even share some of the same customers (e.g. Fox Interactive Media).

The word on the street is that the main difference between the two companies is that Greenplum has made significant modifications to the PostgreSQL code-base and the integration of the DBMS with the distribution (parallelization) layer is more "monolithic", whereas Aster Data has kept more of the original PostgreSQL code, and treats the database more like a black box. This announcement by Greenplum is further evidence that they are more than willing to edit the PostgreSQL code, as significant modifications to the storage layer was required.

(4) Kudos to Greenplum for being open and honest about the limitations of their column-store option. It is a column-store at the storage layer only. This certainly is still a big win for queries that access few attributes from wide tables. However, before I had my own blog, I wrote a guest article for Vertica’s blog explaining that column-oriented storage only gets you part of the way towards column-store performance --- writing a query executor optimized for column-oriented storage can get you an additional order of magnitude performance improvement. I explain this at length in two academic papers (see here and here). I think papers coined the terms "early materialization" and "late materialization" for explaining whether columns are being put back together into tuples (rows) at the beginning or end of execution of a query plan. In retrospect I have regretted using this term ("tuple construction" is a little more descriptive and easier to understand than "tuple materialization"). The fact that Greenplum uses the term “early materialization” to describe their column-oriented scheme is evidence that they’ve read these papers (more kudos!) and will start to implement the low-hanging fruit in the query executor to get increased performance improvement from their column-oriented storage. Hence, I expect that their column-oriented feature (while probably already useful now) will continue to get better in future releases. In the short-term, one should not expect performance approaching regular column-stores.

(5) In my previous blog post on hybrid column/row-oriented storage schemes, I mentioned that it is far easier for a column-store to implement the “Approach 3: fine-grained hybrids” scheme than a row-store, since the row-store would have to make modifications at the storage, query executor, and query optimizer layers, while the column-store (that implements both early and late materialization) would only have to make modifications at the storage layer (since the query executor and optimizer are already capable of working with rows). This would lead me to believe that the Vertica FlexStore scheme is more immediately useful than the Greenplum hybrid storage scheme. But, as I have written before, my history with Vertica gives me some bias, so be careful what you do with that statement.

(6) This has been a pretty big month for column-oriented storage layers. First Oracle announced their new column-oriented compression scheme (classified in the “Approach 1: PAX” category in my previous blog post) and now Greenplum adds column-oriented storage. This should put more pressure on the other vendors in the data warehousing space (especially Microsoft and IBM) to come up with some sort of column-oriented storage option in the near future.

Sunday, September 13, 2009

Kickfire’s approach to parallelism

I was chatting with Raj Cherabuddi, founder of Kickfire recently about Kickfire’s approach to parallelism, and I think that some of the problems they have to deal with regard to parallelizing queries are quite different from standard parallel database systems, and warrant talking about in a blog post.

Parallel databases typically achieve parallelism via “data-partitioned parallelism”. The basic idea is that data is horizontally partitioned across processors, and each processor executes a query on its own local partition of the data. For example, let’s say a user wants to find out how many items were sold over $50 on August 8th 2009:

SELECT count(*)
FROM lineitem
WHERE price > $50 and date = ‘08/08/09’.

If elements of the line item table are partitioned across different processors, then each processor will execute the complete query locally, computing a count of all tuples that pass the predicates on its partition of the data. These counts can then be combined in a “merge” final step into a global count. The vast majority of the query (everything except the very short final merge step) can be processed completely in parallel across processors.

Let’s assume that there are no helpful indexes for this query. A naïve implementation would use the iterator model to implement this query on each processor. The query plan would consist of three operators: a table scan operator, a selection operator (for simplicity, let's assume it is separate from the scan operator), and an aggregation (count) operator. The aggregation operator would call “getNext” on its child operator (the selection operator), and the selection operator would in turn call getNext on its child operator (the scan operator). The scan would read the next tuple from the table and return control along with the tuple to the selection operator. The selection operator would then apply the predicate on the tuple. If the predicate passes, then the selection operator would return control, along with the tuple to the count operator which increments the running count. If the predicate fails, then instead of returning control to the count operator, the selection operator would instead call getNext again on its child (the scan operator) and apply the predicate to the next tuple (and keep on doing so until a tuple passes the predicate).

It turns out that the iterator model is quite inefficient from an instruction cache and data cache perspective, since each operator runs for a very short time before yielding control to a parent or child operator (it just processes one tuple). Furthermore, there is significant function call overhead, as “getNext” is often called multiple times per tuple. Consequently modern systems will run each operator over batches of tuples (instead of on single tuples) to amortize initial cache miss and function call overheads over multiple tuples. Operator output is buffered, and a pointer to this buffer is sent to the parent operator when it is the parent operator’s turn to run.

Whether the iterator model is used or the batched/staged model is used, there is only one operator running at once (per processor). Thus, the only form of parallelism is the aforementioned data-partitioned parallelism across processors. Even if a processor has multiple hardware contexts/cores, each processing resource will be devoted to processing the same one operator at a time (for cache performance reasons --- see e.g. this paper).

Kickfire, along with other companies that perform database operations directly in the hardware using FPGA technology, like Netezza and XtremeData, need to take a different approach to parallelism.

Before discussing the effect on parallelism, let’s begin with a quick overview of FPGA (field programmable gate array) technology, At the lowest level, a FPGA contains a array of combinational logic, state registers, and memory, that can be configured via a mesh of wires to implement desired logical functions. Nearly any complex algorithm, including database operations such as selection, projection, and join, can be implemented in a FPGA and in doing so, can be run at the speed of the hardware. Not only is performance improved by running these algorithms in the hardware, but the chip can also be run at orders of magnitude lower clock frequencies, which result in commensurate gains in power efficiency. In many cases, operations that take hundreds to thousands of CPU instructions can be performed in a single clock cycle in FPGA logic.

Kickfire therefore employs direct transistor-based processing engines in the FPGA to natively execute complete pipelines of relational database operations. The scale and density of the VLSI processes used in today FPGA’s enable a large number (order of hundreds) of these custom operations to occur in parallel, enabling the use of parallel processing algorithms that can improve performance even further.

The ability to have a large number of operations occurring in parallel means that the query processing engines do not need to switch back and forth between different operators (as described in the iterator and blocked/staged schemes above). However, if you want to get the most out of the parallelism, data partitioned parallelism can only get you so far.

For example, if every processing unit is devoted to performing a selection operation on a different partition of the data, then the result of the selection operator will build up, eventually exceeding the size of on-chip and off-chip buffers, thereby starving the execution engines. Consequently, Kickfire implemented efficient pipelined parallelism in addition to data partitioned parallelism, so that all operators in a query plan are running in the hardware at the same time, and data is being consumed at the approximate rate that it is being produced. Kickfire implemented advanced networking techniques in the areas of queuing and flow control to manage the data flow between the multiple producers and consumers, ensuring that the data, in most cases, stays on the chip, and only occasionally spills to the memory (off-chip buffers). Keeping the intermediate datasets live on the chip prevents memory latency and bandwidth from becoming a bottleneck.

However, data-partitioned parallelism is still necessary since operators consume data at different rates. For example, if a selection predicate has 50% selectivity (1 in 2 tuples pass the predicate) followed by an aggregation operator (as in the example above), then one wants to spend approximately twice as much time doing selection as aggregation (since the aggregation operator will only have half as many tuples to process as the selection operator), so Kickfire will use data-partitioned parallelism to have twice as many selection operators as the parent operator.

For example, a hardcoded Kickfire query might look like the figure below (ignoring the column-store specific operators which is a story for another day):




Note that the selection operations on T1 and T2 along with the join between these two tables occurs four times in the query plan (this is data-partitioned parallelism). Since the join has a reasonably low cardinality, there is no need to have the parent selection operator also appear four times in the query plan; rather it can appear twice, with each operator processing the results from two child join operators. Similarly, since the selection operators produce fewer outputs than inputs, the parent operator only needs to appear once. Data from one operator in the query plan is immediately shipped to the next operator for processing.

Kickfire also claims to be able to devote hardware to running multiple queries at the same time (inter-query parallelism). Getting the right mix of data-partitioned parallelism, pipelined parallelism, and inter-query parallelism is a tricky endeavor, and is part of Kickfire’s “secret sauce”. Clearly, this requires some amount of knowledge about the expected cardinality of each operator, and the Kickfire software uses information from the catalog to help figure all of this out. One would expect this process to get more difficult for complex queries --- it will be interesting to see how Kickfire performs on complex workloads as they continue to gain customer traction (Raj makes a compelling case that, in fact, it is the most complex queries where FPGA technology can shine the brightest).

In a nutshell, Kickfire uses column-oriented storage and execution to address I/O bottlenecks (column-oriented storage has been covered extensively elsewhere in my blog, but you can read about the specifics of Kickfire’s column-store on their blog), and FPGA-based data-flow architecture to address processing and memory bottlenecks. Their “SQL chip” acts as a coprocessor and works in conjunction with the x86 processors (which runs a SQL execution engine in the software when needed, though this is usually the exception path) in their base server. By alleviating these three important bottlenecks, Kickfire is able to deliver high performance; yet still achieves tremendous power efficiency thanks to the low clock frequencies.

Overall, although I have openly questioned Kickfire’s go-to-market strategy in past posts (see here and here), their non-technical departments seem a little disorganized at times (see Jerome Pineau’s experience), and some highly visible employees are no longer with the company (notably Ravi Krishnamurthy who presented their SIGMOD paper and Justin Swanhart who did a nice job explaining the Kickfire column-store features in the aforementioned write-up), I remain a fan of their technology. If they make it through the current difficult economic climate, it will be at the virtue of their technology and the tireless work of people like Raj. As the rate of clock speed increases of commodity processors continues to slow down, being able to perform database operations in the hardware becomes an increasingly attractive proposition.