Exadata Join Offloading in Action

Jun 14, 2013 / By Christo Kutrovsky

Tags: , , , , , , ,

One of the biggest selling features of Oracle’s flagship engineered system – Exadata – is the storage offloading capabilities. Storage offloading allows data to be filtered as soon as it is read from storage, reducing the amount of data that database hosts need to process. The storage layer is a shared-nothing MPP architecture and in theory can scale linearly to extremely large data volumes through Exadata Storage Expansion units.

Offloading is applicable only to full table scans and fast index scans. Index access paths that rely on following B-TREE pointers (RANGE SCAN, FULL SCAN) use block access, thus do not benefit in any way from storage offloading.

Offloading performs the following filter operations:
– Projections – extracting only the columns which are present in the select list
– Predicate Filtering – applying all fixed predicate filters like “city=’Boston'” or “city=:the_city”. Bind variables are supported – but must match datatypes. Otherwise, implicit conversion will happen. Oracle’s Data Mining scoring can also leverage this feature.
– Join filtering via bloom filters – this only happens for parallel operations, and it’s a CBO (Cost Based Optimizer) activated decision.

Storage indexes and predicate filtering have been covered on this blog before. This post will focus on join filtering.

Bloom Filters

Join filtering at the storage layer in Exadata is implemented via bloom filters. Here’s a nice presentation explaining the concepts of bloom filters. There are many types of variations and approaches to implementing bloom filtering. Bloom filtering functionality actually predates Exadata, first introduced in Oracle 10g. They were utilized to reduce the amount of traffic between parallel query processes when joining tables. This functionality was extended to the Exadata Storage layer, but still requires parallel query, in contrast to other Exadata-specific filtering that happens for serial queries too. The decision to use bloom filtering is done by the Oracle optimizer and is based on estimated CPU and I/O usage. As with all other optimizer decisions, the Oracle cost-based optimizer (CBO) can get it wrong. If the CBO uses bloom filters when not needed, there’s a slight increase in CPU usage with minimal memory overhead. If the CBO does not use a bloom filter-based join when it should, you may get dramatic performance difference.

The Test Data and System

In this post I will be using my favorite CKK test table. It is a table with 10 million rows (10,000 * 1,000) with 2 rows per block. I have a couple of columns in this table that allow various tests. For example, mod5_id is a column that has the values 0 to 4 and can be used in various tests. It is built as follows:

create table ckk nologging tablespace ckbig as
select rownum id, mod(rownum,5) mod5_id, mod(rownum,5000) mod5000_id, sysdate dt_fixed, sysdate - rownum/24/60 dt_dec, sysdate + rownum/24/60 dt_pos, sysdate + ora_hash(rownum,65,535)/24 dt_rand, sysdate+mod(rownum,10) dt_mod10, rpad('x',3500,'x') filler
from (select rownum r from dual connect by level <= 10000) r1, (select rownum r from dual connect by level <= 1000)
;

I am running these tests on an Exadata X2-2 quarter rack with high-capacity disks. This system can read at 4 GB/sec from disk and 14.5 GB/sec from disk and flash combined. These are the numbers I calculated during my testing. Given that my CKK table is 39 GB, it can be scanned in about 10 seconds from disk and in about 3.5 seconds from flash+disk. Note that to use flash+disk, the table needs to have the cell_flash_cache keep attribute defined.

The Test

Warming up the cell flash cache and confirming scan speeds:

select /*+ FULL(c) parallel(4) */ count(*)
from ckk c; --> ~4.5 seconds

Given that I use a relatively low parallelism, level I get 4.5 seconds. This is good enough for my testing.

Creating a simple 2-row table for joining:

create table small (id number);
insert into small select rownum from dual connect by dual <=2;
commit;

First join experiment:

select /*+ FULL(c) parallel(4) */count(*)
from ckk c join small s on (s.id = c.id);
-->; 4.1 seconds
-- cell IO uncompressed bytes - 40,960,229,376
-- cell physical IO interconnect bytes - 645,974,512
-- CPU used by this session - 4.06 seconds

