# Pushdown table filters into probe based on min/max data found during hash build of hash joins (#12908)
## Conversation
…eal with recursive CTEs
… conditions before join filter pushdown so the indexes do not get invalidated
This decreases performance in some cases - in particular when the filters we push are very non-selective, as we do extra work in the scans for no effect (executing table filters unnecessarily, only to filter out 0 rows). TPC-H in particular sees a few performance degradations:
Because of the uniform random nature of the data, the added filters are often very non-selective (a sketch of such a case follows below). I see some options towards improving this:
I would suggest making these optimizations in a separate PR.
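As a hedged sketch of such a non-selective case (hypothetical query; any join over TPC-H's uniformly distributed keys behaves similarly): the build side spans essentially the entire `o_orderkey` domain, so the pushed min/max filter excludes almost no probe-side rows while still being evaluated in the scan:

```sql
-- the build side covers (nearly) the full o_orderkey range, so the derived
-- min/max filter on l_orderkey prunes next to nothing while adding scan work
SELECT count(*)
FROM lineitem
JOIN orders ON l_orderkey = o_orderkey;
```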
Merge pull request duckdb/duckdb#12908 from Mytherin/joinfilterpushdown
… `AGGREGATE` (#14453)

Follow-up from #12908. Allow dynamically generated join filters to be pushed through `UNION`, `UNNEST` and `AGGREGATE` nodes in the plan. For `UNION`, this means we need to potentially push filters into multiple table scans. For example, in this case we push the filter into both scans over `lineitem`:

```sql
CREATE VIEW double_lineitem AS FROM lineitem UNION ALL FROM lineitem;
SELECT * FROM double_lineitem
WHERE l_orderkey=(SELECT MAX(l_orderkey) FROM lineitem)
ORDER BY ALL;
```

We can push filters through aggregates as well, as long as the filters are on grouping columns. However, in this case, the pipeline dependencies are not yet set up correctly to ensure the join upstream is executed before the aggregate - meaning the filter has no effect yet, as the aggregate is computed before we know which rows we can filter out (CC @lnkuiper - perhaps something you want to look into at some point). A sketch of the aggregate case follows below.
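A minimal sketch of the aggregate case (hypothetical query, assuming the TPC-H schema): the join filter on `l_orderkey` can be pushed below the `GROUP BY`, since `l_orderkey` is a grouping column:

```sql
-- the dynamic filter generated for l_orderkey can be pushed through the
-- aggregate and into the scan of lineitem, because l_orderkey is a group key
SELECT *
FROM (
    SELECT l_orderkey, SUM(l_quantity) AS total_quantity
    FROM lineitem
    GROUP BY l_orderkey
) agg
WHERE l_orderkey = (SELECT MAX(l_orderkey) FROM lineitem);
```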
Follow-up from #12908. This PR extends the join filters that get generated by hash joins to include an `IN` filter when the hash table is small enough. Rather than generating only a `min-max` filter, we generate an `IN` filter with **all** values in the hash table. This can greatly improve performance over the `min-max` filter when there are few values that are far apart. For example, if the hash table contains the values `1` and `100000`, the `IN` filter might be much more effective, as we can prune many more row groups.

The threshold for which we generate an `IN` filter is determined by the `dynamic_or_filter_threshold` setting, and defaults to 50 rows (a sketch of adjusting this setting follows after the benchmark below). The `IN` filter is pushed as an `OPTIONAL_FILTER`, which is currently only evaluated for zone-map pruning. As a result, the performance impact of pushing this filter is minimal, while the performance improvement from extra zone-map pruning can be significant.

### Benchmark

Below is a benchmark that we run over TPC-H SF10, in which we run a join that is generated through an `IN` clause - we join on the `min` and `max` values of `l_orderkey` (meaning the generated min/max filters will not be effective in pruning rows).

```sql
SELECT * FROM lineitem
WHERE l_orderkey IN (
    SELECT UNNEST([MIN(l_orderkey), MAX(l_orderkey)])
    FROM lineitem)
ORDER BY ALL;
```

| v1.1  | New   |
|-------|-------|
| 0.22s | 0.04s |
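The threshold can be adjusted through the setting named above; a hedged example (the value shown is illustrative only):

```sql
-- generate an IN filter for hash tables of up to 100 rows instead of the
-- default 50 (dynamic_or_filter_threshold is the setting named in this PR)
SET dynamic_or_filter_threshold = 100;
```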
When executing a Top-N query, we need to find the top or bottom `N` values for a given set of conditions. The Top-N operator builds the result through a heap in which we keep track of the top-N values seen so far. For any given heap of `N` values, we know that any row whose ordering column value is larger (or smaller, for `DESC` ordering) will not be in the result set at that moment. In the Top-N operator, we already keep track of this value - called the `boundary value` - and use it to prune out rows that we know are not going to be inserted into the heap, early on in the `Sink` call.

While this works in speeding up the Top-N operator itself, we still need to scan the rows from the base storage before finding out that these rows will not make it into the final result. This PR extends the Top-N operator to push the boundary value into the scans as a table filter. This is similar to the dynamic table filters generated through joins (introduced in #12908), but "more dynamic". While the filters generated through joins are generated once (when the HT build is complete), the Top-N filters are updated whenever the boundary value is updated - which can happen for every `Sink` call.

#### Implementation

The implementation works through a new table filter - the `DynamicFilter`. This is a regular `TableFilter` that holds a shared pointer to a `DynamicFilterData` - which contains a child table filter together with a lock:

```cpp
struct DynamicFilterData {
	mutex lock;
	unique_ptr<TableFilter> filter;
	bool initialized = false;
};
```

This filter is generated in the `TopN` optimizer. The `TopN` operator contains a shared pointer to the `DynamicFilterData` as well, and updates the underlying filter during the `Sink` phase whenever the global boundary value is updated to a new value.

##### Performance

Below is an illustration of the performance gain we can obtain here, running this query over TPC-H SF10:

```sql
SELECT * FROM lineitem ORDER BY l_orderkey LIMIT 5;
```

| main  | New   |
|-------|-------|
| 0.19s | 0.02s |

Note that this is not always beneficial, e.g. when looking in descending order in this scenario. Since the table is naturally ordered and we scan data in the table's natural order, we are never able to prune any row groups:

```sql
SELECT * FROM lineitem ORDER BY l_orderkey DESC LIMIT 5;
```

| main  | New   |
|-------|-------|
| 0.23s | 0.23s |

#### Limitations

* When the `ORDER BY` clause has multiple order conditions, we can only generate the filter for the first order condition, since the value of the remaining ones is unknown (see the sketch below).
* We currently only support `NULLS LAST` ordering. It is possible to extend this to `NULLS FIRST`, but that is more tricky, as we need to take `NULL` values into account in the generated filters/boundary value.
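A hedged illustration of the first limitation (hypothetical query): only the leading order condition can drive the dynamic filter:

```sql
-- only l_orderkey (the first order condition) yields a dynamic filter;
-- no boundary can be derived for l_partkey, whose ordering only matters
-- among rows that tie on l_orderkey
SELECT * FROM lineitem ORDER BY l_orderkey, l_partkey LIMIT 5;
```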
This PR extends the hash table build in `PhysicalHashJoin` to compute the `min` and `max` value of the join keys - and then uses the generated `min`/`max` to dynamically generate table filters that are pushed into the probe side. This can greatly improve performance for selective joins. A sketch of such a case is shown below.
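As a hedged illustration (a pattern also used in the follow-up PRs above): the scalar subquery is planned as a hash join whose build side contains a single `l_orderkey` value, so the derived min/max filter prunes nearly all row groups in the scan of `lineitem`:

```sql
-- the build side holds one value, so min = max and the derived table filter
-- on l_orderkey lets the probe-side scan skip almost every row group
SELECT *
FROM lineitem
WHERE l_orderkey = (SELECT MAX(l_orderkey) FROM lineitem);
```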
### JoinFilterPushdownOptimizer

The `JoinFilterPushdownOptimizer` is responsible for setting up the link between the `PhysicalComparisonJoin` (build) and the `PhysicalTableScan` (probe). Currently it has a number of limitations (that can likely be alleviated somewhat in the future):

* Join filters are only generated for `INNER`, `RIGHT`, `SEMI` and `RIGHT_SEMI` joins.
* Join filters are only generated for join conditions on plain column references (e.g. `col1 + 1 = col2` does not generate join filters).
### JoinFilterPushdownInfo

The optimizer generates a `JoinFilterPushdownInfo` that contains the min/max aggregates to compute, as well as functions to set up the necessary state. The `DynamicTableFilterSet` is the link between the table scan and the join. The `JoinFilterPushdownInfo` then computes the `min`/`max` in parallel during the build, and in the `Finalize` uses the `min`/`max` to push the table filters into the `DynamicTableFilterSet`. In the table scan, we then access the `DynamicTableFilterSet`, if available, and use it to generate a new set of table filters. A conceptual sketch follows below.
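Conceptually (a hedged sketch - the actual filters are `TableFilter` objects created at runtime, not SQL rewrites), after `Finalize` the probe-side scan behaves as if the query contained explicit bounds on the join key:

```sql
-- conceptual equivalent of the dynamically pushed table filters, where
-- 42 and 4711 stand in for the min/max computed during the hash build
SELECT * FROM lineitem
WHERE l_orderkey >= 42 AND l_orderkey <= 4711;
```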
### UngroupedAggregateState

In order to facilitate the computation of min/max, the relevant code for computing ungrouped aggregates is extracted from `PhysicalUngroupedAggregate` into the `UngroupedAggregateState` class, which is now also used in the `JoinFilterPushdown`.

### Benchmarks
Below are some benchmarks on TPC-H SF15:
### Future Work

* Filters are not yet pushed into the `MultiFileList`, as that relies on the complex filter pushdown. As such, partitioned files are not yet pruned based on the generated filters. (CC @samansmink)