Description
Describe the bug
We have been noticing errors during execution of dynamic-filter-pushdown queries like:
restate sql 'select 1 from sys_invocation_status where partition_key is not null order by modified_at limit 5'
Error: <UNKNOWN> Datafusion error: Arrow error: Invalid argument error: Invalid comparison operation: UInt64 < Timestamp(ms, "+00:00")
This implies that our dynamic filters refer to the wrong columns (partition_key mixed up with modified_at). The table provider we maintain does not do any column-name-based remapping, so the column indices it receives need to be correct.
I have bisected the issue to #18719, i.e. v52 and v52.1 (cc @adriangb). With that PR, dynamic filters can pass through projections into FilterExec nodes, which exposes a bug in FilterExec where the column indices are not remapped. I have managed to write a test that fails now and passes with a fix that I will open a PR for shortly. The test also fails before #18719, since the bug was already present but wasn't being triggered. I've struggled to write an integration test for this bug, because TestSource uses reassign_expr_columns and so is resistant to index mixups. Is this what I should be doing too?
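To illustrate why a source that reassigns columns by name hides this kind of mixup while an index-trusting source surfaces it, here is a minimal std-only sketch. `ColumnRef`, `reassign_by_name`, `resolve_by_index`, and the two-column schema are hypothetical stand-ins for illustration, not DataFusion's types or the real layout of sys_invocation_status.

```rust
/// Hypothetical stand-in for a physical column reference (name plus index).
#[derive(Debug, Clone, PartialEq)]
struct ColumnRef {
    name: String,
    index: usize,
}

/// Name-based reassignment: ignore the incoming index and look the column
/// up by name in `schema`. A stale index is silently corrected.
fn reassign_by_name(col: &ColumnRef, schema: &[&str]) -> Option<ColumnRef> {
    schema
        .iter()
        .position(|field| *field == col.name)
        .map(|index| ColumnRef {
            name: col.name.clone(),
            index,
        })
}

/// Index-based resolution: trust the index and ignore the name. A stale
/// index silently selects a different column.
fn resolve_by_index<'a>(col: &ColumnRef, schema: &[&'a str]) -> Option<&'a str> {
    schema.get(col.index).copied()
}
```

With a schema of `["partition_key", "modified_at"]` and a filter column `modified_at@0`, `resolve_by_index` picks `partition_key`, which is the shape of the type mismatch in the error above, while `reassign_by_name` quietly repairs the reference to `modified_at@1`.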
To Reproduce
#[test]
fn test_filter_with_projection_remaps_post_phase_parent_filters() -> Result<()> {
    // Test that FilterExec with a projection must remap parent dynamic
    // filter column indices from its output schema to the input schema
    // before passing them to the child.
    let input_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Float64, false),
    ]));
    let input = Arc::new(EmptyExec::new(Arc::clone(&input_schema)));

    // FilterExec: a > 0, projection=[c@2]
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
    ));
    let filter = FilterExecBuilder::new(predicate, input)
        .apply_projection(Some(vec![2]))?
        .build()?;

    // Output schema should be [c:Float64]
    let output_schema = filter.schema();
    assert_eq!(output_schema.fields().len(), 1);
    assert_eq!(output_schema.field(0).name(), "c");

    // Simulate a parent dynamic filter referencing output column c@0
    let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 0));
    let config = ConfigOptions::new();
    let desc = filter.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![parent_filter],
        &config,
    )?;

    // The filter pushed to the child must reference c@2 (input schema),
    // not c@0 (output schema).
    let parent_filters = desc.parent_filters();
    assert_eq!(parent_filters.len(), 1); // one child
    assert_eq!(parent_filters[0].len(), 1); // one filter
    let remapped = &parent_filters[0][0].predicate;
    let display = format!("{remapped}");
    assert_eq!(
        display, "c@2",
        "Post-phase parent filter column index must be remapped \
         from output schema (c@0) to input schema (c@2)"
    );
    Ok(())
}
Expected behavior
Parent dynamic filters that FilterExec passes on to its child should refer to input schema indices, but instead they refer to output schema indices.
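The expected remapping can be sketched without any DataFusion types: each column index in a parent filter is an offset into the projection list, and the projection entry at that offset is the input schema index. The `Expr` enum and `remap_to_input` below are hypothetical names for illustration, not the actual fix or DataFusion's API.

```rust
/// Hypothetical, simplified expression tree (not DataFusion's PhysicalExpr).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column { name: String, index: usize },
    Literal(i64),
    Gt(Box<Expr>, Box<Expr>),
}

/// Rewrite column indices from the projected (output) schema to the input
/// schema. `projection[i]` is the input index of output column `i`.
/// Returns `None` if a column index falls outside the projection.
fn remap_to_input(expr: &Expr, projection: &[usize]) -> Option<Expr> {
    match expr {
        Expr::Column { name, index } => {
            let input_index = *projection.get(*index)?;
            Some(Expr::Column {
                name: name.clone(),
                index: input_index,
            })
        }
        Expr::Literal(value) => Some(Expr::Literal(*value)),
        Expr::Gt(left, right) => Some(Expr::Gt(
            Box::new(remap_to_input(left, projection)?),
            Box::new(remap_to_input(right, projection)?),
        )),
    }
}
```

For the reproduction above, with projection `[2]`, a parent filter on `c@0` would become `c@2` before being handed to the child.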
Additional context
No response