Notice the column projection at work here. The table is 39 GB; however, since I am doing only a count(*) and joining on a single key, the storage layer returns only the “rowid” and “id” columns. We have a reduction in data volume of nearly 99%. Projections really make a difference in storage layer traffic.

Next, a proper join experiment:

select /*+ FULL(c) parallel(4) */ *
from ckk c join small s on (s.id = c.id);
--> 12.8 seconds
-- cell IO uncompressed bytes - 41,249,734,656
-- cell physical IO interconnect bytes - 37,131,346,192
-- CPU used by this session - 27.64 seconds -> 27.64 / 4 = 6.9

In the example above, we ask for all data from both tables. As a result, we send much more data to the client session. I am not sure why the uncompressed bytes are nearly 300 MB higher, but I am not going to investigate this now. Notice that we still get some data-size savings from sending a rowset, as opposed to individual data blocks. Not all data blocks are full, and there’s metadata in the blocks. By sending a rowset, we can avoid sending this data to the database servers.

Also note the increased execution time. It takes time to transfer all that data – at about 3.2 GB/sec (single 40mbit infiniband) per NODE, this gives me a 6.4 GB/sec consumption rate. At this rate it should have run in about 6 seconds. Why the extra delay?

Well, it takes time to process this data. At 4 CPUs, we can only process so much – if we increase parallelism, we can reach higher speeds:

select /*+ FULL(c) parallel(8) */*
from ckk c join small s on (s.id = c.id);
--> 8.1 seconds
-- CPU used by this session - 29.67 -> 29.67/8 = 3.7

We need about 28 CPU seconds to receive 37GB of data, process it, and join it to our small table. As we increase parallelism, the “penalty” for receiving large amount of data reduces, but we end up consuming more resources on the database nodes.

select /*+ FULL(c) parallel(16) */*
from ckk c join small s on (s.id = c.id);
--> 6.8 seconds
-- CPU used by this session - 29.52 -> 29.52/16 = 1.8

As we reach parallelism 16, we are no longer limited by the CPU processing but by the infiniband network transfer time.

A quick recap:
– 4.1 seconds to read data
– 6.0 to transfer the data
– 1.8 seconds (at parallel 16) to process the data

These things all happen at the same time, as data is being streamed.

In this case, no join offloading is happening. This is one of those cases where the CBO decided it’s not worthwhile. We can use a PX_JOIN_FILTER optimizer hint to force it though:

select /*+ FULL(c) parallel(4) PX_JOIN_FILTER(c)*/* from ckk c join small s on (s.id = c.id);
--> 0.3 seconds
-- cell IO uncompressed bytes - 57,098,240
-- cell physical IO interconnect bytes - 1,808,488
-- CPU used by this session - 0.23 seconds

Whoa. What happened here? How come we joined a 39 GB table with no indexes in 0.3 seconds by reading 57 MB and sending back 2 MB back database layer? A few things:

1. When join filtering is happening at the storage layer, Oracle can leverage storage indexes and skip entire regions of the table. In this case, this would be “cell physical IO bytes saved by storage index – 40,903,131,136″. The exact mechanism of this is not clear to me, but there are technical articles on the internet about range based bloom filters.

2. Join filtering eliminates most of the data transfer to database servers, since only 2 rows match and they are pretty much on the same block – most of the data sent back is probably metadata of “empty”.

As you can see, a simple optimization can have dramatic effect in both wallclock time and overall system resource utilization. In this case we did not perform 39 GB of I/O. We did not send 37 GB of data over the InfiniBand network. And we did not use nearly 30 CPU seconds to receive and process. This is all due to a join filter optimization available exclusively to Exadata.

Here’s the explain plan for reference:

