(Test) Advanced adaptive filter selectivity evaluation #20363

adriangb wants to merge 4 commits into apache:main from
Conversation
run benchmark tpcds

run benchmark clickbench_partitioned

run benchmark tpch

🤖: Benchmark completed Details
show benchmark queue

🤖 Hi @Dandandan, you asked to view the benchmark queue (#20363 (comment)).

Hm it seems stuck again
FYI @alamb
@Dandandan this is mostly vibe coded, I'm only 50% confident it even makes sense without reviewing the code fwiw

Force-pushed from e0240af to 09cdb0b
show benchmark queue

🤖 Hi @adriangb, you asked to view the benchmark queue (#20363 (comment)).

Wonder if I'm infinite looping it or something :(

Yes, I think previously it got stuck during infinite loops / extremely long-running tasks.

My bad, I'll try to add a PR to add timeouts and a cancel command

show benchmark queue

🤖 Hi @adriangb, you asked to view the benchmark queue (#20363 (comment)).
run benchmark tpch

I think it is stuck again 😆

@alamb could you take a look? Somehow the result is also empty.

Yeah, seems like it's always tpcds? I don't think it's this branch necessarily; it got stuck on your branch earlier, and this one has been pretty much completely rewritten since the last time it got stuck here.

Hmmm, could be...

run benchmark clickbench_partitioned

show benchmark queue

🤖 Hi @adriangb, you asked to view the benchmark queue (#20363 (comment)).
🤖: Benchmark completed Details

run benchmark clickbench_partitioned

show benchmark queue

🤖 Hi @adriangb, you asked to view the benchmark queue (#20363 (comment)).

🤖: Benchmark completed Details

run benchmark clickbench_partitioned

🤖: Benchmark completed Details
run benchmark clickbench_extended

run benchmark tcph

🤖 Hi @adriangb, thanks for the request (#20363 (comment)). Unsupported benchmarks: tcph. Please choose one or more of the supported benchmarks; you can also set environment variables on subsequent lines.

run benchmark tpch

🤖: Benchmark completed Details

🤖: Benchmark completed Details

run benchmark clickbench_extended

🤖: Benchmark completed Details
Which issue does this PR close?
Related to filter pushdown performance optimization work.
Rationale for this change
Currently, when `pushdown_filters = true`, DataFusion pushes all filter predicates into the Parquet reader as row-level filters (`ArrowPredicate`s) unconditionally. This is suboptimal because:

- The `reorder_filters` heuristic was static. It used compressed column size as a proxy for cost and sorted filters by that metric, but never measured actual runtime selectivity or evaluation cost. It could not adapt to data skew or runtime conditions.
- Dynamic filters (e.g. from `HashJoinExec`) cannot be dropped even when they provide no benefit. Without a way to mark filters as optional, the system was forced to always evaluate them.

This PR introduces an adaptive filter selectivity tracking system that observes filter behavior at runtime and makes data-driven decisions about whether each filter should be pushed down as a row-level predicate or applied post-scan.
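The core trade-off can be sketched in a few lines: a filter is worth keeping as a row-level predicate only if the bytes of decoding it saves outpace the CPU it costs. The struct and field names below are illustrative, not this PR's actual API:

```rust
// Hypothetical sketch (not this PR's API): decide whether a filter "pays for
// itself" by comparing bytes of decoding it avoids per second of CPU spent.
struct FilterObservation {
    rows_in: u64,       // rows the filter saw
    rows_out: u64,      // rows that passed
    eval_seconds: f64,  // CPU time spent evaluating the filter
    bytes_per_row: f64, // projected bytes a skipped row avoids decoding
}

impl FilterObservation {
    /// Fraction of rows that pass (lower = more selective).
    fn selectivity(&self) -> f64 {
        self.rows_out as f64 / self.rows_in as f64
    }

    /// Bytes of decoding avoided per second of filter CPU.
    fn bytes_saved_per_sec(&self) -> f64 {
        let rows_skipped = (self.rows_in - self.rows_out) as f64;
        rows_skipped * self.bytes_per_row / self.eval_seconds
    }

    /// Keep as a row filter only above a threshold, analogous to
    /// `filter_pushdown_min_bytes_per_sec`.
    fn keep_as_row_filter(&self, min_bytes_per_sec: f64) -> bool {
        self.bytes_saved_per_sec() >= min_bytes_per_sec
    }
}
```

Note how `0.0` keeps every filter and `f64::INFINITY` keeps none, matching the semantics described for the new config option.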
What changes are included in this PR?
1. New module: `selectivity.rs` (1,554 lines)

The core of this PR. Introduces `SelectivityTracker`, a shared, lock-guarded structure that:

- Moves each filter through `New -> RowFilter | PostScan -> (promoted/demoted/dropped)` states based on measured runtime behavior.
- Uses a byte-ratio heuristic (`filter_bytes / projection_bytes`) to cheaply decide whether a new filter starts as a row filter or post-scan filter.
- Compares measured effectiveness against `filter_pushdown_min_bytes_per_sec` to decide promotion and demotion.
- Detects filters wrapped in `OptionalFilterPhysicalExpr`, which can be dropped entirely when ineffective.
- Watches `snapshot_generation()`, resetting statistics when a filter's predicate changes (e.g., when a `DynamicFilterPhysicalExpr` from a hash join updates its value set).

Key types:

- `SelectivityTracker` -- cross-file tracker shared by all `ParquetOpener` instances
- `TrackerConfig` -- immutable configuration (built from `ParquetOptions`)
- `SelectivityStats` -- per-filter Welford statistics with confidence interval methods
- `FilterState` -- `RowFilter | PostScan | Dropped` enum
- `PartitionedFilters` -- output of `partition_filters()`, consumed by the opener
- `FilterId` -- stable `usize` identifier assigned by `ParquetSource::with_predicate`
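As a rough sketch of the statistics side (illustrative names, not the exact `SelectivityStats` code): Welford's algorithm maintains a running mean and variance in O(1) per observation, and a z-score bound, the role `filter_confidence_z` plays, gives a confidence interval on the measured quantity:

```rust
// Illustrative sketch of Welford's online mean/variance plus a z-score
// confidence bound; not the exact SelectivityStats implementation.
#[derive(Default)]
struct Welford {
    count: u64,
    mean: f64,
    m2: f64, // running sum of squared deviations from the mean
}

impl Welford {
    /// Incorporate one observation (e.g. one batch's measured selectivity).
    fn update(&mut self, x: f64) {
        self.count += 1;
        let delta = x - self.mean;
        self.mean += delta / self.count as f64;
        self.m2 += delta * (x - self.mean);
    }

    /// Sample variance (0.0 until we have at least two observations).
    fn variance(&self) -> f64 {
        if self.count < 2 { 0.0 } else { self.m2 / (self.count - 1) as f64 }
    }

    /// Upper end of a z-based confidence interval on the mean.
    fn upper_bound(&self, z: f64) -> f64 {
        self.mean + z * (self.variance() / self.count.max(1) as f64).sqrt()
    }
}
```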
2. New wrapper: `OptionalFilterPhysicalExpr` (in `physical_expr_common`)

A transparent `PhysicalExpr` wrapper that marks a filter as optional -- droppable without affecting query correctness. All `PhysicalExpr` trait methods delegate to the inner expression. The selectivity tracker detects this via `downcast_ref::<OptionalFilterPhysicalExpr>()` and can drop the filter entirely when it is ineffective, rather than demoting it to post-scan.

`HashJoinExec` now wraps its dynamic join filters in `OptionalFilterPhysicalExpr` before pushing them down. This is why plan output now shows `Optional(DynamicFilter [...])` instead of `DynamicFilter [...]`.
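The delegation-plus-downcast pattern looks roughly like this. The trait and types below are simplified stand-ins, not DataFusion's real `PhysicalExpr`:

```rust
use std::any::Any;
use std::sync::Arc;

// Simplified stand-in for PhysicalExpr, to show the pattern only.
trait Expr: Any {
    fn evaluate(&self, row: i64) -> bool;
    fn as_any(&self) -> &dyn Any;
}

struct GtZero;
impl Expr for GtZero {
    fn evaluate(&self, row: i64) -> bool { row > 0 }
    fn as_any(&self) -> &dyn Any { self }
}

// The wrapper adds no behavior of its own: every call delegates to the
// inner expression, so correctness is unaffected.
struct OptionalFilter {
    inner: Arc<dyn Expr>,
}
impl Expr for OptionalFilter {
    fn evaluate(&self, row: i64) -> bool { self.inner.evaluate(row) }
    fn as_any(&self) -> &dyn Any { self }
}

// The tracker only needs a downcast to learn that a filter may be dropped.
fn is_droppable(expr: &dyn Expr) -> bool {
    expr.as_any().downcast_ref::<OptionalFilter>().is_some()
}
```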
3. Removal of the `reorder_filters` config option

The old static `reorder_filters` boolean and its associated heuristic (sort by `required_bytes`, then `can_use_index`) are removed entirely. The adaptive system subsumes this:

- `FilterCandidate` no longer stores `required_bytes` or `can_use_index` fields.
- The `size_of_columns()` and `columns_sorted()` helper functions in `row_filter.rs` are removed.
- Filter ordering is now decided by `SelectivityTracker::partition_filters()` based on measured effectiveness or the byte-ratio fallback.
4. Three new configuration options (in `ParquetOptions`)

- `filter_pushdown_min_bytes_per_sec` -- effectiveness threshold for keeping a filter at row level: `0.0` = all promoted, `INFINITY` = none promoted (feature disabled).
- `filter_collecting_byte_ratio_threshold` -- byte-ratio (`filter_bytes / projection_bytes`) cutoff used for a new filter's initial placement.
- `filter_confidence_z` -- z-score used for the confidence intervals in `SelectivityStats`.
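For reference, the three options can be set at the session level like any other Parquet option; this is a sketch using the defaults listed later in this description:

```sql
-- Illustrative session-level settings (values shown are the defaults):
SET datafusion.execution.parquet.filter_pushdown_min_bytes_per_sec = 52428800;
SET datafusion.execution.parquet.filter_collecting_byte_ratio_threshold = 0.15;
SET datafusion.execution.parquet.filter_confidence_z = 2.0;
```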
5. Changes to `ParquetOpener` / opener.rs

- Receives `Vec<(FilterId, Arc<dyn PhysicalExpr>)>` instead of a single combined `Arc<dyn PhysicalExpr>`.
- Calls `selectivity_tracker.partition_filters()` to split filters into row-level vs. post-scan.
- Row-level filters go through `build_row_filter()` (updated signature).
- Post-scan filters go through `apply_post_scan_filters_with_stats()`, a new function that evaluates each filter individually, reports per-filter timing and selectivity back to the tracker, and combines results into a single boolean mask.
- `limit` is only applied to the Parquet reader when there are no post-scan filters (otherwise limiting would cut off rows before the filter could find matches).
- A new `filter_apply_time` metric tracks post-scan filter evaluation time.
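Conceptually, the post-scan path evaluates each filter over the whole batch so its individual selectivity and timing can be reported, then ANDs the per-filter masks. In this sketch a plain `Vec<bool>` stands in for Arrow's `BooleanArray`, and the function and callback names are illustrative:

```rust
use std::time::Instant;

// Conceptual sketch of what apply_post_scan_filters_with_stats() does:
// evaluate each post-scan filter individually (so per-filter stats can be
// reported to the tracker), then fold the results into one combined mask.
fn apply_post_scan_filters(
    batch: &[i64],
    filters: &[(usize, Box<dyn Fn(i64) -> bool>)], // (FilterId, predicate)
    mut report: impl FnMut(usize, f64, f64),       // (id, selectivity, seconds)
) -> Vec<bool> {
    let mut mask = vec![true; batch.len()];
    for (id, pred) in filters {
        let start = Instant::now();
        let mut passed = 0usize;
        for (i, v) in batch.iter().enumerate() {
            let keep = pred(*v);
            passed += keep as usize;
            mask[i] &= keep; // fold into the single combined mask
        }
        let selectivity = passed as f64 / batch.len().max(1) as f64;
        report(*id, selectivity, start.elapsed().as_secs_f64());
    }
    mask
}
```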
6. Changes to `ParquetSource` / source.rs

- Predicate storage changes from `Option<Arc<dyn PhysicalExpr>>` to `Option<Vec<(FilterId, Arc<dyn PhysicalExpr>)>>`.
- `with_predicate()` now splits the predicate into conjuncts and assigns stable `FilterId`s (indices).
- A `SelectivityTracker` is stored as a shared `Arc` on `ParquetSource` and passed to all openers.
- `with_table_parquet_options()` now builds a fresh `SelectivityTracker` from the three new config values.
- The `with_reorder_filters()` and `reorder_filters()` methods are removed.
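The stable-`FilterId` idea is just "index of the conjunct", so statistics stay attached to the same filter across all `ParquetOpener` instances. A toy sketch, with a string standing in for `Arc<dyn PhysicalExpr>` (the real code splits expression trees, not text):

```rust
// Toy sketch of the with_predicate() idea: split a predicate into its
// top-level AND conjuncts and give each a stable FilterId (its index).
type FilterId = usize;

fn split_conjunction(predicate: &str) -> Vec<(FilterId, String)> {
    predicate
        .split(" AND ")
        .map(str::trim)
        .enumerate()
        .map(|(id, conjunct)| (id, conjunct.to_string()))
        .collect()
}
```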
7. Changes to `build_row_filter()` / row_filter.rs

- Takes `Vec<(FilterId, Arc<dyn PhysicalExpr>)>` + `&Arc<SelectivityTracker>` instead of `&Arc<dyn PhysicalExpr>` + `reorder_predicates: bool`.
- Returns `RowFilterWithMetrics` (new struct) containing both the `RowFilter` and any unbuildable filters that must be applied post-scan.
- `DatafusionArrowPredicate` now carries a `FilterId` and `Arc<SelectivityTracker>`, reporting per-batch evaluation metrics back to the tracker after each `evaluate()` call.
- Filter reordering is removed from `build_row_filter` -- filters arrive pre-ordered by the tracker.
8. Changes to `HashJoinExec`

- Dynamic join filters are wrapped in `OptionalFilterPhysicalExpr` before being pushed down.
- The filter-update path now unwraps `OptionalFilterPhysicalExpr` to find the inner `DynamicFilterPhysicalExpr`.
9. Protobuf schema updates

- The `reorder_filters` field (tag 6) is marked as `reserved` in `datafusion_common.proto`.
- New fields: `filter_pushdown_min_bytes_per_sec` (tag 35), `filter_collecting_byte_ratio_threshold` (tag 40), `filter_confidence_z` (tag 41).
- Corresponding updates in `pbjson.rs`, `prost.rs`, `from_proto`, `to_proto`, and `file_formats.rs`.
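Reserving the old tag prevents it from ever being reused with a different meaning. A sketch of what the `.proto` change looks like (field types and surrounding message layout here are illustrative, only the tag numbers come from this PR):

```proto
// Illustrative fragment of the datafusion_common.proto change:
message ParquetOptions {
  reserved 6;  // was: reorder_filters
  double filter_pushdown_min_bytes_per_sec = 35;
  double filter_collecting_byte_ratio_threshold = 40;
  double filter_confidence_z = 41;
}
```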
10. Test and benchmark updates

- `reorder_filters` removed from tests and benchmarks.
- Tests set `filter_pushdown_min_bytes_per_sec = 0.0` to preserve deterministic behavior (all filters always pushed down).
- Plan snapshots change from `DynamicFilter [...]` to `Optional(DynamicFilter [...])`.
- New unit tests in `selectivity.rs` covering: effectiveness calculation, Welford's algorithm, confidence intervals, state machine transitions (initial placement, promotion, demotion, dropping), dynamic filter generation tracking, filter ordering, and integration lifecycle tests.
- An expected-row-count update in `explain_analyze.rs` (`output_rows=8` -> `output_rows=5`), because the adaptive system now places some previously row-level filters post-scan, causing slight row count differences in EXPLAIN ANALYZE output.

Are these changes tested?
Yes:
- All existing `pushdown_filters` and filter pushdown SLT tests pass (with `filter_pushdown_min_bytes_per_sec = 0.0` to force all filters to row-level for deterministic behavior).
- New unit tests in `selectivity.rs` (~450 lines of tests) covering the `SelectivityStats` calculator, the `TrackerConfig` builder, state machine transitions (initial placement, promotion, demotion, dropping, reset on generation change), filter ordering, and full promotion/demotion lifecycle integration tests.
- Plan snapshots verify the `Optional(...)` wrapper on dynamic filters.
- SLT files `dynamic_filter_pushdown_config.slt`, `information_schema.slt`, `preserve_file_partitioning.slt`, `projection_pushdown.slt`, `push_down_filter.slt`, and `repartition_subset_satisfaction.slt` updated.
- `benchmarks/results.txt` shows TPC-H (13 faster, 6 slower, 3 unchanged), TPC-DS (33 faster, 31 slower, 35 unchanged, with a notable 24x improvement on Q64), and ClickBench (18 faster, 12 slower, 13 unchanged) results.

Are there any user-facing changes?
Yes:
- `reorder_filters` config option removed. This is a breaking change: users who set `SET datafusion.execution.parquet.reorder_filters = true` will get an error. The adaptive system replaces this functionality automatically.

- Three new config options added under `datafusion.execution.parquet`:
  - `filter_pushdown_min_bytes_per_sec` (default: 52428800)
  - `filter_collecting_byte_ratio_threshold` (default: 0.15)
  - `filter_confidence_z` (default: 2.0)

- Changed default behavior when `pushdown_filters = true`. Previously, all filters were unconditionally pushed into the Parquet reader. Now, the adaptive system decides per-filter based on byte-ratio thresholds and runtime effectiveness measurements. To restore the old behavior of pushing all filters unconditionally, set `filter_pushdown_min_bytes_per_sec = 0.0`.

- EXPLAIN plan output changes. Dynamic join filters now display as `Optional(DynamicFilter [...])` instead of `DynamicFilter [...]`, reflecting their new optional wrapper.

- Deprecated `predicate()` method signature changed. `ParquetSource::predicate()` now returns `Option<Arc<dyn PhysicalExpr>>` (owned) instead of `Option<&Arc<dyn PhysicalExpr>>` (a reference). This method was already deprecated in favor of `filter()`.
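For anyone migrating, the user-facing changes above boil down to one substitution:

```sql
-- The removed option now errors:
--   SET datafusion.execution.parquet.reorder_filters = true;  -- error
-- To restore the old "push every filter down" behavior instead:
SET datafusion.execution.parquet.filter_pushdown_min_bytes_per_sec = 0.0;
```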