Conversation

@Mytherin (Collaborator) commented Jul 9, 2024

This PR extends the hash table build in PhysicalHashJoin to compute the min and max values of the join keys, and then uses the generated min/max to dynamically generate table filters that are pushed into the probe side. This can greatly improve performance for selective joins.

JoinFilterPushdownOptimizer

The JoinFilterPushdownOptimizer is responsible for setting up the link between the PhysicalComparisonJoin (build side) and the PhysicalTableScan (probe side). Currently it has a number of limitations, which can likely be alleviated in the future:

  • Join filters are only generated for equality predicates (col1 = col2)
  • Join filters are only generated for INNER, RIGHT, SEMI and RIGHT_SEMI joins.
  • Join filters are only generated when the probe column is directly referenced (i.e. col1 + 1 = col2 does not generate join filters)

JoinFilterPushdownInfo

The optimizer generates a JoinFilterPushdownInfo that contains the min/max aggregates to compute, as well as functions to set up the necessary state. The DynamicTableFilterSet is the link between the table scan and the join. The JoinFilterPushdownInfo computes the min/max in parallel during the build, and in Finalize uses the resulting min/max to push table filters into the DynamicTableFilterSet. The table scan then accesses the DynamicTableFilterSet, if available, and uses it to generate a new set of table filters.
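To make the data flow concrete, here is a minimal sketch of what such a link could look like. The name `DynamicTableFilterSet` comes from this PR, but the members and methods below are illustrative assumptions, not DuckDB's actual API:

```cpp
#include <map>
#include <memory>
#include <mutex>

// Stand-in for duckdb::TableFilter; the real class hierarchy is richer.
struct TableFilter {
	virtual ~TableFilter() = default;
};

// Hypothetical shape of the shared state between the join and the scan.
struct DynamicTableFilterSet {
	std::mutex lock;
	// Generated filters, keyed by probe-side column index.
	std::map<int, std::unique_ptr<TableFilter>> filters;

	// Called from the join's Finalize once the min/max values are known.
	void PushFilter(int column_index, std::unique_ptr<TableFilter> filter) {
		std::lock_guard<std::mutex> guard(lock);
		filters[column_index] = std::move(filter);
	}

	// Called from the table scan to check for dynamically generated filters.
	bool HasFilters() {
		std::lock_guard<std::mutex> guard(lock);
		return !filters.empty();
	}
};
```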

UngroupedAggregateState

In order to facilitate the computation of the min/max, the relevant code for computing ungrouped aggregates is extracted from PhysicalUngroupedAggregate into the UngroupedAggregateState class, which is now also used by the join filter pushdown.
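As a rough illustration of what this state needs to support for a parallel build - this is a simplified stand-in for min/max over integer keys, not the actual UngroupedAggregateState class:

```cpp
#include <algorithm>
#include <limits>

// Each thread sinks values into its own local state during the hash table
// build; the thread-local states are combined before finalizing.
struct MinMaxState {
	long min_val = std::numeric_limits<long>::max();
	long max_val = std::numeric_limits<long>::min();

	// Per-thread update for every join key seen during the build.
	void Sink(long value) {
		min_val = std::min(min_val, value);
		max_val = std::max(max_val, value);
	}
	// Merge another thread's state into this one.
	void Combine(const MinMaxState &other) {
		min_val = std::min(min_val, other.min_val);
		max_val = std::max(max_val, other.max_val);
	}
};
```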

Benchmarks

Below are some benchmarks on TPC-H SF15:

```sql
SELECT * FROM lineitem WHERE l_orderkey=(SELECT MAX(l_orderkey) FROM lineitem);
```

| v1.0  |  New  |
|-------|-------|
| 1.08s | 0.07s |

```sql
SELECT * FROM lineitem JOIN orders ON (l_orderkey=o_orderkey) WHERE o_totalprice >= 570000;
```

| v1.0  |  New  |
|-------|-------|
| 1.05s | 0.01s |

Future Work

  • The generated filters are not yet pushed into the MultiFileList as that relies on the complex filter pushdown. As such, partitioned files are not yet pruned based on the generated filters. (CC @samansmink)
  • The filters generated in this manner are not necessary for correctness; they are purely an optimization. There are situations where these filters can actually slow down execution, so it could be beneficial from a performance perspective not to execute them and to instead use them only for e.g. zonemap pruning. This could be done adaptively in the scan, or based on some selectivity heuristics.

@Mytherin (Collaborator, Author) commented Jul 9, 2024

CC @lnkuiper @Tmonster

This decreases performance in some cases - in particular when the filters we push are very non-selective, as we then do extra work in the scans for no benefit (executing table filters that end up filtering out no rows). In particular, TPC-H sees a few performance degradations:

```
benchmark/tpch/sf1/q04.benchmark
Old timing: 0.047415
New timing: 0.067873

benchmark/tpch/sf1/q21.benchmark
Old timing: 0.14677
New timing: 0.192795
```

Because of the uniformly random nature of the data, the added filters are often very non-selective.

I see a few options for improving this:

  • Labeling the generated filters as "optional" (since they are not required for correctness), and adaptively running or skipping them depending on their observed selectivity
  • Or even using these filters only for zonemap skipping
  • Not executing table filters when the zonemaps indicate they are useless - e.g. if min = 1000, max = 1100 and the filter is x > 0 AND x < 10000, we know all rows pass the filter, so executing it is unnecessary (see the sketch below)
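A minimal sketch of that last check, assuming integer zonemaps and a simple range filter (the names here are illustrative, not DuckDB's actual classes):

```cpp
// Per-row-group statistics and a filter of the form x > lower AND x < upper.
struct ZoneMap {
	long min_val;
	long max_val;
};
struct RangeFilter {
	long lower;
	long upper;
};

// True if every row in the row group necessarily passes the filter,
// in which case executing the filter row-by-row is wasted work.
bool FilterAlwaysTrue(const ZoneMap &zone, const RangeFilter &filter) {
	return zone.min_val > filter.lower && zone.max_val < filter.upper;
}
```

For the example above - a zonemap of [1000, 1100] with the filter x > 0 AND x < 10000 - `FilterAlwaysTrue` returns true, so the scan could skip evaluating the filter for that row group entirely.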

I would suggest making these optimizations in a separate PR.

@Mytherin Mytherin requested a review from lnkuiper July 9, 2024 11:23
@Mytherin Mytherin merged commit 9937fcf into duckdb:main Jul 9, 2024
krlmlr added a commit to duckdb/duckdb-r that referenced this pull request Jul 11, 2024
Merge pull request duckdb/duckdb#12908 from Mytherin/joinfilterpushdown
@Mytherin Mytherin deleted the joinfilterpushdown branch August 4, 2024 08:33
Mytherin added a commit that referenced this pull request Oct 21, 2024
… `AGGREGATE` (#14453)

Follow-up from #12908

Allow dynamically generated join filters to be pushed through `UNION`,
`UNNEST` and `AGGREGATE` nodes in the plan.

For `UNION`, this means we need to potentially push filters into
multiple table scans. For example, in this case we push the filter into
both scans over `lineitem`:

```sql
CREATE VIEW double_lineitem AS
FROM lineitem
UNION ALL
FROM lineitem;

SELECT *
FROM double_lineitem
WHERE l_orderkey=(SELECT MAX(l_orderkey) FROM lineitem) ORDER BY ALL
```

We can push filters through aggregates as well, as long as the filters
are on grouping columns. However, in this case the pipeline dependencies
are not yet set up correctly to ensure the upstream join is executed
before the aggregate - meaning the filter currently has no effect, as
the aggregate is computed before we know which rows we can filter out
(CC @lnkuiper - perhaps something you want to look into at some point).
Mytherin added a commit that referenced this pull request Nov 18, 2024
Follow-up from #12908


This PR extends the join filters that get generated by hash joins to
include an `IN` filter when the hash table is small enough. Rather than
generating only a `min-max` filter, we generate an `IN` filter with
**all** values in the hash table. This can greatly improve performance
over the `min-max` filter when there are few values that are far apart.
For example, if the hash table contains the values `1` and `100000`, the
`IN` filter might be much more effective as we can prune many more row
groups.

The threshold for which we generate an `IN` filter is determined by the
`dynamic_or_filter_threshold` setting, and defaults to 50 rows. The `IN`
filter is pushed as an `OPTIONAL_FILTER`, which is currently only
evaluated for zone-map pruning. As a result, the performance impact of
pushing this filter is minimal, while the performance improvement from
extra zone-map pruning can be significant.
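To illustrate why the `IN` filter prunes more aggressively, here is a sketch of zone-map pruning with a set of values, assuming integer keys (a simplification of the actual pruning logic):

```cpp
#include <vector>

// Per-row-group statistics.
struct ZoneMap {
	long min_val;
	long max_val;
};

// A row group can only contain matches if at least one of the IN values
// falls inside its min/max range; otherwise it can be skipped entirely.
bool RowGroupCanMatch(const ZoneMap &zone, const std::vector<long> &in_values) {
	for (long value : in_values) {
		if (value >= zone.min_val && value <= zone.max_val) {
			return true;
		}
	}
	return false;
}
```

With the values `1` and `100000`, a min-max filter keeps every row group overlapping the range [1, 100000], while the `IN` filter only keeps row groups whose range actually contains `1` or `100000`.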

### Benchmark

Below is a benchmark over TPC-H SF10, in which we run a join that is
generated through an `IN` clause - we join on the `min` and `max` values
of `l_orderkey`, meaning the generated min/max filters will not be
effective in pruning rows.

```sql
SELECT *
FROM lineitem
WHERE l_orderkey IN (
    SELECT UNNEST([MIN(l_orderkey), MAX(l_orderkey)])
    FROM lineitem)
ORDER BY ALL;
```

| v1.1  |  New  |
|-------|-------|
| 0.22s | 0.04s |
Mytherin added a commit that referenced this pull request Dec 3, 2024
When executing a Top-N query, we need to find the top or bottom `N`
values for a given set of conditions. The Top-N operator builds the
result through a heap in which we keep track of the top-N values seen so
far. For any given heap of `N` values, we know that any row that has an
ordering column value that is larger (or smaller, for `DESC` ordering)
will not be in the result set at that moment.

In the Top-N operator, we already keep track of this value - called the
`boundary value` - and use it to prune rows that we know are not
going to be inserted into the heap, early on in the `Sink` call. While
this speeds up the Top-N operator itself, we still need to scan the rows
from the base storage before finding out that they will not make it into
the final result.
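As a sketch of the idea, assuming a single ascending order column with integer values (a simplification of the actual operator):

```cpp
#include <queue>

// Keeps the n smallest values seen so far (ORDER BY x LIMIT n).
struct TopNHeap {
	size_t n;
	std::priority_queue<long> heap; // max-heap: top() is the boundary value

	explicit TopNHeap(size_t limit) : n(limit) {
	}

	void Sink(long value) {
		if (heap.size() < n) {
			heap.push(value);
		} else if (value < heap.top()) {
			heap.pop();       // evict the current boundary
			heap.push(value); // the boundary only ever tightens
		}
		// else: the row is pruned by the boundary value
	}
};
```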

This PR extends the Top-N operator to push the boundary value into the
scans as a table filter. This is similar to the dynamic table filters
generated through joins (introduced in
#12908), but "more dynamic". While
the filters generated through joins are generated once (when the HT
build is complete), the Top-N filters are updated whenever the boundary
value is updated - which can happen for every `Sink` call.

#### Implementation

The implementation works through a new table filter - the
`DynamicFilter`. This is a regular `TableFilter` that holds a shared
pointer to a `DynamicFilterData`, which contains a child table filter
together with a lock:

```cpp
struct DynamicFilterData {
	mutex lock;
	unique_ptr<TableFilter> filter;
	bool initialized = false;
};
```

This filter is generated in the `TopN` optimizer. The `TopN` contains a
shared pointer to the `DynamicFilterData` as well. The `TopN` operator
then updates the underlying filter during the `Sink` phase whenever the
global boundary value changes.
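A hypothetical sketch of that update path - `BoundaryFilter` and `SetFilter` are illustrative names, not DuckDB's actual API:

```cpp
#include <memory>
#include <mutex>

// Stand-ins for duckdb::TableFilter and a concrete boundary comparison.
struct TableFilter {
	virtual ~TableFilter() = default;
};
struct BoundaryFilter : TableFilter {
	long boundary; // e.g. represents x <= boundary for ascending order
	explicit BoundaryFilter(long boundary_p) : boundary(boundary_p) {
	}
};

struct DynamicFilterData {
	std::mutex lock;
	std::unique_ptr<TableFilter> filter;
	bool initialized = false;

	// Called from the Top-N Sink whenever the global boundary improves;
	// scans read the current filter under the same lock.
	void SetFilter(long boundary) {
		std::lock_guard<std::mutex> guard(lock);
		filter = std::make_unique<BoundaryFilter>(boundary);
		initialized = true;
	}
};
```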

#### Performance

Below is an illustration of the performance gain we can obtain here,
running this query over TPC-H SF10:

```sql
SELECT * FROM lineitem ORDER BY l_orderkey LIMIT 5;
```

| main  |  New  |
|-------|-------|
| 0.19s | 0.02s |

Note that this is not always beneficial - for example, when scanning in
descending order in this scenario. Since the table is naturally ordered
and we scan data in the table's natural order, we are never able to
prune any row groups:

```sql
SELECT * FROM lineitem ORDER BY l_orderkey DESC LIMIT 5;
```

| main  |  New  |
|-------|-------|
| 0.23s | 0.23s |


#### Limitations

* When the `ORDER BY` clause has multiple order conditions, we can only
generate the filter for the first order condition (since the value of
the remaining ones is unknown).
* We currently only support `NULLS LAST` ordering. It is possible to
extend this to `NULLS FIRST`, but that is trickier, as we need to take
`NULL` values into account in the generated filters and boundary value.