--------------------------------------------------------------------------------------------------------------------------
| Id  | Operation                       | Name     | E-Rows |E-Bytes| Cost (%CPU)| E-Time   |    TQ  |IN-OUT| PQ Distrib |
--------------------------------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT                |          |      2 |  7114 |   376K  (1)| 00:00:15 |        |      |            |
|   1 |  PX COORDINATOR                 |          |        |       |            |          |        |      |            |
|   2 |   PX SEND QC (RANDOM)           | :TQ10001 |      2 |  7114 |   376K  (1)| 00:00:15 |  Q1,01 | P->S | QC (RAND)  |
|*  3 |    HASH JOIN                    |          |      2 |  7114 |   376K  (1)| 00:00:15 |  Q1,01 | PCWP |            |
|   4 |     PX RECEIVE                  |          |      2 |     6 |     2   (0)| 00:00:01 |  Q1,01 | PCWP |            |
|   5 |      PX SEND BROADCAST          | :TQ10000 |      2 |     6 |     2   (0)| 00:00:01 |  Q1,00 | P->P | BROADCAST  |
|   6 |       PX BLOCK ITERATOR         |          |      2 |     6 |     2   (0)| 00:00:01 |  Q1,00 | PCWC |            |
|   7 |        TABLE ACCESS STORAGE FULL| SMALL    |      2 |     6 |     2   (0)| 00:00:01 |  Q1,00 | PCWP |            |
|   8 |     PX BLOCK ITERATOR           |          |     10M|    33G|   376K  (1)| 00:00:15 |  Q1,01 | PCWC |            |
|*  9 |      TABLE ACCESS STORAGE FULL  | CKK      |     10M|    33G|   376K  (1)| 00:00:15 |  Q1,01 | PCWP |            |
--------------------------------------------------------------------------------------------------------------------------

Predicate Information (identified by operation id):
---------------------------------------------------

3 - access("S"."ID"="C"."ID")
9 - storage(SYS_OP_BLOOM_FILTER(:BF0000,"C"."ID"))
filter(SYS_OP_BLOOM_FILTER(:BF0000,"C"."ID"))

The key element that shows bloom filters will be offloaded is step 9 – and the function storage (SYS_OP_BLOOM_FILTER(:BF0000,”C”.”ID”)) – this means data will be offloaded at storage layer.

In most cases, the speed difference will not be this dramatic. It is very dependent on physical data ordering. However, armed with the knowledge that such dramatic optimizations are possible, you can build an ETL process to, for example, take advantage of this. Or, have the ETL prepare the data so that, when queried, these optimizations can be leveraged.

Here’s a slightly different query – notice the join key is now mod5000_id – which spreads values from 0 to 5000 equally in the entire dataset

select /*+ FULL(c) parallel(4) PX_JOIN_FILTER(c)*/* from ckk c join small s on (s.id = c.mod5000_id);
--> ~8.8 seconds (includes all fetching)
-- cell IO uncompressed bytes - 9,047,941,120
-- cell physical IO interconnect bytes - 859,995,712
-- CPU used by this session - 0.97

In this case, we still benefit significantly from storage indexes. We only had to read about 1/4 of the data – 9GB. Projections and bloom filtering further reduced it to 850 MB that were finally sent to the database host, which in turn produced some 4,000 rows at about 4,000 bytes each – 16 MB to the end client.

Limitations

There are some restrictions in bloom filtering. For example, they don’t work with outer joins. Why is this the case? In inner joins, any table can be “the second” table and bloom filters applied to it. In Oracle outer joins can also be performed in any order. However, even in the case where the outer joined table is second, bloom filters appear to be unusable.

The CBO is your best friend and your worst enemy. As you saw in my simple example, there are obvious cases where join-based bloom filtering should happen, but doesn’t. I suspect it has something to do with the original purpose of join bloom filters and the way costing was implemented.

And finally – bloom filters are based on hash values and thus very data-dependent. If you have skewed data and there are some significant hash collisions with popular values, this will make the join filtering much less efficient at the storage layer.