Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/include/duckdb/optimizer/filter_pushdown.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ class Optimizer;

class FilterPushdown {
public:
explicit FilterPushdown(Optimizer &optimizer, bool convert_mark_joins = true);
explicit FilterPushdown(Optimizer &optimizer, LogicalOperator &plan, bool convert_mark_joins = true);
FilterPushdown(Optimizer &optimizer, const unordered_set<idx_t> &projected_mark_indexes,
bool convert_mark_joins = true);

//! Perform filter pushdown
unique_ptr<LogicalOperator> Rewrite(unique_ptr<LogicalOperator> op);
Expand All @@ -40,6 +42,7 @@ class FilterPushdown {
private:
Optimizer &optimizer;
FilterCombiner combiner;
unordered_set<idx_t> projected_mark_indexes;
bool convert_mark_joins;

vector<unique_ptr<Filter>> filters;
Expand Down
78 changes: 75 additions & 3 deletions src/optimizer/filter_pushdown.cpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,89 @@
#include "duckdb/optimizer/filter_pushdown.hpp"

#include "duckdb/optimizer/filter_combiner.hpp"
#include "duckdb/optimizer/optimizer.hpp"
#include "duckdb/planner/expression_iterator.hpp"
#include "duckdb/planner/operator/logical_filter.hpp"
#include "duckdb/planner/operator/logical_join.hpp"
#include "duckdb/planner/operator/logical_window.hpp"
#include "duckdb/optimizer/optimizer.hpp"
#include "duckdb/planner/operator/logical_projection.hpp"
#include "duckdb/planner/operator/logical_comparison_join.hpp"

namespace duckdb {

using Filter = FilterPushdown::Filter;

FilterPushdown::FilterPushdown(Optimizer &optimizer, bool convert_mark_joins)
//! Returns the mark indexes of the MARK joins in `plan` whose marker is still referenced ("projected")
//! from above. Filter pushdown must not turn such a MARK join into a SEMI join, because the marker
//! column would then disappear while an operator above still reads it.
//!
//! `table_bindings` is the set of table indexes visible at the output of `plan`. Below a projection,
//! the visible set becomes the table indexes referenced by that projection's expressions. The set is
//! taken by const reference and never modified, so the narrowing done for one child's subtree cannot
//! leak into its siblings.
//!
//! Review note from the PR discussion: the reviewer reported that the query below still threw an
//! internal exception. The reviewer suspected ExpressionIterator only iterates one level and suggested
//! LogicalOperatorVisitor for recursive expression visiting. The earlier lambda inspected the outer
//! `expr` instead of the `child` it is handed, so a marker nested inside the AND below was never
//! recorded. NOTE(review): confirm EnumerateExpression visits all descendants; if it does not, recurse
//! via ExpressionIterator::EnumerateChildren or LogicalOperatorVisitor.
//!   CREATE OR REPLACE TABLE BaseData AS (SELECT '10' AS my_key, '20' AS parent_key, '30' AS payload,
//!                                               '40' as foo, '50' as foo2, '60' as foo3);
//!   WITH Example AS (
//!       SELECT c.my_key,
//!              (c.parent_key IN (SELECT my_key FROM BaseData)) AND c.my_key>'10' AS parentExists,
//!              p.my_key IS NOT NULL AS parentExists2,
//!       FROM BaseData AS c
//!       LEFT JOIN BaseData AS p ON c.parent_key = p.my_key)
//!   SELECT * FROM Example WHERE parentExists;
static unordered_set<idx_t> GetMarkJoinIndexes(LogicalOperator &plan, const unordered_set<idx_t> &table_bindings) {
	unordered_set<idx_t> projected_mark_join_indexes;
	// table indexes visible to the children of `plan`; only a projection replaces them
	const unordered_set<idx_t> *child_table_bindings = &table_bindings;
	unordered_set<idx_t> projection_table_bindings;

	switch (plan.type) {
	case LogicalOperatorType::LOGICAL_COMPARISON_JOIN:
	case LogicalOperatorType::LOGICAL_DELIM_JOIN:
	case LogicalOperatorType::LOGICAL_ASOF_JOIN: {
		// MARK joins can be any of these operator types (PushdownMarkJoin accepts all three)
		auto &join = plan.Cast<LogicalJoin>();
		if (join.join_type != JoinType::MARK) {
			break;
		}
		// the marker's table index is visible from above: the marker is projected, keep the MARK join
		if (table_bindings.find(join.mark_index) != table_bindings.end()) {
			projected_mark_join_indexes.insert(join.mark_index);
		}
		break;
	}
	case LogicalOperatorType::LOGICAL_PROJECTION: {
		// below a projection, only the tables its expressions reference are visible.
		// Inspect each visited sub-expression (`child`), not just the top-level expression, so that a
		// marker nested inside e.g. a conjunction is still found.
		auto &proj = plan.Cast<LogicalProjection>();
		for (auto &expr : proj.expressions) {
			ExpressionIterator::EnumerateExpression(expr, [&](Expression &child) {
				if (child.expression_class == ExpressionClass::BOUND_COLUMN_REF) {
					auto &col_ref = child.Cast<BoundColumnRefExpression>();
					projection_table_bindings.insert(col_ref.binding.table_index);
				}
			});
		}
		child_table_bindings = &projection_table_bindings;
		break;
	}
	default:
		break;
	}

	// recurse into the children; every child starts from the same visible set
	for (auto &child : plan.children) {
		auto child_mark_indexes = GetMarkJoinIndexes(*child, *child_table_bindings);
		projected_mark_join_indexes.insert(child_mark_indexes.begin(), child_mark_indexes.end());
	}
	return projected_mark_join_indexes;
}

FilterPushdown::FilterPushdown(Optimizer &optimizer, LogicalOperator &plan, bool convert_mark_joins)
    : optimizer(optimizer), combiner(optimizer.context), convert_mark_joins(convert_mark_joins) {
	// Computes which MARK join markers are still projected somewhere in `plan`.
	// The search is seeded with every table index that `plan` outputs. Other optimizers have not
	// removed unused columns yet, so mark columns can still be output here even when they are not
	// actually needed.
	//
	// Review discussion (PR thread):
	// - Reviewer: GetMarkJoinIndexes now runs for every FilterPushdown construction, although mark
	//   joins in a plan are rare. Suggested inverting this into a stand-alone optimizer: first find a
	//   mark join, then traverse only the relevant part of the tree (e.g. PROJECTION -> FILTER ->
	//   MARK JOIN, or AGGREGATE -> FILTER -> MARK JOIN) looking for uses of the marker, and turn the
	//   mark join into a semi join only if the marker is used solely in the filter. Plans without
	//   mark joins would then need no extra work.
	// - Author: agreed this should be a separate optimizer. The intent is for GetMarkJoinIndexes to
	//   run once when pushdown starts and to pass its result to child pushdowns through the
	//   constructor taking `projected_mark_indexes`; a few call sites still use this one instead.
	unordered_set<idx_t> output_tables;
	const auto output_bindings = plan.GetColumnBindings();
	for (idx_t i = 0; i < output_bindings.size(); i++) {
		output_tables.insert(output_bindings[i].table_index);
	}
	projected_mark_indexes = GetMarkJoinIndexes(plan, output_tables);
}

FilterPushdown::FilterPushdown(Optimizer &optimizer, const unordered_set<idx_t> &projected_mark_indexes,
                               bool convert_mark_joins)
    : optimizer(optimizer),
      combiner(optimizer.context),
      projected_mark_indexes(projected_mark_indexes),
      convert_mark_joins(convert_mark_joins) {
	// Constructor for nested (child) pushdowns: it takes the projected mark indexes the parent
	// pushdown already holds, so the plan is not walked again. The set is copied into this pushdown.
}

unique_ptr<LogicalOperator> FilterPushdown::Rewrite(unique_ptr<LogicalOperator> op) {
Expand Down Expand Up @@ -148,7 +220,7 @@ unique_ptr<LogicalOperator> FilterPushdown::PushFinalFilters(unique_ptr<LogicalO
unique_ptr<LogicalOperator> FilterPushdown::FinishPushdown(unique_ptr<LogicalOperator> op) {
// unhandled type, first perform filter pushdown in its children
for (auto &child : op->children) {
FilterPushdown pushdown(optimizer, convert_mark_joins);
FilterPushdown pushdown(optimizer, *child, convert_mark_joins);
child = pushdown.Rewrite(std::move(child));
}
// now push any existing filters
Expand Down
2 changes: 1 addition & 1 deletion src/optimizer/optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void Optimizer::RunBuiltInOptimizers() {

// perform filter pushdown
RunOptimizer(OptimizerType::FILTER_PUSHDOWN, [&]() {
FilterPushdown filter_pushdown(*this);
FilterPushdown filter_pushdown(*this, *plan);
plan = filter_pushdown.Rewrite(std::move(plan));
});

Expand Down
2 changes: 1 addition & 1 deletion src/optimizer/pushdown/pushdown_aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ unique_ptr<LogicalOperator> FilterPushdown::PushdownAggregate(unique_ptr<Logical

// pushdown into AGGREGATE and GROUP BY
// we cannot push expressions that refer to the aggregate
FilterPushdown child_pushdown(optimizer, convert_mark_joins);
FilterPushdown child_pushdown(optimizer, projected_mark_indexes, convert_mark_joins);
for (idx_t i = 0; i < filters.size(); i++) {
auto &f = *filters[i];
if (f.bindings.find(aggr.aggregate_index) != f.bindings.end()) {
Expand Down
3 changes: 2 additions & 1 deletion src/optimizer/pushdown/pushdown_cross_product.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ using Filter = FilterPushdown::Filter;

unique_ptr<LogicalOperator> FilterPushdown::PushdownCrossProduct(unique_ptr<LogicalOperator> op) {
D_ASSERT(op->children.size() > 1);
FilterPushdown left_pushdown(optimizer, convert_mark_joins), right_pushdown(optimizer, convert_mark_joins);
FilterPushdown left_pushdown(optimizer, projected_mark_indexes, convert_mark_joins),
right_pushdown(optimizer, projected_mark_indexes, convert_mark_joins);
vector<unique_ptr<Expression>> join_expressions;
auto join_ref_type = JoinRefType::REGULAR;
switch (op->type) {
Expand Down
3 changes: 2 additions & 1 deletion src/optimizer/pushdown/pushdown_left_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ unique_ptr<LogicalOperator> FilterPushdown::PushdownLeftJoin(unique_ptr<LogicalO
if (op->type == LogicalOperatorType::LOGICAL_DELIM_JOIN) {
return FinishPushdown(std::move(op));
}
FilterPushdown left_pushdown(optimizer, convert_mark_joins), right_pushdown(optimizer, convert_mark_joins);
FilterPushdown left_pushdown(optimizer, projected_mark_indexes, convert_mark_joins),
right_pushdown(optimizer, projected_mark_indexes, convert_mark_joins);
// for a comparison join we create a FilterCombiner that checks if we can push conditions on LHS join conditions
// into the RHS of the join
FilterCombiner filter_combiner(optimizer);
Expand Down
10 changes: 7 additions & 3 deletions src/optimizer/pushdown/pushdown_mark_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ using Filter = FilterPushdown::Filter;
unique_ptr<LogicalOperator> FilterPushdown::PushdownMarkJoin(unique_ptr<LogicalOperator> op,
unordered_set<idx_t> &left_bindings,
unordered_set<idx_t> &right_bindings) {
auto op_bindings = op->GetColumnBindings();
auto &join = op->Cast<LogicalJoin>();
auto &comp_join = op->Cast<LogicalComparisonJoin>();
D_ASSERT(join.join_type == JoinType::MARK);
D_ASSERT(op->type == LogicalOperatorType::LOGICAL_COMPARISON_JOIN ||
op->type == LogicalOperatorType::LOGICAL_DELIM_JOIN || op->type == LogicalOperatorType::LOGICAL_ASOF_JOIN);

right_bindings.insert(comp_join.mark_index);
FilterPushdown left_pushdown(optimizer, convert_mark_joins), right_pushdown(optimizer, convert_mark_joins);
FilterPushdown left_pushdown(optimizer, projected_mark_indexes, convert_mark_joins),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if the FilterPushdown optimizer is the right place to do this optimization, given that we are now adding even more new parameters here only to facilitate this one optimization (that actually has little to do with filter pushdown).

Perhaps we should move this to a separate optimizer altogether so that we can clean up the filter pushdown optimizer and leave it to only push down filters?

I think it was originally added there because it seemed easy but given that this has caused a number of issues it seems that was the wrong assumption.

right_pushdown(optimizer, projected_mark_indexes, convert_mark_joins);
#ifdef DEBUG
bool simplified_mark_join = false;
#endif
Expand All @@ -35,7 +37,8 @@ unique_ptr<LogicalOperator> FilterPushdown::PushdownMarkJoin(unique_ptr<LogicalO
#endif
// this filter references the marker
// we can turn this into a SEMI join if the filter is on only the marker
if (filters[i]->filter->type == ExpressionType::BOUND_COLUMN_REF && convert_mark_joins) {
if (filters[i]->filter->type == ExpressionType::BOUND_COLUMN_REF && convert_mark_joins &&
projected_mark_indexes.find(join.mark_index) == projected_mark_indexes.end()) {
// filter just references the marker: turn into semi join
#ifdef DEBUG
simplified_mark_join = true;
Expand All @@ -61,7 +64,8 @@ unique_ptr<LogicalOperator> FilterPushdown::PushdownMarkJoin(unique_ptr<LogicalO
break;
}
}
if (all_null_values_are_equal && convert_mark_joins) {
if (all_null_values_are_equal && convert_mark_joins &&
projected_mark_indexes.find(join.mark_index) == projected_mark_indexes.end()) {
#ifdef DEBUG
simplified_mark_join = true;
#endif
Expand Down
2 changes: 1 addition & 1 deletion src/optimizer/pushdown/pushdown_projection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ unique_ptr<LogicalOperator> FilterPushdown::PushdownProjection(unique_ptr<Logica
// push filter through logical projection
// all the BoundColumnRefExpressions in the filter should refer to the LogicalProjection
// we can rewrite them by replacing those references with the expression of the LogicalProjection node
FilterPushdown child_pushdown(optimizer, convert_mark_joins);
FilterPushdown child_pushdown(optimizer, projected_mark_indexes, convert_mark_joins);
// There are some expressions can not be pushed down. We should keep them
// and add an extra filter operator.
vector<unique_ptr<Expression>> remain_expressions;
Expand Down
2 changes: 1 addition & 1 deletion src/optimizer/pushdown/pushdown_semi_anti_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ unique_ptr<LogicalOperator> FilterPushdown::PushdownSemiAntiJoin(unique_ptr<Logi

// push all current filters down the left side
op->children[0] = Rewrite(std::move(op->children[0]));
FilterPushdown right_pushdown(optimizer, convert_mark_joins);
FilterPushdown right_pushdown(optimizer, *op, convert_mark_joins);
op->children[1] = right_pushdown.Rewrite(std::move(op->children[1]));

bool left_empty = op->children[0]->type == LogicalOperatorType::LOGICAL_EMPTY_RESULT;
Expand Down
3 changes: 2 additions & 1 deletion src/optimizer/pushdown/pushdown_set_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ unique_ptr<LogicalOperator> FilterPushdown::PushdownSetOperation(unique_ptr<Logi
}

// pushdown into set operation, we can duplicate the condition and pushdown the expressions into both sides
FilterPushdown left_pushdown(optimizer, convert_mark_joins), right_pushdown(optimizer, convert_mark_joins);
FilterPushdown left_pushdown(optimizer, projected_mark_indexes, convert_mark_joins),
right_pushdown(optimizer, projected_mark_indexes, convert_mark_joins);
for (idx_t i = 0; i < filters.size(); i++) {
// first create a copy of the filter
auto right_filter = make_uniq<Filter>();
Expand Down
3 changes: 2 additions & 1 deletion src/optimizer/pushdown/pushdown_single_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ unique_ptr<LogicalOperator> FilterPushdown::PushdownSingleJoin(unique_ptr<Logica
unordered_set<idx_t> &left_bindings,
unordered_set<idx_t> &right_bindings) {
D_ASSERT(op->Cast<LogicalJoin>().join_type == JoinType::SINGLE);
FilterPushdown left_pushdown(optimizer, convert_mark_joins), right_pushdown(optimizer, convert_mark_joins);
FilterPushdown left_pushdown(optimizer, projected_mark_indexes, convert_mark_joins),
right_pushdown(optimizer, projected_mark_indexes, convert_mark_joins);
// now check the set of filters
for (idx_t i = 0; i < filters.size(); i++) {
auto side = JoinSide::GetJoinSide(filters[i]->bindings, left_bindings, right_bindings);
Expand Down
2 changes: 1 addition & 1 deletion src/optimizer/pushdown/pushdown_window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ bool CanPushdownFilter(vector<column_binding_set_t> window_exprs_partition_bindi
unique_ptr<LogicalOperator> FilterPushdown::PushdownWindow(unique_ptr<LogicalOperator> op) {
D_ASSERT(op->type == LogicalOperatorType::LOGICAL_WINDOW);
auto &window = op->Cast<LogicalWindow>();
FilterPushdown pushdown(optimizer);
FilterPushdown pushdown(optimizer, *op, convert_mark_joins);

// 1. Loop throguh the expressions, find the window expressions and investigate the partitions
// if a filter applies to a partition in each window expression then you can push the filter
Expand Down
4 changes: 2 additions & 2 deletions src/optimizer/statistics/operator/propagate_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,9 @@ void StatisticsPropagator::CreateFilterFromJoinStats(unique_ptr<LogicalOperator>
child->expressions.emplace_back(std::move(filter_expr));
}

// not allowed to let filter pushdowwn change mark joins to semi joins.
// not allowed to let filter pushdown change mark joins to semi joins.
// semi joins are potentially slower AND the conversion can ruin column binding information
FilterPushdown filter_pushdown(optimizer, false);
FilterPushdown filter_pushdown(optimizer, *child, false);
child = filter_pushdown.Rewrite(std::move(child));
PropagateExpression(expr);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# name: test/optimizer/pushdown/no_mark_to_semi_if_mark_index_is_projected.test
# description: No mark to semi conversion if the mark join index is projected
# group: [pushdown]

statement ok
CREATE OR REPLACE TABLE BaseData AS (
SELECT
'10' AS my_key,
'20' AS parent_key,
'30' AS payload,
'40' as foo,
'50' as foo2,
'60' as foo3
);


# Original query
query III
WITH
Example AS (
SELECT
c.my_key,
(c.parent_key IN (SELECT my_key FROM BaseData)) AS parentExists,
p.my_key IS NOT NULL AS parentExists2,
FROM BaseData AS c
LEFT JOIN BaseData AS p ON c.parent_key = p.my_key
)
SELECT *
FROM Example
WHERE parentExists
----

# original query no CTE
query III
SELECT
c.my_key,
(c.parent_key IN (SELECT my_key FROM BaseData)) AS parentExists,
p.my_key IS NOT NULL AS parentExists2,
FROM BaseData AS c
LEFT JOIN BaseData AS p ON c.parent_key = p.my_key
WHERE parentExists;
----

# original query but the CTE is a subquery
query III
SELECT *
FROM (SELECT
c.my_key,
(c.parent_key IN (SELECT my_key FROM BaseData)) AS parentExists,
p.my_key IS NOT NULL AS parentExists2,
FROM BaseData AS c
LEFT JOIN BaseData AS p ON c.parent_key = p.my_key
)
WHERE parentExists;
----

statement ok
PRAGMA explain_output='optimized_only'

# the marker is projected, so the optimized plan must keep the MARK join ...
query II
EXPLAIN
WITH Example AS (
SELECT
c.my_key,
(c.parent_key IN (SELECT my_key FROM BaseData)) AS parentExists,
p.my_key IS NOT NULL AS parentExists2,
FROM BaseData AS c
LEFT JOIN BaseData AS p ON c.parent_key = p.my_key
)
SELECT *
FROM Example
WHERE parentExists
----
logical_opt <REGEX>:.*MARK.*

# ... and must not contain a SEMI join
query II
EXPLAIN
WITH Example AS (
SELECT
c.my_key,
(c.parent_key IN (SELECT my_key FROM BaseData)) AS parentExists,
p.my_key IS NOT NULL AS parentExists2,
FROM BaseData AS c
LEFT JOIN BaseData AS p ON c.parent_key = p.my_key
)
SELECT *
FROM Example
WHERE parentExists
----
logical_opt <!REGEX>:.*SEMI.*

# records must be separated by blank lines, otherwise the next "statement ok" is read as SQL
statement ok
create table t0 as select range a from range(300);

statement ok
create table t2 as select range b from range(50000);

# the IN result is both filtered on and summed, so its marker must survive the pushdown
query I
select sum(in_alias::INT) FROM (select a in (select b from t2) as in_alias from t0) where in_alias;
----
300