
[RFC] Add lambda support and array_transform udf#18921

Draft
gstvg wants to merge 6 commits into apache:main from gstvg:lambda4

Conversation


gstvg (Contributor) commented Nov 25, 2025

Closes #14205

This PR adds support for lambdas with column capture, and the array_transform scalar function, which is used to test the lambda implementation. The changes are extensive, across various parts of the codebase, mostly tree traversals. This text aims to justify those changes and show alternatives that may require fewer changes, although not without trade-offs, so we can decide whether to move this forward and, if so, with which approach. For those who want to look at the code: don't spend time on the second commit, as it just adds a new field to a struct. There's a build of the documentation for this branch available online.

Example array_transform usage:

array_transform([1, 2], v -> v*2)
[2, 4]
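In plain Rust terms, the semantics can be sketched as a map where the lambda may capture values from the enclosing scope (an illustration only, not DataFusion code):

```rust
// Plain-Rust analogue of array_transform: apply the lambda to every element
// of the array; the lambda body may capture outer values ("columns").
fn array_transform<T, U>(items: Vec<T>, f: impl Fn(T) -> U) -> Vec<U> {
    items.into_iter().map(f).collect()
}

fn main() {
    // array_transform([1, 2], v -> v*2)
    assert_eq!(array_transform(vec![1, 2], |v| v * 2), vec![2, 4]);

    // a captured "column": array_transform([1, 2], v -> v + c) with c = 10
    let c = 10;
    assert_eq!(array_transform(vec![1, 2], |v| v + c), vec![11, 12]);
    println!("ok");
}
```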
//logical
pub struct Lambda {
    pub params: Vec<String>, // in the example above, vec!["v"]
    pub body: Box<Expr>, // in the example above, v*2
}

pub struct LambdaColumn {
    name: String,
    field: FieldRef,
    spans: Spans,
}

//physical
pub struct LambdaExpr {
    params: Vec<String>, // in the example above, vec!["v"]
    body: Arc<dyn PhysicalExpr>, // in the example above, v * 2
}

pub struct LambdaColumn {
    name: String,
    field: FieldRef,
    value: Option<ColumnarValue>,
}

The logical LambdaColumn's field is computed during SQL planning.
The physical LambdaColumn holds an optional, unbound ColumnarValue; it is meant to be rewritten during execution, for each batch, by the higher-order UDF into a LambdaColumn with a bound value, using the bind_lambdas_columns helper function.
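As a self-contained sketch of that bind step (using simplified stand-in types, not the actual DataFusion Expr or ColumnarValue), the rewrite fills in the per-batch value of each unbound lambda column and leaves everything else untouched:

```rust
use std::collections::HashMap;

// Simplified model of a physical expression tree; `value` plays the role of
// the optional ColumnarValue on the physical LambdaColumn.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    LambdaColumn { name: String, value: Option<Vec<i64>> },
    Negate(Box<Expr>),
}

// Hypothetical analogue of the bind helper: rewrite unbound LambdaColumns to
// bound ones using this batch's parameter values.
fn bind_lambda_columns(expr: Expr, bindings: &HashMap<String, Vec<i64>>) -> Expr {
    match expr {
        Expr::LambdaColumn { name, value: None } => {
            let value = bindings.get(&name).cloned();
            Expr::LambdaColumn { name, value }
        }
        Expr::Negate(inner) => Expr::Negate(Box::new(bind_lambda_columns(*inner, bindings))),
        other => other,
    }
}

fn main() {
    // body of `v -> -v`, with `v` still unbound after planning
    let body = Expr::Negate(Box::new(Expr::LambdaColumn { name: "v".into(), value: None }));
    let bindings = HashMap::from([("v".to_string(), vec![2, 3])]);
    let bound = bind_lambda_columns(body, &bindings);
    assert_eq!(
        bound,
        Expr::Negate(Box::new(Expr::LambdaColumn { name: "v".into(), value: Some(vec![2, 3]) }))
    );
    println!("ok");
}
```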

Changes in ScalarUDF[Impl] to support lambdas

Since lambda parameters are defined by the UDF implementation, DataFusion doesn't know their type, nullability, or metadata. So, we add a method to ScalarUDF[Impl] where the implementation returns a Field for each parameter supported by each of its lambdas.

ScalarUDF[Impl] lambdas_parameters method
struct/trait ScalarUDF[Impl] {
    /// Returns the parameters that any lambda supports
    fn lambdas_parameters(
        &self,
        args: &[ValueOrLambdaParameter],
    ) -> Result<Vec<Option<Vec<Field>>>> {
        Ok(vec![None; args.len()])
    }
}

pub enum ValueOrLambdaParameter<'a> {
    /// A columnar value with the given field
    Value(FieldRef),
    /// A lambda
    Lambda,
}
ArrayTransform lambdas_parameters implementation
impl ScalarUDFImpl for ArrayTransform {
    fn lambdas_parameters(
        &self,
        args: &[ValueOrLambdaParameter],
    ) -> Result<Vec<Option<Vec<Field>>>> {
        let [ValueOrLambdaParameter::Value(list), ValueOrLambdaParameter::Lambda] =
            args
        else {
            return exec_err!(
                "{} expects a value followed by a lambda, got {:?}",
                self.name(),
                args
            );
        };

        let (field, index_type) = match list.data_type() {
            DataType::List(field) => (field, DataType::Int32),
            DataType::LargeList(field) => (field, DataType::Int64),
            DataType::FixedSizeList(field, _) => (field, DataType::Int32),
            _ => return exec_err!("expected list, got {list}"),
        };

        // we don't need to omit the index when the lambda doesn't specify it, e.g. array_transform([], v -> v*2),
        // nor check whether the lambda declares more than two parameters, e.g. array_transform([], (v, i, j) -> v+i+j),
        // as datafusion will do that for us
        let value = Field::new("value", field.data_type().clone(), field.is_nullable())
            .with_metadata(field.metadata().clone());
        let index = Field::new("index", index_type, false);

        Ok(vec![None, Some(vec![value, index])])
    }
}

In order for the UDF to compute its return field, it usually needs to know the return field of its lambdas, as is the case for array_transform. Because lambdas may capture columns, computing the return field of a lambda requires the Field of the captured columns, which is only available in the schema and is not passed as an argument to return_fields[_from_args]. To avoid changing that, we internally pair the fields returned by the newly added ScalarUDFImpl::lambdas_parameters method with the schema to compute the return field of each lambda, and pass them in ReturnFieldArgs.arg_fields. We also add a slice of bools indicating whether the arg in position `i` is a lambda or not. Finally, we add a helper method to_lambda_args which merges the data in arg_fields and lambdas into a vec of ValueOrLambdaField enums, allowing convenient pattern matching instead of having to inspect both arg_fields and lambdas.
ReturnFieldArgs changes
pub struct ReturnFieldArgs<'a> {
    /// The data types of the arguments to the function
    ///
    /// If argument `i` to the function is a lambda, it will be the return field of the
    /// lambda expression when evaluated with the arguments returned from
    /// `ScalarUDFImpl::lambdas_parameters`
    ///
    /// For example, with `array_transform([1], v -> v == 5)`
    /// this field will be `[Field::new("", DataType::List(DataType::Int32), false), Field::new("", DataType::Boolean, false)]`
    pub arg_fields: &'a [FieldRef],
    ... skipped fields
    /// Is argument `i` to the function a lambda?
    ///
    /// For example, with `array_transform([1], v -> v == 5)`
    /// this field will be `[false, true]`
    pub lambdas: &'a [bool],
}

/// A tagged Field indicating whether it corresponds to a value or a lambda argument
#[derive(Debug)]
pub enum ValueOrLambdaField<'a> {
    /// The Field of a ColumnarValue argument
    Value(&'a FieldRef),
    /// The Field of the return of the lambda body when evaluated with the parameters from ScalarUDF::lambda_parameters
    Lambda(&'a FieldRef),
}

impl<'a> ReturnFieldArgs<'a> {
    /// Based on self.lambdas, encodes self.arg_fields as tagged enums
    /// indicating whether each corresponds to a value or a lambda argument
    pub fn to_lambda_args(&self) -> Vec<ValueOrLambdaField<'a>> {
        std::iter::zip(self.arg_fields, self.lambdas)
            .map(|(field, is_lambda)| {
                if *is_lambda {
                    ValueOrLambdaField::Lambda(field)
                } else {
                    ValueOrLambdaField::Value(field)
                }
            })
            .collect()
    }
}
ArrayTransform return_field_from_args implementation
impl ScalarUDFImpl for ArrayTransform {
    fn return_field_from_args(
        &self,
        args: datafusion_expr::ReturnFieldArgs,
    ) -> Result<Arc<Field>> {
        let args = args.to_lambda_args();

        let [ValueOrLambdaField::Value(list), ValueOrLambdaField::Lambda(lambda)] =
            take_function_args(self.name(), &args)?
        else {
            return exec_err!(
                "{} expects a value followed by a lambda, got {:?}",
                self.name(),
                args
            );
        };

        // lambda is the resulting field of executing the lambda body
        // with the parameters returned in lambdas_parameters
        let field = Arc::new(Field::new(
            Field::LIST_FIELD_DEFAULT_NAME,
            lambda.data_type().clone(),
            lambda.is_nullable(),
        ));

        let return_type = match list.data_type() {
            DataType::List(_) => DataType::List(field),
            DataType::LargeList(_) => DataType::LargeList(field),
            DataType::FixedSizeList(_, size) => DataType::FixedSizeList(field, *size),
            _ => exec_err!("expected list, got {list}")?,
        };

        Ok(Arc::new(Field::new("", return_type, list.is_nullable())))
    }
}

For execution, we add the lambdas field to ScalarFunctionArgs and the helper method to_lambda_args, similar to the changes in ReturnFieldArgs and its to_lambda_args:
ScalarFunctionArgs changes
pub struct ScalarFunctionArgs {
    /// The evaluated arguments to the function.
    /// If argument `i` is a lambda, it will be `ColumnarValue::Scalar(ScalarValue::Null)`
    ///
    /// For example, with `array_transform([1], v -> v == 5)`
    /// this field will be `[ColumnarValue::Scalar(ScalarValue::List([1])), ColumnarValue::Scalar(ScalarValue::Null)]`
    pub args: Vec<ColumnarValue>,
    /// Field associated with each arg, if it exists
    pub arg_fields: Vec<FieldRef>,
    ....
    /// The lambdas passed to the function.
    /// If argument `i` is not a lambda, it will be `None`
    ///
    /// For example, with `array_transform([1], v -> v == 5)`
    /// this field will be `[None, Some(...)]`
    pub lambdas: Option<Vec<Option<ScalarFunctionLambdaArg>>>,
}

/// A lambda argument to a ScalarFunction
#[derive(Clone, Debug)]
pub struct ScalarFunctionLambdaArg {
    /// The parameters defined in this lambda
    ///
    /// For example, for `array_transform([2], v -> -v)`,
    /// this will be `vec![Field::new("v", DataType::Int32, true)]`
    pub params: Vec<FieldRef>,
    /// The body of the lambda
    ///
    /// For example, for `array_transform([2], v -> -v)`,
    /// this will be the physical expression of `-v`
    pub body: Arc<dyn PhysicalExpr>,
    /// A RecordBatch containing at least the captured columns inside this lambda body, if any
    /// Note that it may contain additional, unspecified columns, but that's an implementation detail
    ///
    /// For example, for `array_transform([2], v -> v + a + b)`,
    /// this will be a `RecordBatch` with at least columns `a` and `b`
    pub captures: Option<RecordBatch>,
}

impl ScalarFunctionArgs {
    pub fn to_lambda_args(&self) -> Vec<ValueOrLambda<'_>> {
        match &self.lambdas {
            Some(lambdas) => std::iter::zip(&self.args, lambdas)
                .map(|(arg, lambda)| match lambda {
                    Some(lambda) => ValueOrLambda::Lambda(lambda),
                    None => ValueOrLambda::Value(arg),
                })
                .collect(),
            None => self.args.iter().map(ValueOrLambda::Value).collect(),
        }
    }
}

/// An argument to a higher-order scalar function
pub enum ValueOrLambda<'a> {
    Value(&'a ColumnarValue),
    Lambda(&'a ScalarFunctionLambdaArg),
}
ArrayTransform invoke_with_args implementation
impl ScalarUDFImpl for ArrayTransform {
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        // args.to_lambda_args allows the convenient match below, instead of inspecting both args.args and args.lambdas
        let lambda_args = args.to_lambda_args();
        let [list_value, lambda] = take_function_args(self.name(), &lambda_args)?;

        let (ValueOrLambda::Value(list_value), ValueOrLambda::Lambda(lambda)) =
            (list_value, lambda)
        else {
            return exec_err!(
                "{} expects a value followed by a lambda, got {:?}",
                self.name(),
                &lambda_args
            );
        };

        let list_array = list_value.to_array(args.number_rows)?;

        // if any column got captured, we need to adjust it to the values arrays,
        // duplicating values for lists with multiple values and removing values for empty lists.
        // list_indices is not cheap, so it's important to avoid it when no column is captured
        let adjusted_captures = lambda
            .captures
            .as_ref()
            .map(|captures| take_record_batch(captures, &list_indices(&list_array)?))
            .transpose()?;

        // use closures so that only the parameters the lambda actually declares
        // are computed, avoiding unnecessary work
        let values = Arc::clone(list_values(&list_array)?);
        let values_param = || Ok(Arc::clone(&values));
        let indices_param = || elements_indices(&list_array);

        let bound_body = bind_lambda_columns(
            Arc::clone(&lambda.body),
            &lambda.params,
            &[&values_param, &indices_param],
        )?;

        // call the transforming expression with the record batch composed of
        // the list values merged with the captured columns
        let transformed_values = bound_body
            .evaluate(&adjusted_captures.unwrap_or_else(|| {
                RecordBatch::try_new_with_options(
                    Arc::new(Schema::empty()),
                    vec![],
                    &RecordBatchOptions::new().with_row_count(Some(values.len())),
                )
                .unwrap()
            }))?
            .into_array(values.len())?;


        let field = match args.return_field.data_type() {
            DataType::List(field)
            | DataType::LargeList(field)
            | DataType::FixedSizeList(field, _) => Arc::clone(field),
            _ => {
                return exec_err!(
                    "{} expected ScalarFunctionArgs.return_field to be a list, got {}",
                    self.name(),
                    args.return_field
                )
            }
        };

        let transformed_list = match list_array.data_type() {
            DataType::List(_) => {
                let list = list_array.as_list();

                Arc::new(ListArray::new(
                    field,
                    list.offsets().clone(),
                    transformed_values,
                    list.nulls().cloned(),
                )) as ArrayRef
            }
            DataType::LargeList(_) => {
                let large_list = list_array.as_list();

                Arc::new(LargeListArray::new(
                    field,
                    large_list.offsets().clone(),
                    transformed_values,
                    large_list.nulls().cloned(),
                ))
            }
            DataType::FixedSizeList(_, value_length) => {
                Arc::new(FixedSizeListArray::new(
                    field,
                    *value_length,
                    transformed_values,
                    list_array.as_fixed_size_list().nulls().cloned(),
                ))
            }
            other => exec_err!("expected list, got {other}")?,
        };

        Ok(ColumnarValue::Array(transformed_list))
    }
}
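The captures adjustment above depends on mapping each value of the flattened child array back to its parent list row, so that capture rows are duplicated for lists with several elements and dropped for empty lists. A minimal sketch of what a helper like list_indices presumably computes, using plain offsets instead of Arrow arrays:

```rust
// For each value in the flattened child array, compute the index of the
// parent list row it belongs to, from the list's offsets.
// (Hypothetical sketch of the PR's list_indices helper, not the real code.)
fn list_indices(offsets: &[i32]) -> Vec<i32> {
    let mut indices = Vec::new();
    for row in 0..offsets.len() - 1 {
        // each value between offsets[row] and offsets[row + 1] belongs to `row`
        for _ in offsets[row]..offsets[row + 1] {
            indices.push(row as i32);
        }
    }
    indices
}

fn main() {
    // three lists: [10, 20], [], [30] -> offsets [0, 2, 2, 3]
    // row 0 is duplicated (two elements), row 1 is dropped (empty list)
    assert_eq!(list_indices(&[0, 2, 2, 3]), vec![0, 0, 2]);
    println!("ok");
}
```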

# Changes in tree traversals (outdated)

Using this query as an example:

create table t as select 1 as a, [[2, 3]] as b, 1 as c;

select a, b, c, array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) from t;

a | b        | c | array_transform(b, (b, i) -> array_transform(b, b -> b + c + i))
1 | [[2, 3]] | 1 | [[4, 5]]

Detailing the identifiers in the query:

                                                                                                
                                                                                                     
                                                                                                     
          definition of lambda first parameter, arbitrarily named "b", the element of the array being
          transformed. In the example, a List(Int32) column of [2, 3]. Shadows t.b
            ^                                                                                        
            |                                                                                        
            | definition of lambda second   <--+                                                     
            | parameter arbitrarily named "i"  |                                                     
            | the 1-based index of the element | definition of the only parameter of the lambda,     
            | being transformed: in the        | arbitrarily named "b", the element of the array     
            | example, an Int32 column of "1"  | being transformed. Shadows the parameter "b"        
            |                                  | from outer lambda. The index parameter is omitted.  
            |                                  | In the example, an Int32 column with values "2, 3"  
            +-------------------------------+  |                        ^                            
                                            |  |                        |                            
                                            |  |                        |                            
        select a, b, c, array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) from t;     
               |  |  |                  |                            |       |   |   |               
               |  |  |                  v                            |       |   |   |               
               |  |  |  column "b" from table "t", t.b being passed  |       |   |   |               
               |  |  |  as argument to outer array_transform         |       |   |   |               
               |  |  v                                               |       |   |   |               
               |  | projection of column "c" from table "t", t.c     |       |   |   |               
               |  |                                                  |       |   |   |               
               |  v                                                  |       |   |   |               
               | projection of column "b" from table "t", t.b        |       |   |   |               
               |                                                     |       |   |   |               
               v                                                     |       |   |   |               
         projection of column "a" from table "t", t.a                |       |   |   |               
                                                                     |       |   |   |               
                                                                     v       |   |   |               
                         parameter "b" from outer lambda being passed        |   |   |               
                         as argument to the inner array_transform            |   |   |               
                                                                             v   |   |               
                                  reference to parameter "b" from inner lambda   |   |               
                                                                                 |   |               
                                                                                 v   |               
                               reference to column "c" captured from table "t", t.c  |               
                                                                                     |               
                                                                                     v               
                                    reference to parameter "i" captured from outer lambda            
                                                                                                

Note that:

1. lambdas may be nested
2. lambdas may capture columns from the outer scope, be it from the input plan or from other, outer lambdas
3. lambdas introduce parameters that shadow columns from the outer scope
4. lambdas may support multiple parameters, and trailing parameters that aren't used may be omitted. Omitting an unused parameter positioned before a used one is currently not supported and may incur unnecessary computations

Representing columns that refer to lambda parameters while being able to differentiate them from regular columns in Expr tree traversals

Because lambda parameters are identical to regular columns, it is intuitive to use the same logical and physical expressions to represent columns that refer to them. However, the existing tree traversals were written without taking into account that a column may refer to a lambda parameter rather than a column from the input plan, and so they would behave erratically. In the example query, projection pushdown would try to push down the lambda parameter "i", which doesn't exist in table "t".

Another example:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply(|expr| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // if this is a lambda column, code below will break
            used_columns.insert(col.index());
        }
        Ok(TreeNodeRecursion::Continue)
    });
    ...
}

Therefore, we must either provide a way to differentiate them, or use two different expressions:

Option 1. Use the same Column expression, differentiate with a new set of TreeNode methods, *_with_lambdas_params

This PR uses the existing column expression, always unqualified, and adds a new set of TreeNode-like methods on expressions. These start traversals with an empty HashSet; while traversing the expr tree, when they find a lambda, they clone the set, add the lambda's parameters to it, and pass it to the visiting/transforming closure so that it can differentiate the columns.

impl Expr {
    pub fn transform_with_lambdas_params<
        F: FnMut(Self, &HashSet<String>) -> Result<Transformed<Self>>,
    >(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {}
}
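A self-contained sketch of the mechanism, with a toy Expr enum standing in for DataFusion's: the traversal clones the set of in-scope parameter names at each lambda and hands it to the visitor, which can then tell lambda-parameter columns apart from real columns:

```rust
use std::collections::HashSet;

// Toy expression tree (illustration only, not DataFusion's Expr)
enum Expr {
    Column(String),
    Lambda { params: Vec<String>, body: Box<Expr> },
    ScalarFn(Vec<Expr>),
}

// Simplified model of apply_with_lambdas_params: the visitor receives, with
// each node, the set of lambda parameter names currently in scope.
fn apply_with_lambdas_params(
    expr: &Expr,
    in_scope: &HashSet<String>,
    f: &mut impl FnMut(&Expr, &HashSet<String>),
) {
    f(expr, in_scope);
    match expr {
        Expr::Column(_) => {}
        Expr::Lambda { params, body } => {
            // clone the set and add this lambda's parameters before descending
            let mut inner = in_scope.clone();
            inner.extend(params.iter().cloned());
            apply_with_lambdas_params(body, &inner, f);
        }
        Expr::ScalarFn(args) => {
            for arg in args {
                apply_with_lambdas_params(arg, in_scope, f);
            }
        }
    }
}

fn main() {
    // array_transform(b, v -> v + c): columns `b` and `c` are real, `v` is not
    let expr = Expr::ScalarFn(vec![
        Expr::Column("b".into()),
        Expr::Lambda {
            params: vec!["v".into()],
            body: Box::new(Expr::ScalarFn(vec![
                Expr::Column("v".into()),
                Expr::Column("c".into()),
            ])),
        },
    ]);

    let mut real_columns = Vec::new();
    apply_with_lambdas_params(&expr, &HashSet::new(), &mut |e, in_scope| {
        if let Expr::Column(name) = e {
            if !in_scope.contains(name) {
                real_columns.push(name.clone());
            }
        }
    });
    assert_eq!(real_columns, vec!["b".to_string(), "c".to_string()]);
    println!("ok");
}
```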
Expr tree traversal with lambdas_params. This query is a modified version of the example query where the inner lambda's second parameter is used
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, (b, j) -> b + i + j)) ╷        
╷                                                                       ╷        
╷                                                                       ╷        
╷                        lambdas_params = {}                            ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                    │                                            
                                    ▼                                            
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, (b, j) -> b + i + j)         ╷        
╷                                                                       ╷        
╷                                                                       ╷        
╷                     lambdas_params = { b, i }                         ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                    │                                            
                                    ▼                                            
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, (b, j) -> b + i + j)               ╷        
╷                                                                       ╷        
╷                                                                       ╷        
╷                     lambdas_params = { b, i }                         ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                    │                                            
                                    ▼                                            
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐       
╷                          (b, j) -> b + i + j                           ╷       
╷                                                                        ╷       
╷                                                                        ╷       
╷                      lambdas_params = { b, i, j }                      ╷       
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘       

How minimize_join_filter would look:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply_with_lambdas_params(|expr, lambdas_params| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // don't include lambda parameters
            if !lambdas_params.contains(col.name()) {
                used_columns.insert(col.index());
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}

I started with this option without an idea of how big it would be. It requires using the new TreeNode methods and checking the columns in 30+ tree traversals, and downstream code will need to do the same. I chose to keep it in this PR so that we only choose or discard this option knowing how big it is.

Option 2. New Expr LambdaColumn

Create a new Expr, LambdaColumn, which doesn't require new TreeNode methods, but requires that expr_api users use this new expr where applicable. It requires a fix in expr_simplifier for expressions with a lambda column, and may require similar work downstream.

//logical
struct LambdaColumn {
    name: String,
    spans: Spans,
}

//physical
struct LambdaColumn {
    name: String,
    index: usize,
}

Existing code unaltered:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply(|expr| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // no need to check, because a Column is never a LambdaColumn
            used_columns.insert(col.index());
        }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}

Option 3. Add is_lambda_parameter boolean field to Column

Add is_lambda_parameter to the existing column expressions. It won't require new TreeNode methods, but still requires checking the field everywhere the new TreeNode methods are currently used in this PR.

//logical
struct Column {
    pub relation: Option<TableReference>,
    pub name: String,
    pub spans: Spans,
    pub is_lambda_parameter: bool,
}

//physical
struct Column {
    name: String,
    index: usize,
    is_lambda_parameter: bool,
}

How minimize_join_filter would look like:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply(|expr| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // don't include lambda parameters
            if !col.is_lambda_parameter {
                used_columns.insert(col.index());
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}

Comparison between options:

| | Expr::Column/ColumnExpr | Expr::LambdaColumn/LambdaColumnExpr | is_lambda_parameter flag in Column |
|---|---|---|---|
| Internal code change | New TreeNode methods; use them where applicable | Add the new expr; use the correct expr while parsing SQL (instrumentation already exists to skip normalization of lambda parameters in SQL parsing) | Check the field where applicable; set the field while parsing SQL |
| Downstream code change | If lambda support is desired, use the new TreeNode methods where applicable; otherwise none | None | If lambda support is desired, check the field where applicable; otherwise none |
| Creates a new Expr | No | Yes | No |
| Requires new TreeNode methods | Yes, `_with_lambdas_params` for both logical and physical expressions | No | No |
| Inhibits existing optimizations for exprs with lambda-parameter columns | No | expr_simplifier (fixable), but the same may happen downstream | No |
| expr_api users must reason about | No | Yes: use the correct expression type | Yes: set the flag |
| Two almost identical expressions whose only major difference is their place in the expr tree | No | Yes | No |
| Data from the tree leaks into the column node | No | No | Yes: is_lambda_parameter is information not about the column itself, but about its place in the tree |

Ultimately, I don't have a strong preference for any option, and I believe the decision is up to the maintainers and those with more contact with downstream users, who have a better idea of which option is easier to use. I think that having the most laborious option already implemented puts us in a good position to make a choice, and it would be easy to switch to another option.

Continued in the comment below.


gstvg commented Nov 25, 2025

# Traversing Expr trees with a schema that includes lambda parameters (outdated)

The parameters of a lambda aren't present in the schema of the plan they belong to. During tree traversals that use a schema to check expressions' data types, nullability, and metadata, there must be a way to access a schema that includes those parameters.

Expr tree traversal with wrong schema
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                        a = Int32                                 ╷        
╷                        b = List(List(Int32))                     ╷        
╷                        c = Int32                                 ╷        
╷                                                                  ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                         a = Int32                                ╷        
╷                         b = List(List(Int32))                    ╷        
╷                         c = Int32                                ╷        
╷                                                                  ╷        
╷              !! missing "i", incorrect "b" type !!               ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                        a = Int32                                 ╷        
╷                        b = List(List(Int32))                     ╷        
╷                        c = Int32                                 ╷        
╷                                                                  ╷        
╷              !! missing "i", incorrect "b" type !!               ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                          a = Int32                               ╷        
╷                          b = List(List(Int32))                   ╷        
╷                          c = Int32                               ╷        
╷                                                                  ╷        
╷              !! missing "i", incorrect "b" type !!               ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        

Option 1. Per-lambda schema with a new set of TreeNode methods: *_with_schema

This PR adds a new set of TreeNode-like methods on logical and physical expressions. While traversing an expression tree, when these methods find a ScalarUDF that contains a lambda in its arguments, they use ScalarUDF::lambdas_parameters to create a schema adjusted for each of its arguments, and pass that schema to the visiting/transforming function.

impl Expr {
    pub fn transform_with_schema<
        F: FnMut(Self, &DFSchema) -> Result<Transformed<Self>>,
    >(
        self,
        schema: &DFSchema,
        f: F,
    ) -> Result<Transformed<Self>> {}
}

Example usage:

pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
        let mut has_placeholder = false;
        // Provide the schema as the first argument. 
        // The transforming closure receives an adjusted_schema as argument
        self.transform_with_schema(schema, |mut expr, adjusted_schema| {
            match &mut expr {
                // Default to assuming the arguments are the same type
                Expr::BinaryExpr(BinaryExpr { left, op: _, right }) => {
                    // use adjusted_schema and not schema: these expressions may contain
                    // columns referring to a lambda parameter, whose Field is only
                    // available in adjusted_schema and not in schema
                    rewrite_placeholder(left.as_mut(), right.as_ref(), adjusted_schema)?;
                    rewrite_placeholder(right.as_mut(), left.as_ref(), adjusted_schema)?;
                }
    ....

In order to add the lambda parameters to the schema, we need to take the following DFSchema property into account:

"Unqualified fields must be unique not only amongst themselves, but also must have a distinct name from any qualified field names"

Since lambda parameters are always unqualified, they may conflict with columns of the outer schema, even when those columns are qualified. To resolve this conflict, we can either:

1: Replace the existing column with the lambda parameter, at the same index in the schema's vec of fields, so that the indices of the columns to its right do not change. That's the current approach in this PR

Expr tree traversal with adjusted schema, replacing conflicts
 +------------------------------------------------------------------+  
 | array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) |  
 |                                                                  |  
 |                         a = Int32                                |  
 |                         b = List(List(Int32))                    |  
 |                         c = Int32                                |  
 +------------------------------------------------------------------+  
                                   |                                   
                                   |                                   
                                   v                                   
 +------------------------------------------------------------------+  
 |             (b, i) -> array_transform(b, b -> b + c + i)         |  
 |                                                                  |  
 |                         a = Int32                                |  
 |                         b = List(Int32)  ! replaced !            |  
 |                         c = Int32                                |  
 |                         i = Int32                                |  
 +------------------------------------------------------------------+  
                                   |                                   
                                   |                                   
                                   v                                   
 +------------------------------------------------------------------+  
 |                 array_transform(b, b -> b + c + i)               |  
 |                                                                  |  
 |                         a = Int32                                |  
 |                         b = List(Int32)   ! replaced !           |  
 |                         c = Int32                                |  
 |                         i = Int32                                |  
 +------------------------------------------------------------------+  
                                   |                                   
                                   |                                   
                                   v                                   
 +------------------------------------------------------------------+ 
 |                          b -> b + c + i                          | 
 |                                                                  | 
 |                           a = Int32                              | 
 |                           b = Int32     ! replaced !             | 
 |                           c = Int32                              | 
 |                           i = Int32                              | 
 +------------------------------------------------------------------+ 
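Option 1's in-place replacement can be sketched as follows (hypothetical; `(name, data_type)` string pairs stand in for real DFSchema fields):

```rust
// Hypothetical sketch of option 1: a lambda parameter that conflicts with an
// existing field overwrites it at the same index, so fields to its right keep
// their indices; non-conflicting parameters are appended.
fn add_lambda_param(
    mut fields: Vec<(String, String)>, // (name, data_type) stand-ins for DFSchema fields
    param: (String, String),
) -> Vec<(String, String)> {
    if let Some(idx) = fields.iter().position(|(name, _)| *name == param.0) {
        fields[idx] = param; // replace in place: indices stay stable
    } else {
        fields.push(param);
    }
    fields
}
```

With the schema from the diagram, adding parameter `b` replaces the outer `b` at index 1, and adding `i` appends it at index 3, leaving `c` at index 2.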
                                                                      


2: Rename the shadowed column to a unique, non-conflicting name and append the lambda parameter to the end of the schema's vec of fields. This option allows checking whether a physical column refers to a lambda parameter by checking whether its index is greater than or equal to the number of fields in the outer schema. When this information is available, it eliminates the need to use the with_lambdas_params variations of the TreeNode methods. It's trivial to change the PR to use this.

Expr tree traversal with adjusted schema, renaming conflicts
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        a = Int32                                 ╷        
╷                        b = List(List(Int32))                     ╷        
╷                        c = Int32                                 ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                         a = Int32                                ╷        
╷                         b_shadowed1 = List(List(Int32))          ╷        
╷                         c = Int32                                ╷        
╷                         b = List(Int32)                          ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        a = Int32                                 ╷        
╷                        b_shadowed1 = List(List(Int32))           ╷        
╷                        c = Int32                                 ╷        
╷                        b = List(Int32)                           ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                          a = Int32                               ╷        
╷                          b_shadowed1 = List(List(Int32))         ╷        
╷                          c = Int32                               ╷        
╷                          b_shadowed2 = List(Int32)               ╷        
╷                          b = Int32                               ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
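With this layout, the check option 2 enables could be as simple as the following hypothetical helper (not part of this PR):

```rust
// Hypothetical helper for option 2: since shadowed columns are renamed and
// lambda parameters are appended after the outer schema's fields, a physical
// column refers to a lambda parameter exactly when its index is greater than
// or equal to the outer schema's field count.
fn is_lambda_parameter(column_index: usize, outer_field_count: usize) -> bool {
    column_index >= outer_field_count
}
```

In the diagram above the outer schema has 3 fields, so the appended `b` at index 3 is recognized as a lambda parameter, while `c` at index 2 is not.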



Lambdas are usually evaluated with a different number of rows than the outer scope, as in the example, where array_transform is executed with one row and its lambda with two rows, one for each element of the array. The UDF implementation is responsible for adjusting the captured columns to the number of rows of its parameters with whatever logic makes sense for it. For array_transform, that means copying the value of each captured column once per element of the array:
        copied once  a [1]------------------> a 1  
                                                   
     copied 2 times  b [2, 3] --------------> b 2  
                               \                   
         not copied  c []       ------------> b 3     
                                                

This adjustment is costly, so the implementation needs a way to avoid adjusting uncaptured columns.
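For concreteness, the per-element copy sketched in the diagram could look like this in plain Rust (hypothetical; a real implementation would build a take-indices array and use Arrow's `take` kernel instead of materializing copies row by row):

```rust
// Each captured value is repeated once per element of the corresponding list,
// so the lambda body sees one row per list element.
fn repeat_per_element(captured: &[i32], lists: &[Vec<i32>]) -> Vec<i32> {
    captured
        .iter()
        .zip(lists)
        .flat_map(|(value, list)| std::iter::repeat(*value).take(list.len()))
        .collect()
}
```

A one-element list copies the value once, a two-element list copies it twice, and an empty list drops it, matching the diagram above.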

It's intuitive to simply remove the uncaptured columns, but note in the diagram and in the query below that doing so can change the index of captured columns: the "c" column has index 2 in the outer scope but ends up with index 1 in the inner scopes

Expr tree traversal with a schema with uncaptured columns removed
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        a@0 = Int32                               ╷        
╷                        b@1 = List(List(Int32))                   ╷        
╷                        c@2 = Int32                               ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                         b@0 = List(Int32)                        ╷        
╷                         c@1 = Int32                              ╷        
╷                         i@2 = Int32                              ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                         b@0 = List(Int32)                        ╷        
╷                         c@1 = Int32                              ╷        
╷                         i@2 = Int32                              ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                          b@0 = Int32                             ╷        
╷                          c@1 = Int32                             ╷        
╷                          i@2 = Int32                             ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        

select a@0, b@1, c@2, array_transform(b@1, (b@0, i@2) -> array_transform(b@0, b@0 -> b@0 + c@1 + i@2)) from t;

Option 1a: Nullify uncaptured columns

To keep the indices stable, this PR does not remove uncaptured columns; they remain in the adjusted schema with their original type during tree traversals with the new _with_schema methods. However, to avoid the costly adjustment, when they are passed to the UDF in invoke_with_args, they are substituted with columns of the Null datatype.

Expr execution/evaluation RecordBatch schema with uncaptured columns substituted with Null columns
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        a = Int32                                 ╷        
╷                        b = List(List(Int32))                     ╷        
╷                        c = Int32                                 ╷        
╷                                                                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                         a = Null  ! nullified !                  ╷        
╷                         b = List(Int32)                          ╷        
╷                         c = Int32                                ╷        
╷                         i = Int32                                ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        a = Null  ! nullified !                   ╷        
╷                        b = List(Int32)                           ╷        
╷                        c = Int32                                 ╷        
╷                        i = Int32                                 ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                          a = Null  ! nullified !                 ╷        
╷                          b = Int32                               ╷        
╷                          c = Int32                               ╷        
╷                          i = Int32                               ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
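A hypothetical sketch of the substitution, with a small enum standing in for ColumnarValue:

```rust
// Stand-in for a columnar value; `Null` represents an all-null column with
// DataType::Null that costs nothing to produce.
#[derive(Debug, Clone, PartialEq)]
enum Col {
    Int32(Vec<i32>),
    Null,
}

// Uncaptured columns keep their slot (so indices stay stable) but are swapped
// for Null columns instead of being adjusted per element.
fn nullify_uncaptured(columns: Vec<Col>, captured: &[usize]) -> Vec<Col> {
    columns
        .into_iter()
        .enumerate()
        .map(|(idx, col)| if captured.contains(&idx) { col } else { Col::Null })
        .collect()
}
```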

Option 1b: TreeNode *_with_indices_mapping

To avoid keeping uncaptured columns in the schema and substituting them with null in the batch, it is possible to add another set of TreeNode-like methods on physical expressions that call the visiting/transforming function with a second parameter of type HashMap<usize, usize>, mapping the indices of the current scope to those of the outermost scope. This requires that, before calling the visiting/transforming function for a physical lambda expression, its whole subtree be visited to collect the captured columns and build the indices mapping. Inner lambdas require the process again and can't reuse the work of the outer lambda, which may be costly for lambdas with complex and/or highly nested expressions.

impl PhysicalExprExt for Arc<dyn PhysicalExpr> {
    fn transform_with_indices_mapping<
        F: FnMut(Self, &HashMap<usize, usize>) -> Result<Transformed<Self>>,
    >(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {}
}
Expr tree traversal with indices_mapping: "c" has index 2 in the root scope but 1 in the others
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                        indices_mapping = {}                      ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                     indices_mapping = { 1 => 2 }                 ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                    indices_mapping = { 1 => 2 }                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                                                                  ╷        
╷                    indices_mapping = { 1 => 2 }                  ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        

The code on minimize_join_filter would change to:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply_with_indices_mapping(|expr, indices_mapping| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // This column may be a child of a lambda, where its index refers to the
            // lambda-scoped schema, which won't include uncaptured columns from the
            // plan input, and therefore may differ from the indices of the schema of
            // the input plan. In such cases, indices_mapping contains the mapping to
            // the index of the input plan. If a mapping is not found, the column
            // refers to a lambda parameter.
            let scoped_index = col.index();
            if let Some(plan_index) = indices_mapping.get(&scoped_index) {
                used_columns.insert(*plan_index);
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}

Option 2. Create a schema with all parameters from all lambdas for tree traversals

Use a secondary schema containing all parameters from all lambdas. For that, expressions must be transformed, normalizing all lambda parameters and their references with a unique qualifier per lambda, so they can coexist without conflicts in this schema. A qualifier field would be added to the lambda expr:

pub struct Lambda {
    pub qualifier: Option<String>,
    pub params: Vec<String>,
    pub body: Box<Expr>,
}

Schema of the example:

t.a: Int32
t.b: List(List(Int32))
lambda1.b: List(Int32)
lambda1.i: UInt32
lambda2.b: Int32

From my understanding of this video, this is similar to what DuckDB does in its binder, although the evaluation part differs. I didn't find any resource on other open source engines with lambda support, like ClickHouse and Spark.

This works well when dealing with plan nodes: during plan creation or schema recomputation, we can normalize its lambdas, create the extended schema and save it as a plan field, exposing it with a method like "lambda_extended_schema", although with an added cost to plan creation/schema recomputation. The lambda normalization actually requires two passes: a first pass to collect any existing lambda qualifiers, so they are not reused in the second, normalizing pass.
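The fresh-qualifier step of that two-pass normalization could be sketched as follows (hypothetical helper; the first pass would fill `used` with the qualifiers it finds):

```rust
use std::collections::HashSet;

// Hands out "lambdaN" qualifiers, skipping any collected in the first pass,
// so that every lambda in the plan ends up with a unique qualifier.
fn fresh_qualifier(used: &mut HashSet<String>) -> String {
    let mut n = 1;
    loop {
        let candidate = format!("lambda{n}");
        // `insert` returns false when the qualifier is already taken.
        if used.insert(candidate.clone()) {
            return candidate;
        }
        n += 1;
    }
}
```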

How the code would look:

//from
expr.transform_with_schema(plan.schema(), |node, adjusted_schema| ...)
//to
let schema = plan.lambda_extended_schema();
expr.transform(|node| ...)

Another example:

impl LogicalPlan {
    pub fn replace_params_with_values(
            self,
            param_values: &ParamValues,
        ) -> Result<LogicalPlan> {
            self.transform_up_with_subqueries(|plan| {
            // use plan.lambda_extended_schema(), which contains the lambda parameters,
            // instead of plan.schema(), which won't
                let lambda_extended_schema = Arc::clone(plan.lambda_extended_schema());
                let name_preserver = NamePreserver::new(&plan);
                plan.map_expressions(|e| {
                // if this expression is a child of a lambda and contains columns referring
                // to its parameters, the lambda_extended_schema already contains them
                    let (e, has_placeholder) = e.infer_placeholder_types(&lambda_extended_schema)?;
    ....

However, when working with functions/methods that deal directly with expressions, detached from a plan, the expression's lambdas may be unnormalized, and the extended schema is unavailable. There are a few public methods/functions like that, such as infer_placeholder_types:

impl Expr {
    pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
        let mut has_placeholder = false;
        self.transform(|mut expr| ...)
        ...
    }
}   

It could either:

1: Require that it only be called with normalized expressions, and that the schema argument be the extended schema, returning an error otherwise. This is restrictive and puts strain on users

2: Allow it to be called with unnormalized expressions: visit the whole expr tree collecting the existing lambda qualifiers to avoid duplicate qualifiers in the next step, perform a first transformation to guarantee that the expression's lambdas are normalized, create the extended schema, and only then perform the second transformation to infer the placeholder types using the extended schema. While the documentation can state that the returned expression is normalized, it's still a regular Expr, which doesn't encode that property in its type. Also, without changing the method signature, it wouldn't even be possible to return the extended schema so it could be reused elsewhere without recomputation. This is costly and doesn't allow that costly work to be reused



Normalized example:

select t.a, t.b, array_transform(t.b, (lambda1.b, lambda1.i) -> array_transform(lambda1.b, lambda2.b -> lambda2.b + t.a + lambda1.i)) from t;

Just like the first option, this one also sets uncaptured columns to Null, as well as unavailable/out-of-scope lambda parameters.

Expr tree batch evaluation with a single extended schema
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷        
╷                                                                  ╷        
╷                        t.a = Int32                               ╷        
╷                        t.b = List(List(Int32))                   ╷        
╷                        t.c = Int32                               ╷        
╷                        lambda1.b = Null                          ╷        
╷                        lambda1.i = Null                          ╷        
╷                        lambda2.b = Null                          ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷             (b, i) -> array_transform(b, b -> b + c + i)         ╷        
╷                                                                  ╷        
╷                        t.a = Null                                ╷        
╷                        t.b = Null                                ╷        
╷                        t.c = Int32                               ╷        
╷                        lambda1.b = List(Int32)                   ╷        
╷                        lambda1.i = Int32                         ╷        
╷                        lambda2.b = Null                          ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                 array_transform(b, b -> b + c + i)               ╷        
╷                                                                  ╷        
╷                        t.a = Null                                ╷        
╷                        t.b = Null                                ╷        
╷                        t.c = Int32                               ╷        
╷                        lambda1.b = List(Int32)                   ╷        
╷                        lambda1.i = Int32                         ╷        
╷                        lambda2.b = Null                          ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        
                                 │                                          
                                 ▼                                          
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐        
╷                          b -> b + c + i                          ╷        
╷                                                                  ╷        
╷                          t.a = Null                              ╷        
╷                          t.b = Null                              ╷        
╷                          t.c = Int32                             ╷        
╷                          lambda1.b = Null                        ╷        
╷                          lambda1.i = Int32                       ╷        
╷                          lambda2.b = Int32                       ╷        
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘        

With this option, indices are always stable across the whole expr tree.

This option also allows checking whether a physical column refers to a lambda parameter by checking whether its index is greater than or equal to the number of fields in the outer schema. When this information is available, it eliminates the need to use the with_lambdas_params variations of the TreeNode methods.

Comparison between options:

| | per-lambda schema with uncaptured columns set to null | per-lambda schema with indices_mapping | single extended schema |
|---|---|---|---|
| New set of TreeNode methods | Yes, 1: `_with_schema` for both logical and physical expressions | Yes, 2: `_with_schema` for both logical and physical expressions, and `_with_indices_mapping` for physical expressions | No |
| Tree traversal added cost | Only when encountering a lambda | Only when encountering a lambda | Zero |
| Plan creation / recompute schema added cost | Zero | Zero | Always, regardless of the existence of any lambda |
| Code change, internal | New set of TreeNode methods, used instead of the current ones when applicable | 2 new sets of TreeNode methods, used instead of the current ones when applicable | Untried, unpredictable |
| Code change, downstream | If lambda support is desired, use the new TreeNode methods instead of the current ones when applicable; otherwise none | If lambda support is desired, use the new TreeNode methods instead of the current ones when applicable; otherwise none | Variable: medium when closely associated with a Plan (just call `plan.lambda_extended_schema()`); unpredictable when a plan is unavailable or doesn't exist |
| Change uncaptured columns' DataType to Null | Yes | No | Yes |
| Unneeded Null columns kept in the schema during planning/optimizing and in the RecordBatch during execution, as padding/filler to keep indices stable | Yes | No | Yes |
| Stable column indices across the whole expr tree | Yes | No | Yes |
| Makes `_with_lambdas_params` unnecessary for physical expressions if `Expr::Column` is used | No | Yes | No |

Splitting this into smaller PRs

If it's decided to move this PR forward, it will likely be through smaller PRs. For that case, I have already planned a division, shown below. It's not necessary to analyze the graph, as it doesn't help with the discussion in this text; it's included just to show a reasonable granularity of smaller PRs, in case that helps decide whether to move this forward or not.

Each rectangular node is a PR. Asymmetrical nodes are a collection of smaller PRs which share the same dependencies and aren't a dependency of any other PR. Green ones can be opened immediately. Gray ones contain unmet dependencies. Merged PRs will be colored with blue.

graph LR
    classDef blue fill:blue
    classDef green fill:green

    subgraph "SQL" [SQL 84LOC]
        SQLP[SQL Parse 65LOC]
        SQLU[SQL Unparse 19LOC]
    end
    
    LL[Logical Expr::Lambda 100LOC]:::green
    LL --> CSE[CSE 50LOC]
    P[Plan logical lambda into physical expr 25LOC]
    UDF1["Extend ScalarUDF[Impl] 300LOC"]
    UDF2[ScalarUDF lambda schema helpers 100LOC]
    AM[Non functional array_transform 270LOC]
    PSFL[ScalarFunction PhysicalExpr lambda aware 30LOC]

    PL --> PSFL
    UDF1 --> PSFL
    UDF1 --> AM

    %%CILP[Column::is_lambda_parameter 6LOC]
    
    subgraph "Expr::*_with_schema"
        LTN[Expr::*_with_schema API def 50LOC]:::green
        LTNB[make Expr::*_with_schema lambda aware 70LOC]
        LTN --> LTNES[Expr Simplifier 100LOC]
        LTNA>"
            use Expr::*_with_schema in existing code
            Type Coercion 110LOC
            normalize_col[with_schemas_and_ambiguity_check] 31LOC
            SessionState::create_physical_expr 4LOC
            Expr::infer_placeholder_type 1LOC
            ApplyFunctionRewrites 10LOC
            optimize_projections::rewrite_expr 2LOC
            SqlToRel::try_process_group_by_unnest 10LOC
            sql resolve_columns 5LOC
            Example type_coercion_demo 10LOC
        "]
    end
    
    PL[Physical Lambda Expr 108LOC]:::green
    
    subgraph "PhysicalExpr::*_with_schema"
        PTN[PhysicalExpr::*_with_schema API def 25LOC]:::green
        PTNA>"
            use PhysicalExpr::*_with_schema in ProjectionMapping 32LOC
            PhysicalExprSimplifier 20LOC
            unwrap_cast_in_comparison 10LOC
            AsyncMapper::find_references 1LOC
            Example CustomCastsPhysicalExprAdapter 10LOC
        "]
        PTNB[make PhysicalExpr::*_with_schema lambda aware 160LOC]
    end
    
    subgraph capture
        CS[Capture support 30LOC]
        LCH["List capture helpers 60LOC(4x15)"]
        MCH["Merge capture with arguments helper 66LOC(2x33)"]
        AMCT[array_transform capture support 20LOC]
    end

    PSFL --> CS
    PTN --> CS
    LL --> SQL
    LL --> P
    UDF2 --> LTNES
    UDF2 --> LTNB
    UDF2 --> PTNB
    LTN --> LTNA
    LTN --> LTNB
    LL --> LTNB
    LTN --> UDF2
    LL --> UDF1 --> UDF2
    UDF2 --> P
    PL --> P
    PL --> PTNB
    PTN --> PTNB
    PTN --> PTNA
    AM --> AMCT
    CS --> AMCT
    LCH --> AMCT
    MCH --> AMCT

    LL --> LTNLP2

subgraph "Expr::*_with_lambdas_params"
        LTNLP --> LTNLP2[make Expr::*_with_lambdas_params lambda aware]
        LTNLP[Expr::*_with_lambdas_params API def 50LOC]:::green -->
        AAAAA>"
            expr_applicable_for_cols 5LOC
            Expr::add_column_refs[_counts]/any_column_refs 15LOC
            expr_to_columns 10LOC
            columnize_expr 15LOC
            find_columns_referenced_by_expr 10LOC
            find_column_indexes_referenced_by_expr 5LOC
            normalize_col 10LOC
            normalize_col_with_schemas_and_ambiguity_check 15LOC
            replace_col 10LOC
            transform_up_with_lambdas_params 10LOC
            filter_exprs_evaluation_result_on_empty_batch 10LOC
            replace_cols_by_name 10LOC
            optimizer::push_down_filter::contain 15LOC
            ScalarSubqueryToJoin 40LOC
            TableAliasRewriter 20LOC
            Example ShreddedJsonRewriter 30LOC
        "]
    end

    PL --> PTNLP2

    subgraph "PhysicalExpr::*_with_lambdas_params"
        PTNLP --> PTNLP2[make PhysicalExpr::*_with_lambdas_params lambda aware]

        PTNLP[PhysicalExpr::*_with_lambdas_params API def 50LOC]:::green -->
        BBBBB>"
            CustomPhysicalExprAdapter 3LOC
            ParquetPushdownChecker 13LOC
            add_offset_to_expr 3LOC
            update_expr 10LOC
            project_ordering 20LOC
            with_new_schema 10LOC
            collect_columns 10LOC
            reassign_expr_columns 10LOC
            DefaultPhysicalExprAdapter 40LOC
            projection pushdown 50LOC
            sort pushdown 40LOC
            projection 50LOC
            stream_join_utils 40LOC
            pruning predicate rewrite_column_expr 5LOC
            Example DefaultValuePhysicalExprAdapter 20LOC
        "]
    end



@gstvg gstvg changed the title [DRAFT] Add lambda support and array_transform udf [RFC] Add lambda support and array_transform udf Dec 9, 2025
@fbx31
Copy link

fbx31 commented Dec 11, 2025

Thanks a lot @gstvg, apparently a huge amount of work (even if I am not skilled enough to judge). I have been waiting for such functions for a very long time. IMHO, this is a MUST HAVE in DataFusion. All serious analytics engines provide this kind of function: DuckDB's implementation is very mature, and Polars seems to have it as well, even if I haven't tried it yet. I really hope that DataFusion will jump on this enhancement very quickly, as it is otherwise a big miss.

Use cases like list_filter(list, expr), list_map(list, expr) and list_reduce(list, base, expr) would then become available.

Again, thanks a lot for your work, and I am crossing my fingers for quick progress.

@shehabgamin
Copy link
Contributor

@gstvg This is super exciting! I'll review over the next couple of days.

cc @SparkApplicationMaster @andygrove would be great to get your thoughts too 😃

@gstvg
Copy link
Contributor Author

gstvg commented Dec 14, 2025

Thanks @fbx31. This is high priority for me right now. I also hope we can finish this quickly. It's indeed a lot of work, but I now believe it ended up that way because I went too fast in the wrong direction... More in the comment below. Thanks again!

@gstvg
Copy link
Contributor Author

gstvg commented Dec 14, 2025

Thanks @shehabgamin. I analyzed the Spark implementation one last time and realized I could have done it similarly from the beginning... I've already tested it locally and intend to push soon; the diff is reduced to ~2K LOC, mostly new, self-contained code, with small changes to existing code and no required changes in downstream code. I hope you haven't spent time reviewing it yet, because I believe the Spark approach is much better and also easier/faster to review.

Basically, for planning, we lazily set the Field in the lambda column itself, instead of injecting it into a DFSchema

//logical
struct LambdaColumn {
    name: String,
    field: Option<FieldRef>,
}

And for evaluation, again, we lazily set the value of a lambda column instead of injecting it into a RecordBatch

//physical
struct LambdaColumn {
    name: String,
    field: FieldRef,
    value: Option<ColumnarValue>,
}

I think I initially ruled that out because other expressions don't contain their own Field, and this had been discussed before in #12604 * and didn't move forward, the difference being that those other expressions' Fields naturally exist in the plan schema, which I now believe justifies treating lambdas differently.
And most importantly, because I had no idea how much work and downstream churn injecting the lambda parameters into the Schema/RecordBatch would cause 🤦

I will push soon and update the PR description, thanks again!

*This is being discussed again in #18845

@github-actions github-actions bot removed core Core DataFusion crate datasource Changes to the datasource crate labels Dec 15, 2025
@github-actions github-actions bot removed the physical-plan Changes to the physical-plan crate label Dec 15, 2025
@gstvg
Copy link
Contributor Author

gstvg commented Dec 15, 2025

This is ready for a first look @shehabgamin 🚀

@fbx31
Copy link

fbx31 commented Dec 15, 2025

Hello @gstvg, thanks for your latest submission. Some questions came to mind while reading your proposal. I can see quite well how map/filter operations produce an array as a result, but I am a bit "confused" about the support for "reduce"-like operations, which could produce any type as output, like another array or a scalar value, with an aggregation value that first has to be initialized by an Expr passed to the HoF, and the "propagation" of this value across DataFusion's plan architecture.
I am thinking about use cases like array_reduce(input: [], init_aggregate_val: Expr, lambda: Expr) with lambda(current_value_of_input, aggregate_value) -> aggregate_value.
Is this possible with your proposal? (I saw that lambdas can potentially take several parameters, so I suppose your proposal is compliant?)
Also, is it possible to "nest" the HoFs, like array_transform(arr_transform([], lamda1), lamba2)?
For this last question, I don't see any "obstacles", but I prefer to confirm with you... :-)
And also, practically, how do we reference "current_value" and "aggregate_value" in the DataFrame API? These are not named in the schema, and they are not query parameters either, so it's not clear to me how to reference these values inside the expressions:
df.with_column("new_col", array_transform(col("my_array"), max( * 2, 1000)))
Sorry to bombard you with questions, but I have been bumping my head against the wall on this topic for months without success (I don't master DataFusion's logical/physical plan logic well enough).

Many, many thanks !!!

@gstvg
Copy link
Contributor Author

gstvg commented Dec 16, 2025

Hi @fbx31, reduce is supported too. It isn't included here because its implementation is not as simple as array_transform's and would make this PR even bigger. But if any reviewer/maintainer requests it, we can add it and make sure it works from the beginning.

Nesting is supported, some tests already cover it

With the new approach you need to use lambda_col("current_value") instead of col("current_value"):

df.with_column(
    "new_col",
    array_transform(
        col("my_array"),
        lambda(
            vec!["current_value"], // define the parameter names
            lambda_col("current_value") * 2 // the lambda body
        )
    )
)

SQL equivalent:

 array_transform(my_array, current_value -> current_value * 2)

No worries, feel free to ask any questions you have

Unnest(Unnest),
/// Lambda expression
Lambda(Lambda),
LambdaColumn(LambdaColumn),
Copy link
Contributor


@gstvg Would LambdaVariable be a more accurate name here?

Copy link
Contributor Author


Yes, done in d844b2d

@shehabgamin
Copy link
Contributor

This is ready for a first look @shehabgamin 🚀

@gstvg Exciting! I've started reviewing, it will take me a few days to get through it all. Thank you for thoroughly explaining all of the options btw, it's super helpful!

@fbx31
Copy link

fbx31 commented Dec 17, 2025

Crystal clear @gstvg, thanks a lot, I cannot wait to test and use it.
A bit disappointed about the reduce operation, but I fully understand the drawbacks for the reviewers.
I hope this review will be successful.
And thanks in advance to all reviewers and of course to the author of this PR.

@linhr
Copy link
Contributor

linhr commented Dec 18, 2025

This is an exciting initiative!

I skimmed through the PR, and I have some thoughts regarding how this can fit into DataFusion's existing setup. I wonder if we can have Expr::LambdaFunction(LambdaFunction), similar to Expr::ScalarFunction(ScalarFunction)? Here are my reasoning:

  1. LambdaFunction can represent a "resolved" lambda function call in the logical plan. In contrast, Expr::Lambda and Expr::LambdaColumn are only fragments whose data types etc. are not well-defined which may be challenging to work with in other parts of the library (e.g. ExprSchemable).
  2. I feel ScalarUDF may not fit cleanly for lambda functions, so we can then have a separate abstraction (e.g. LambdaUDF) inside Expr::LambdaFunction. The ArrayTransform example shows that we would have to use ScalarFunctionArgs which was not originally designed for the lambda use case. (IMO when the function is actually "invoked" with Arrow arrays, the lambda parameter should have been resolved and removed from the argument list.)
  3. Once we have a self-contained logical expression, the entire query analysis flow might become easier to reason about (from SqlToRel to Expr::LambdaFunction to a dedicated PhysicalExpr and the corresponding physical planner).

This is just my rough thoughts. Happy to discuss!

@gstvg
Copy link
Contributor Author

gstvg commented Dec 19, 2025

Thanks @linhr
I think a new LambdaUDF is reasonable, and I'm not strongly inclined towards using ScalarUDF either

Expr::Lambda and Expr::LambdaColumn are only fragments whose data types etc. are not well-defined which may be challenging to work with in other parts of the library (e.g. ExprSchemable).

Yeah, this is the major challenge in this PR. Currently, Expr::Lambda always returns DataType::Null, and Expr::LambdaVariable embeds a FieldRef which is used to implement ExprSchemable and doesn't even look at the DFSchema

when the function is actually "invoked" with Arrow arrays, the lambda parameter should have been resolved and removed from the argument list.

Could you expand on this further? Is it like LambdaUDF having a resolve_lambdas method whose result is passed to the invoke_with_args method?

@linhr
Copy link
Contributor

linhr commented Dec 21, 2025

Could you expand this further? It is like LambdaUDF having a resolve_lambdas method which result is passed to the invoke_with_args method?

Suppose we have array_transform([1, 2], v -> v*2). We could have a trait LambdaUDF and have impl LambdaUDF for ArrayTransform. (Or we follow the existing convention to have struct LambdaUDF and trait LambdaUDFImpl separately.) A logical representation of v -> v*2 is passed to ArrayTransform::new(). For Expr::LambdaFunction(LambdaFunction), we can have LambdaFunction { func: Arc<dyn LambdaUDF>, args: Vec<Expr> } where the non-lambda parameter [1, 2] is stored in args.

During physical planning, we could resolve ArrayTransform into PhysicalArrayTransform, which stores v -> v*2 resolved as some PhysicalExpr. We have a trait PhysicalLambdaUDF and impl PhysicalLambdaUDF for PhysicalArrayTransform. The trait method PhysicalLambdaUDF::invoke_with_args accepts the Arrow array [1, 2] and computes the results. I'd imagine this invocation can be done in a general (physical) LambdaFunctionExpr that works for all lambda functions, similar to ScalarFunctionExpr.

When I worked with ScalarUDF, I noticed that the logic required for the logical representation, physical planning, and the actual execution is all within a single ScalarUDFImpl trait. If we look at how these trait methods are used by the various planning/execution stages, we might get the big picture of how a parallel code structure (with multiple traits to separate the responsibilities) can be designed for lambda functions.

I haven't thought about function registry, documentation etc. which we can get for free in the existing ScalarUDF setup. So some more investigation is needed to estimate the amount of work if we explore the route I described above.
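To make the shape of that split concrete, here is a rough, self-contained toy sketch; every name in it is hypothetical (none of these traits exist in DataFusion today), and `Vec<i64>` stands in for Arrow's `ArrayRef` so the example compiles on its own:

```rust
// Toy stand-in for Arrow's ArrayRef so the sketch needs no dependencies.
type ArrayRef = Vec<i64>;

// Hypothetical physical-side trait: by the time invoke_with_args runs,
// the lambda body has already been planned away, so only the non-lambda
// data arguments remain in the argument list.
trait PhysicalLambdaUDF {
    fn invoke_with_args(&self, args: Vec<ArrayRef>) -> ArrayRef;
}

// Physical array_transform with the lambda `v -> v * 2` "compiled" into
// a plain Rust function pointer for the purposes of this sketch.
struct PhysicalArrayTransform {
    body: fn(i64) -> i64,
}

impl PhysicalLambdaUDF for PhysicalArrayTransform {
    fn invoke_with_args(&self, args: Vec<ArrayRef>) -> ArrayRef {
        // Single non-lambda argument: the input list.
        args[0].iter().map(|v| (self.body)(*v)).collect()
    }
}

fn main() {
    let transform = PhysicalArrayTransform { body: |v| v * 2 };
    // array_transform([1, 2], v -> v*2) from the PR description.
    assert_eq!(transform.invoke_with_args(vec![vec![1, 2]]), vec![2, 4]);
    println!("ok");
}
```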

@gstvg
Copy link
Contributor Author

gstvg commented Dec 23, 2025

@linhr Thanks, I got it now. I think it's possible to resolve lambda parameters and remove them from the arguments list with ScalarUDF itself, with a few cons compared to a dedicated to LambdaUDF, but with much less code changes and similar enough to compare the alternatives. I believe I'm already finishing it, and will push to another branch soon.

@keen85
Copy link

keen85 commented Jan 25, 2026

@gstvg any updates on this one? 😇

@gstvg
Copy link
Contributor Author

gstvg commented Feb 9, 2026

@linhr
Really really sorry for the delay


I think it's possible to resolve lambda parameters and remove them from the arguments list with ScalarUDF itself, with a few cons compared to a dedicated to LambdaUDF, but with much less code changes and similar enough to compare the alternatives. I believe I'm already finishing it, and will push to another branch soon.

Unfortunately, trying to resolve lambdas with ScalarUDF didn't work well.
So, before moving to a LambdaUDF-based implementation, I would like to further discuss the current approach.


Expr::Lambda and Expr::LambdaColumn are only fragments whose data types etc. are not well-defined which may be challenging to work with in other parts of the library (e.g. ExprSchemable).

Yeah, this is the major challenge in this PR. Currently, Expr::Lambda always return DataType::Null and Expr::LambdaVariable embeds a FieldRef which is used to implement the ExprSchemable and don't even look at the DFSchema

I mean, this was the major challenge of the PR in its first implementation; now that we use LambdaVariable with a Field, I consider it solved. Currently Expr::Lambda actually returns the return_field of its body, but I believe we could also return a field with DataType::Null if that's deemed better.


LambdaFunction can represent a "resolved" lambda function call in the logical plan. In contrast, Expr::Lambda and Expr::LambdaColumn are only fragments whose data types etc. are not well-defined which may be challenging to work with in other parts of the library (e.g. ExprSchemable).
... when the function is actually "invoked" with Arrow arrays, the lambda parameter should have been resolved and removed from the argument list

Sorry, at first I thought this only meant resolving/embedding the lambda body within the UDF itself and removing it from Expr::ScalarFunction.args/LambdaFunction.args, and also removing the Expr::Lambda variant from the logical Expr enum and its physical counterpart, but now I'm not sure whether it also means omitting the lambda body from TreeNode traversals?

To streamline the discussion, I have a few comments for each of these options, if they're applicable:

Omitting the lambda body from TreeNode traversals

At least the following traversals currently use TreeNode and would require adjustment if the body is omitted:

Projection pushdown for column capture, type coercion for correctness, and the expr simplifier and CSE for performance.

Being only 4 internal traversals, it is easy to handle them specially, but since DataFusion is an open system, I believe there should be a way for downstream users to visit/transform lambda expressions. We could document that they need to be handled specially, outside TreeNode, or we could offer an API for it; after all, I believe the ideal API would be exactly the TreeNode API.

In my first iteration of this PR (the now-outdated sections of the description and of the first comment), I manually checked every TreeNode usage on logical and physical expressions and found none where Expr::Lambda should be specially handled (it is simply ignored), while Expr::LambdaVariable only required handling in a few places.

Removing Expr::Lambda variant from Expr enum and it's physical counterpart

Removing Expr::Lambda from the AST makes some tree traversals more difficult, while keeping it makes no difference: it's simply ignored, like most expressions in most traversals.

One example is BindLambdaVariable in this PR. It's not important to understand what it does, just that it's bigger, with more boilerplate, and harder to read without Expr::Lambda:

struct BindLambdaVariable<'a> {
    variables: HashMap<&'a str, (ArrayRef, usize)>, 
    //(variable value, number of times it has been shadowed by other lambdas variables in inner scopes in the current position of the tree)
}

impl TreeNodeRewriter for BindLambdaVariable<'_> {
    type Node = Arc<dyn PhysicalExpr>;

    fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
        if let Some(lambda_variable) = node.as_any().downcast_ref::<LambdaVariable>() {
            if let Some((value, shadows)) = self.variables.get(lambda_variable.name()) {
                if *shadows == 0 {
                    return Ok(Transformed::yes(Arc::new(
                        lambda_variable.clone().with_value(value.clone()),
                    )));
                }
            }
        } else if let Some(inner_lambda) = node.as_any().downcast_ref::<LambdaExpr>() {
            for param in inner_lambda.params() {
                if let Some((_value, shadows)) = self.variables.get_mut(param.as_str()) {
                    *shadows += 1;
                }
            }
        }

        Ok(Transformed::no(node))
    }

    fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
        if let Some(inner_lambda) = node.as_any().downcast_ref::<LambdaExpr>() {
            for param in inner_lambda.params() {
                if let Some((_value, shadows)) = self.variables.get_mut(param.as_str()) {
                    *shadows -= 1;
                }
            }
        }

        Ok(Transformed::no(node))
    }
}
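The shadow-counting scheme itself is independent of DataFusion's types. Below is a minimal std-only sketch of the same scoping rule, with a toy `Node` enum standing in for `Arc<dyn PhysicalExpr>` and `i64` standing in for `ArrayRef` (all names here are hypothetical, not the PR's actual types):

```rust
use std::collections::HashMap;

// Toy expression tree standing in for `Arc<dyn PhysicalExpr>`.
#[derive(Debug, Clone, PartialEq)]
enum Node {
    Var(String),                    // lambda variable reference
    Bound(String, i64),             // variable with a value bound to it
    Lambda(Vec<String>, Box<Node>), // inner lambda introducing new params
    Add(Box<Node>, Box<Node>),
}

// Bind `vars` into the tree, skipping occurrences shadowed by inner lambdas.
// Incrementing on the way down and decrementing on the way up mirrors the
// f_down/f_up pair above.
fn bind(node: Node, vars: &HashMap<String, i64>, shadows: &mut HashMap<String, usize>) -> Node {
    match node {
        Node::Var(name) => match vars.get(&name) {
            Some(v) if shadows.get(&name).copied().unwrap_or(0) == 0 => Node::Bound(name, *v),
            _ => Node::Var(name),
        },
        Node::Lambda(params, body) => {
            for p in &params {
                *shadows.entry(p.clone()).or_insert(0) += 1; // f_down
            }
            let body = bind(*body, vars, shadows);
            for p in &params {
                *shadows.get_mut(p).unwrap() -= 1; // f_up
            }
            Node::Lambda(params, Box::new(body))
        }
        Node::Add(l, r) => Node::Add(
            Box::new(bind(*l, vars, shadows)),
            Box::new(bind(*r, vars, shadows)),
        ),
        other => other,
    }
}

fn main() {
    let vars = HashMap::from([("v".to_string(), 7)]);
    let tree = Node::Add(
        Box::new(Node::Var("v".into())),
        Box::new(Node::Lambda(
            vec!["v".into()],
            Box::new(Node::Var("v".into())),
        )),
    );
    let out = bind(tree, &vars, &mut HashMap::new());
    // the outer `v` is bound; the inner lambda's `v` is shadowed and stays unbound
    assert_eq!(
        out,
        Node::Add(
            Box::new(Node::Bound("v".into(), 7)),
            Box::new(Node::Lambda(
                vec!["v".into()],
                Box::new(Node::Var("v".into())),
            )),
        )
    );
}
```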

Implementation without Physical Lambda on AST

fn bind_lambda_variables(
    node: Arc<dyn PhysicalExpr>,
    params: &mut HashMap<&str, (ArrayRef, usize)>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
    node.transform_down(|node| {
        if let Some(lambda_variable) = node.as_any().downcast_ref::<LambdaVariable>() {
            if let Some((value, shadows)) = params.get(lambda_variable.name()) {
                if *shadows == 0 {
                    return Ok(Transformed::yes(Arc::new(
                        lambda_variable.clone().with_value(value.clone()),
                    )));
                }
            }
        } else if let Some(fun) = node.as_any().downcast_ref::<LambdaUDFExpr>() {
            let mut transformed = false;

            let new_lambdas = fun
                .lambdas()
                .iter()
                .map(|(params_names, body)| {
                    for param in *params_names {
                        if let Some((_value, shadows)) = params.get_mut(param.as_str()) {
                            *shadows += 1;
                        }
                    }

                    let new_body = bind_lambda_variables(Arc::clone(body), params)?;
                    transformed |= new_body.transformed;

                    for param in *params_names {
                        if let Some((_value, shadows)) = params.get_mut(param.as_str()) {
                            *shadows -= 1;
                        }
                    }

                    Ok((params_names.to_vec(), new_body.data))
                })
                .collect::<Result<_>>()?;

            let new_args = fun
                .args()
                .iter()
                .map(|arg| {
                    let new_arg = bind_lambda_variables(Arc::clone(arg), params)?;
                    transformed |= new_arg.transformed;
                    Ok(new_arg.data)
                })
                .collect::<Result<_>>()?;

            let fun = fun.with_new_children(new_args, new_lambdas);

            return Ok(Transformed::new(
                Arc::new(fun),
                transformed,
                TreeNodeRecursion::Stop,
            ));
        }

        Ok(Transformed::no(node))
    })
}

Note that this transformation always returns a constant TreeNodeRecursion value. If it did not, TreeNodeRecursion would also need to be handled manually, adding more boilerplate that TreeNode handles automatically when Expr::Lambda is present.

Resolve/embed lambdas in the UDF implementation itself (essentially partitioning args from lambdas)

By partitioning args from lambdas, we lose positional data, which is necessary to implement SqlUnparser and logical and physical formatting. This leads to either the implementation itself having to implement them, adding some boilerplate where today this is automatically handled by ScalarUDF and SqlUnparser, or the implementation saving and exposing positional data, which is then used by ScalarUDF and SqlUnparser to de-partition the args list and then proceed normally.

Some closed systems without support for UDFs with lambdas use a fixed position for lambdas, and can just use them without having to store positional data or keep lambdas and other args in the same ordered list:

Always last for Spark, DuckDB and Snowflake:

transform(array(1), v -> v*2) -- spark
list_transform([1, 2, NULL, 3], lambda x: x + 1)  -- duckdb
transform([1, 2, 3], a INT -> a * 2) -- snowflake

Always first for ClickHouse:

 arrayMap(x -> (x + 2), [1, 2, 3])

But I think that DataFusion, as an open and extendable system, shouldn't stick to a single convention, as different deployments may want to support one, the other, or both.

Furthermore, my interest in lambda functions is to implement union-manipulating UDFs, which receive multiple lambdas interleaved with regular arguments. Consider a Union<'str' = Utf8, 'num' = Float64>, which could be transformed like this:

union_transform(
    union_value, 
    'str', str -> trim(str),
    'num', num -> num*2
)

whereas today the only way is this (if/when support for union_value lands):

case union_tag(union_value)
    when 'str' then union_value('str', trim(union_extract(union_value, 'str')))
    when 'num' then union_value('num', union_extract(union_value, 'num') * 2)
end

For such cases, storing positional data or manually implementing logical and physical formatting and SqlUnparser for every UDF is mandatory, as that positioning doesn't fit simple conventions like always-first or always-last.

This also omits positional info from the implementation. No lambda UDF discussed so far, including union_transform, requires positional info for evaluation, only for logical and physical formatting and SqlUnparser, but I don't like the idea of blocking any future hypothetical UDF that requires positional info for evaluation. A theoretical example is union_transform itself supporting a slightly shorter syntax for lambdas that return constants:

union_transform(
    union_value, 
    'str', str -> trim(str),
    'num', _num -> 0 -- lambda that returns constant/scalar
)

union_transform(
    union_value, 
    'str', str -> trim(str),
    'num', 0 -- shorter syntax, pass constant directly
)

That syntax does require positional info for evaluation. It's really just an example, and I don't plan to support it in my union_transform implementation.

Instead, if we add a new LambdaFunction expression and LambdaUDF trait, the implementation could own both the lambdas and the other args, so that no partitioning would occur:

struct LambdaFunction { invocation: Box<dyn LambdaInvocation> } //invocation includes all args, both lambdas and non-lambdas
Expr::LambdaFunction(LambdaFunction::new(ArrayTransform::new(all_args_including_lambdas)))

// instead of 
struct LambdaFunction { func: Arc<dyn LambdaUDF>, args: Vec<Expr> } // func owns only lambdas, and non lambdas are stored on args
Expr::LambdaFunction(LambdaFunction::new(ArrayTransform::new(lambda), args_without_lambdas))

Insights from Spark implementation

I don't have much experience with Spark and Scala (I only implemented a custom data source a few years ago), so if something below seems wrong, it probably is.

Spark includes both a HigherOrderFunction trait and a LambdaFunction class, similar to our current Expr::Lambda (and not the proposed LambdaFunction), as well as a NamedLambdaVariable class, similar to our Expr::LambdaVariable, all of them extending Expression, which in turn extends TreeNode.

Some traversals branch directly on LambdaFunction expressions, which in my view supports the idea of both keeping Expr::Lambda and exposing the lambda body on the TreeNode API:

Lambda Binder (similar to our BindLambdaVariable)
ResolveLambdaVariables
ColumnResolutionHelper
ColumnNodeToExpressionConverter
CheckAnalysis
ColumnNodeToProtoConverter

Some branch on LambdaVariable, which in my view supports the idea of exposing the lambda body on the TreeNode API:

SessionCatalog
HigherOrderFunction.functionForEval
NormalizePlan
TableOutputResolver

Some branch on HigherOrderFunction:

CheckAnalysis
Analyzer
ResolveLambdaVariables

It is true that some traversals branch on HigherOrderFunction expressions, therefore supporting the idea of a new Expr::LambdaFunction, but since checking whether an Expr::ScalarFunction invocation contains lambdas is as simple as the code below, and can be made even simpler by adding a helper method contains_lambdas(&self) -> bool to ScalarFunction, I don't think that alone justifies adding a new Expr::LambdaFunction holding a ScalarFunction. If a LambdaUDF trait is added, then a new Expr::LambdaFunction variant has to be added anyway, so this becomes a non-issue.

match expr {
    Expr::ScalarFunction(fun) if fun.args.iter().any(|arg| matches!(arg, Expr::Lambda(_))) => ...,
    // with helper
    Expr::ScalarFunction(fun) if fun.contains_lambdas() => ...
    ...
}

impl ScalarFunction {
    fn contains_lambdas(&self) -> bool {
        self.args.iter().any(|arg| matches!(arg, Expr::Lambda(_)))
    }

    // Other helpers could be added, like:
    fn lambda_args(&self) -> impl Iterator<Item = &Lambda> {
        self.args.iter().filter_map(|arg| match arg {
            Expr::Lambda(l) => Some(l),
            _ => None
        })
    }
    
    fn non_lambda_args(&self) -> impl Iterator<Item = &Expr> {
        self.args.iter().filter(|arg| !matches!(arg, Expr::Lambda(_)))
    }
}

Higher-order functions, like ArrayTransform, receive both the regular arg and the lambda as regular Expressions, the lambda being expected to be a LambdaFunction:

case class ArrayTransform(
    argument: Expression,
    function: Expression)
  extends ArrayBasedSimpleHigherOrderFunction with CodegenFallback {

  override def dataType: ArrayType = ArrayType(function.dataType, function.nullable)

  override protected def bindInternal(
      f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayTransform = {
    val ArrayType(elementType, containsNull) = argument.dataType
    function match {
      case LambdaFunction(_, arguments, _) if arguments.size == 2 =>
        copy(function = f(function, (elementType, containsNull) :: (IntegerType, false) :: Nil))
      case _ =>
        copy(function = f(function, (elementType, containsNull) :: Nil))
    }
  }
}

And it's instantiated like this:

        val param = NamedLambdaVariable("x", et, containsNull)
        val funcBody = insertMapSortRecursively(param)

        ArrayTransform(e, LambdaFunction(funcBody, Seq(param)))

So, regarding resolving lambdas/partitioning lambdas from args: all args are owned by the implementation, and not by HigherOrderFunction, which is just a trait. The lambdas are indeed resolved, but so are the other args, and no partitioning occurs. I believe that our current approach, translated to Spark, would look like this: a trait Expression, implemented by a class HigherOrderFunction, which owns a trait HigherOrderFunctionImpl, which is implemented by ArrayTransform, where the lambda and the arg would be stored in the class HigherOrderFunction, not in ArrayTransform. The Spark approach implemented in DF would look like the new LambdaFunction expression and LambdaUDF trait cited above, where the implementation owns both the lambdas and the other args: Expr::LambdaFunction(LambdaFunction::new(ArrayTransform::new(all_args_including_lambdas))), instead of Expr::LambdaFunction(LambdaFunction::new(ArrayTransform::new(lambdas), args_without_lambdas))

Finally, running this script on spark-shell returns the following output, showing that lambda bodies are present in TreeNode traversals, as are LambdaFunction nodes:

val df = spark.sql("SELECT transform(array(1), v -> v*2)")
val transform = df.queryExecution.logical.expressions(0)

transform.foreach { node =>
  println(s"${node.nodeName}: ${node.toString}")
}

println(transform.treeString)
UnresolvedAlias: unresolvedalias(transform(array(1), lambdafunction((v * 2), v)))
UnresolvedFunction: transform(array(1), lambdafunction((v * 2), v))
UnresolvedFunction: array(1)
Literal: 1
LambdaFunction: lambdafunction((v * 2), v)
Multiply: (v * 2)
UnresolvedNamedLambdaVariable: v
Literal: 2
UnresolvedNamedLambdaVariable: v

unresolvedalias('transform('array(1), lambdafunction((lambda 'v * 2), lambda 'v, false)))
+- 'transform('array(1), lambdafunction((lambda 'v * 2), lambda 'v, false))
   :- 'array(1)
   :  +- 1
   +- lambdafunction((lambda 'v * 2), lambda 'v, false)
      :- (lambda 'v * 2)
      :  :- lambda 'v
      :  +- 2
      +- lambda 'v

In short, if we more or less agree with my points above, adding a new LambdaUDF just to avoid modifying ScalarFunctionArgs doesn't seem, IMHO, worth the additional code that gets to be reviewed and maintained, both in this PR (which is already big) and in subsequent PRs adding more lambda UDFs like array_filter and array_fold.

What do you think? Am I missing or misunderstanding something? Thanks!
And again, sorry for the delay

cc @keen85

@linhr
Contributor

linhr commented Feb 19, 2026

Sorry for missing the update on this thread and thanks for the detailed analysis @gstvg!

The reasoning makes a lot of sense to me. And it's great to see your investigation on the Spark codebase.

I didn't think deeply enough about how the lambda function would need to interact with various parts in DataFusion, such as tree traversal, SQL unparser, and the general ability to preserve argument positions. So the LambdaUDF trait I mentioned indeed has many limitations.

Therefore, feel free to continue the ideas you are pursuing! And let's see if DataFusion maintainers who are familiar with this part of the codebase have anything to add for the approach.

@comphead
Contributor

Thanks @gstvg for driving this. I missed this PR and was actually advocating for lambda support in the DataFusion roadmap. Let me explore this approach. At a high level, IMO it makes sense to introduce a LambdaUDF trait, so taking as an example SELECT list_filter([1,2,3,4], x -> x % 2 = 0); it could look like

FunctionExpression
  name: list_filter
  children:
    1. ListExpression [1,2,3,4]
    2. LambdaExpression
         parameters: ["x"]
         body:
            ComparisonExpression (=)
              left:
                 ArithmeticExpression (%)
                    left: ColumnRef("x")
                    right: Constant(2)
              right:
                 Constant(0)

Then we likely need to bind x to know what it is and finally rewrite it into something computable, perhaps using a UDF of (Int) => Boolean, providing a bitmap of valid indices as the result and then extracting the valid indices from the array. It might preserve SIMD execution.
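That pipeline (evaluate the predicate once over the flattened values, then extract by bitmap) can be sketched std-only, with plain Vecs standing in for Arrow arrays and the `filter_by_bitmap` helper being purely hypothetical:

```rust
// Evaluate a predicate over the flattened child values once (vectorized),
// then keep only the values whose bit is set in the mask.
fn filter_by_bitmap(values: &[i64], keep: &[bool]) -> Vec<i64> {
    values
        .iter()
        .zip(keep)
        .filter_map(|(v, k)| k.then_some(*v))
        .collect()
}

fn main() {
    // x -> x % 2 == 0 over [1, 2, 3, 4]
    let values = [1i64, 2, 3, 4];
    let keep: Vec<bool> = values.iter().map(|x| x % 2 == 0).collect();
    assert_eq!(filter_by_bitmap(&values, &keep), vec![2, 4]);
}
```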

It is great to see this effort is moving

@gstvg
Contributor Author

gstvg commented Feb 21, 2026

Thanks @linhr! That mostly applies to having lambdas and args partitioned, omitting the body from TreeNode, and removing Expr::Lambda. Changing the PR to just use a new Expr::LambdaFunction(Arc<dyn LambdaUDF>) where args are unpartitioned, the body is exposed on TreeNode, and Expr::Lambda is kept is okay; it wouldn't incur the other cited issues, but it would make the PR bigger.

Thanks @comphead! The current representation looks somewhat similar to your suggestion, except that it is an Expr::ScalarFunction/ScalarFunctionExpr instead of a dedicated LambdaFunction, and that references to lambda parameters use LambdaVariable instead of Column, containing an optional value that should be bound before execution:

FunctionExpression //regular ScalarFunction
  name: list_filter
  children:
    1. ListExpression [1,2,3,4]
    2. LambdaExpression // the added physical LambdaExpr / logical Expr::Lambda
         parameters: ["x"]
         body:
            ComparisonExpression (=)
              left:
                 ArithmeticExpression (%)
                    left: LambdaVariable("x", Field::new("", Int32, false), None) that after binding becomes
                          LambdaVariable("x", Field::new("", Int32, false), Some([1, 2, 3, 4]))
                    right: Constant(2)
              right:
                 Constant(0)

The reason why adding LambdaVariable is easier than trying to use Column is shown in the outdated sections of this PR description and in the first comment.

If the added code for a LambdaUDF is considered worthwhile, either to not modify ScalarUDF and/or because it's a better representation, while keeping other things unchanged, I'm okay with it. But since you mentioned vectorized execution, I just want to make sure that we are not moving to LambdaUDF only for performance: contrary to Spark, where the LambdaVariable value is set and re-set for every row, which would obviously be terrible for a vectorized engine, here it is set only once per batch, and so can perform as fast as regular expressions. The LambdaVariable value being a ColumnarValue is just a niche optimization for scalar evaluation; we can change it to support ArrayRef only. Native performance has been a goal since the beginning, and a list_filter implementation based on this can be easily vectorized. As an example, #17220 got vectorized execution while also being based on ScalarUDF and regular physical expressions: the lambda body is evaluated once per batch, its output is used to filter the list values with the filter kernel, and then the offsets are adjusted:

    let filter_array = lambda.evaluate(&batch)?;
    let ColumnarValue::Array(filter_array) = filter_array else {
        return exec_err!("array_filter requires a lambda that returns an array of booleans");
    };

    let filter_array = as_boolean_array(&filter_array)?;
    let filtered = filter(&values, filter_array)?;

    for row_index in 0..list_array.len() {
        if list_array.is_null(row_index) {
            // Handle null arrays by keeping the offset unchanged
            offsets.push_length(0);
            continue;
        }
        let start = value_offsets[row_index];
        let end = value_offsets[row_index + 1];
        let num_true = filter_array
            .slice(start.as_usize(), (end - start).as_usize())
            .true_count();
        offsets.push_length(num_true);
    }
    let offsets = offsets.finish();
    let list_array = GenericListArray::<OffsetSize>::try_new(
        Arc::clone(field),
        offsets,
        filtered,
        nulls.cloned(),
    )?;

    Ok(Arc::new(list_array))

One thing that can be optimized is using Buffer::count_set_bits_offset instead of BooleanArray::slice followed by true_count, to avoid 1-2 Arc clones per loop iteration. There's also a faster way to adjust the offsets when the average sub-list length is small, something like 64 or less, but it's bigger and a bit more intricate, so I chose to implement array_transform instead to keep the PR smaller. A reasonably fast array_fold is even more complicated, but still implementable in any lambda approach discussed here.
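The offset adjustment in that loop can be illustrated std-only, with plain slices standing in for Arrow buffers (so there are no per-row Arc clones to begin with); `filtered_offsets` is a hypothetical helper, not code from the PR:

```rust
// Rebuild list offsets after filtering flattened values with a boolean mask.
// `offsets` has len = num_lists + 1; `mask` has len = offsets[num_lists].
// Null lists are encoded as `false` entries in `valid` for simplicity.
fn filtered_offsets(offsets: &[usize], valid: &[bool], mask: &[bool]) -> Vec<usize> {
    let mut out = Vec::with_capacity(offsets.len());
    out.push(0usize);
    for row in 0..offsets.len() - 1 {
        let len = if valid[row] {
            // count surviving elements of this sub-list directly in the mask,
            // mirroring `true_count` on the sliced BooleanArray
            mask[offsets[row]..offsets[row + 1]]
                .iter()
                .filter(|b| **b)
                .count()
        } else {
            0 // null lists keep a zero length, mirroring `offsets.push_length(0)`
        };
        let last = *out.last().unwrap();
        out.push(last + len);
    }
    out
}

fn main() {
    // two lists: [a, b] and [c, d, e]; the mask keeps a, c, d
    let offsets = [0, 2, 5];
    let valid = [true, true];
    let mask = [true, false, true, true, false];
    assert_eq!(filtered_offsets(&offsets, &valid, &mask), vec![0, 1, 3]);
}
```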

array_transform here is also vectorized: the transforming lambda body is evaluated only once per batch/invoke/evaluate call.


Then we likely need to Bind x to know what is it and finally rewrite it into something computable, perhaps using UDF of (Int) => Boolean

I believe that by using LambdaVariable, planning the lambda body into a physical expr as usual is enough to compute it, without having to use UDFs. One thing that we must take into account is column capture, which is already fully supported here, including shadowing and projection pushdown (columns from the input plan that appear only within a lambda body). As an example, #17220 does not support that, and if it tried to while using Expr::Column for lambda variables and passing those variables in the RecordBatch to PhysicalExpr::evaluate, it would likely hit the same problems I got in my first approach, which are also shown in the outdated sections of the description and of the first comment here. That's why LambdaVariable exists. Supporting lambdas without capture support is relatively easy, regardless of the approach.

@rluvaton I see you also were not fond of adding physical exprs to ScalarFunctionArgs in #17220. Any comments here?


Finally, how would the new LambdaUDF trait work?
I believe it should look like ScalarFunction:

struct LambdaFunction {
    args: Vec<Expr>,
    fun: Arc<dyn LambdaUDF>,
}

enum Expr {
    ...
    Expr::LambdaFunction(LambdaFunction),
}

It can also be made more like Spark:

trait LambdaInvoker {
    ...
    fn invoke(&self, args: Vec<Expr>) -> Box<dyn LambdaInvocation>;
}

enum Expr {
    ...
    Expr::LambdaFunction(Box<dyn LambdaInvocation>),
}

I believe the LambdaUDF/LambdaInvocation trait would look like ScalarUDFImpl, except for the lambdas_parameters method, ReturnFieldArgs in return_field_from_args, and LambdaFunctionArgs in the invoke method. Or do any of you imagine more fundamental changes?

Thanks!

@rluvaton
Member

rluvaton commented Feb 23, 2026

thanks a lot for this PR.

Here are my thoughts:

Future features that the API should support without or minimal breaking changes

Couple of things to make sure we support or have a way to add them in the future without breaking changes:

  1. Support Map, (Large)List, (Large)ListView, FixedSizeList as the input for lambda
  2. multiple lambdas in a single expression, for example map_key_value(some_map_col, map_key_lambda, map_value_lambda), where each lambda gets different variables
  3. Lambda expression that access columns that are not in the list itself
    so I can do the following:
    | year |    grades      |
    |------|----------------|
    | 1998 | [1, 2, 3]      |
    | 1999 | [4, 99, 5, 10] |
    | 2000 | [6, 0, null]   |
    
    array_transform(grades, x -> if year <= 1990 then x * 10 else x)
  4. optional arguments for lambda, for example the index of the item in the list
    the optional here is important as I want to avoid creating that input if I don't need to.
  5. Nested lambda expressions: array_transform(matrix, x -> array_transform(x, y -> y * 2))

Things that every lambda implementation would need to handle

  1. replace null lists that are non-empty underneath with nulls that have an empty list underneath.
    consider the following when the expression can fail: array_transform(list, x -> 1 / x)
    with this input

    fn get_list() -> GenericListArray<i32> {
      GenericListArray::new(
        Arc::new(Field::new_list_field(DataType::Int8, false)),
        OffsetBuffer::<i32>::from_lengths(vec![2, 2, 1]),
        Arc::new(Int8Array::from(vec![1, 2, 0, 3, 4])),
        Some(NullBuffer::from(&[true, false, true])),
      )
    }

    where the second list is null but the underlying values are [0, 3]; if we run the transform on that 0, it will fail with division by zero.

    I have a lot of helpers to cleanup the nulls BTW

  2. sliced list and not computing values outside the sliced data

  3. fixed size list (which is different from list, as you can't change the underlying nulls to be empty lists), where a null list with underlying values can cause failures; consider the expression array_transform(list, x -> 1 / x) on this input:

fn get_list() -> FixedSizeListArray {
    FixedSizeListArray::new(
       Arc::new(Field::new_list_field(DataType::Int8, false)),
       3,
       Arc::new(Int8Array::from(vec![
           1, 2, 3,
           0, 1, 1,
           4, 5, 6
       ])),
       Some(NullBuffer::from(&[true, false, true])),
    )
}
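Point 1 for variable-size lists can be sketched std-only, with plain Vecs standing in for Arrow offset/null buffers; this is a toy `strip_null_lists` helper of my own naming, not one of the helpers mentioned above:

```rust
// Drop child values that sit under null list entries, so a fallible lambda
// body (e.g. `x -> 1 / x`) is never evaluated on them. Null rows become
// empty lists: their offset range collapses to zero length.
fn strip_null_lists(
    offsets: &[usize],
    valid: &[bool],
    values: &[i64],
) -> (Vec<usize>, Vec<i64>) {
    let mut new_offsets = vec![0usize];
    let mut new_values = Vec::new();
    for row in 0..valid.len() {
        if valid[row] {
            new_values.extend_from_slice(&values[offsets[row]..offsets[row + 1]]);
        }
        new_offsets.push(new_values.len());
    }
    (new_offsets, new_values)
}

fn main() {
    // lengths [2, 2, 1] -> offsets [0, 2, 4, 5]; the second list is null but
    // hides a 0 that `x -> 1 / x` would otherwise divide by
    let (offsets, values) =
        strip_null_lists(&[0, 2, 4, 5], &[true, false, true], &[1, 2, 0, 3, 4]);
    assert_eq!(offsets, vec![0, 2, 2, 3]);
    assert_eq!(values, vec![1, 2, 4]);
}
```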

In every case here, we should answer:

  1. how would the user handle that?
  2. how could we make it easier for the user?
  3. How would we help them avoid forgetting to handle these cases?

I'm conflicted on whether we should fix case 1 for them as it is costly and the user might have prior knowledge to avoid that.

Why a new trait LambdaUDFImpl instead of adding functions on ScalarUDF

I think having a new LambdaUDFImpl is better than adding functions on existing ScalarUDF because:

  1. The ScalarUDF trait will not grow too much, which keeps regular scalar UDFs easy to implement and lambda support from becoming overwhelming
  2. what if we need to add a required function, but only for lambdas? We can add it on the new trait with ease, and we won't need to do some weird stuff to avoid breaking changes.
  3. Less ambiguity in the API.

Implementation specific:

I want to keep the simplicity of ScalarUDF, which means that in order to evaluate a lambda expression I don't need to construct stuff, only provide the input and maybe some options for future use.

@comphead
Contributor

Thanks @rluvaton and @gstvg, it's nice you mentioned array_transform; the tricky part for this function is that its return type depends on the lambda:

array_transform(array<T>, function<T, U>) -> array<U>

I want to keep the simplicity of ScalarUDF which means that in order to evaluate a lambda expression I don't need to construct stuff, only need to provide the input and maybe some options for future use.

Right, at a high level it could look like

pub struct LambdaExpr {
    /// Parameter names/types already resolved
    pub param_types: Vec<DataType>,

    /// Expression body, what needs to be evaluated, this thing potentially can be UDF
    pub body: Arc<dyn PhysicalExpr>,
}

Impl

impl LambdaExpr {
    pub fn new(
        param_types: Vec<DataType>,
        body: Arc<dyn PhysicalExpr>,
    ) -> Self {
        Self { param_types, body }
    }

    /// Evaluate lambda over provided arrays
    pub fn evaluate_with_args(
        &self,
        args: Vec<ArrayRef>,
    ) -> Result<ArrayRef> {
        // Build synthetic schema
        let fields: Vec<Field> = self.param_types
            .iter()
            .enumerate()
            .map(|(i, dt)| Field::new(format!("arg{}", i), dt.clone(), true))
            .collect();

        let schema = Arc::new(Schema::new(fields));

        let batch = RecordBatch::try_new(schema, args)?;

        self.body.evaluate(&batch)?.into_array(batch.num_rows()) // this is where our UDF would be called
    }
}

So, for example, for x -> x + 1 we need to parse the expression and create our Lambda, so we need to modify the parser to get the structures below from user-defined code; there is an existing ticket apache/datafusion-sqlparser-rs#1273

// Parameter x at column 0
let x = Arc::new(ColumnExpr::new(0));

// Literal 1
let one = Arc::new(LiteralExpr::new(
    ScalarValue::Int32(Some(1))
));

// x + 1
let body = Arc::new(BinaryExpr::new(
    x,
    one,
    Operator::Add,
));

// Lambda(x) -> x + 1
let lambda = LambdaExpr::new(
    vec![DataType::Int32],
    body,
);

and call it from the calling built-in function:

fn array_transform(
    list_array: &ListArray,
    lambda: &LambdaExpr,
) -> Result<ListArray> {

    let values = list_array.values().clone();

    // evaluate lambda on flattened child array
    let transformed =
        lambda.evaluate_with_args(vec![values])?;

    Ok(ListArray::new(
        Arc::new(Field::new_list_field(transformed.data_type().clone(), true)), // ListArray::new takes a field; the element type may have changed
        list_array.offsets().clone(),
        transformed,
        list_array.nulls().cloned(),
    ))
}

@github-actions github-actions bot added core Core DataFusion crate execution Related to the execution crate datasource Changes to the datasource crate and removed ffi Changes to the ffi crate labels Feb 23, 2026
@gstvg
Contributor Author

gstvg commented Feb 24, 2026

@rluvaton @comphead

I updated the PR to a LambdaUDF trait based implementation. It added 1300 lines, totaling 3000, mostly boilerplate from ScalarUDF, including a lot of documentation. Was that in the range you were expecting?
Based on the ScalarUDF docs stating that it exists to maintain backwards compatibility with an older API, I included only a LambdaUDF trait and not a struct LambdaUDF + trait LambdaUDFImpl pair, to not make it even bigger. There are a few small things missing that I want to implement tomorrow, and if all of you are okay with the results, I want to open this for review. My only concern is that the PR size may delay the review, as in #17220 (comment), but since IMHO this PR is simpler, I hope it won't take that long to review, despite the size

  1. The ScalarUDF trait will not grow too much and make implementing regular scalar UDFs easier or lambda overwhelming
  2. what if we need to add a required function but only for lambda, we can add it on the new trait with ease and we won't need to do some weird stuff to avoid breaking changes.
  3. Less ambiguity on the API.

Yeah, I think that 2 is the main point. The previous ScalarUDF-based approach only added a single method to a trait that already contains 20, it didn't require any change for non-lambda UDFs (that would be unacceptable), and compared to the actual LambdaUDF-based one, lambda UDFs only required 2 additional lines of code per implementation. But, while I can't think of any ... all my previous counter-arguments could fall apart in the future with a single requirement change. There's an upfront cost in review time and time to merge, but also more room to work in the future

Support Map, (Large)List, (Large)ListView, FixedSizeList as the input for lambda

Currently any type is accepted and passed to the implementation to derive the parameters from it, usually the inner values of a list, but I want to work with unions too, for example. As of now, array_transform doesn't handle ListViews (I thought there were no plans to support them overall?). Since the ListView values may contain unreferenced values, should they be compacted, or cast to a regular compacted List? And if so, should we return the transformed List or cast it back to ListView?

multiple lambdas in a single expression, for example map_key_value(some_map_col, map_key_lambda, map_value_lambda) and each lambda gets a different variables

Supported: LambdaFunctionArgs.args can hold multiple lambdas

    fn invoke_with_args(&self, args: LambdaFunctionArgs) -> Result<ColumnarValue> {
        let [list_value, lambda] = take_function_args(self.name(), &args.args)?;

        let (ValueOrLambda::Value(list_value), ValueOrLambda::Lambda(lambda)) =
            (list_value, lambda)
        else {
            return exec_err!(...

Since the beginning I worked to support multiple lambdas to implement union manipulating functions like this:

union_transform(
    union_value, 
    'str', str -> trim(str),
    'num', num -> num*2,
    'bool', bool -> NOT bool,
)

Lambda expression that access columns that are not in the list itself

Already supported and tested (I call it column capture throughout the PR and comments). Please ignore the comment above the test; I'm going to remove it.

CREATE TABLE t as SELECT 1 as n;
query ?
SELECT array_transform([1, 2], (e) -> n) from t;
----
[1, 1]

optional arguments for lambda, for example the index of the item in the list
the optional here is important as I want to avoid creating that input if I don't need to.

Also supported

        // use closures so that bind_lambda_variables evaluates only the params that are actually referenced
        // avoiding unnecessary computations
        let values_param = || Ok(Arc::clone(list_values));
        let indices_param = || elements_indices(&list_array);

        let binded_body = bind_lambda_variables(
            Arc::clone(&lambda.body),
            &lambda.params,
            &[&values_param, &indices_param],
        )?;

*after a refactor, bind_lambda_params actually eagerly evaluates all params, but I'll fix that

Nested lambda expressions

Supported

SELECT array_transform(t.v, (v1, i) -> array_transform(v1, (v2, j) -> array_transform(v2, v3 -> j)) ) from t;
----
[[[0, 1], [0]], [[0]], [[]]]

array_transform: the tricky part of this function is that its return type depends on the lambda

LambdaUDF contains a method lambdas_parameters, where the implementation must return the types of the parameters of all its lambdas when evaluated with a given set of values. DataFusion then uses this info to compute the type of each lambda and passes it to return_field_from_args, so the implementation can easily compute its return type. For example, for the expr array_transform([1, 2], v -> repeat(v, 'a')), lambdas_parameters would receive [ValueOrLambda::Value(List(Int32)), ValueOrLambda::Lambda] and should return vec![None, Some(vec![DataType::Int32])]. Then return_field_from_args would be called with [ValueOrLambda::Value(List(Int32)), ValueOrLambda::Lambda(Utf8)], where the implementation just needs to return List(Utf8).
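That two-phase flow for `array_transform([1, 2], v -> repeat(v, 'a'))` can be mimicked in a self-contained sketch, with a toy type enum standing in for arrow's DataType (nothing here is DataFusion's actual API):

```rust
// Toy stand-in for arrow's DataType, just enough to trace the flow.
#[derive(Clone, Debug, PartialEq)]
enum Ty {
    Int32,
    Utf8,
    List(Box<Ty>),
}

// Phase 1 (lambdas_parameters): from the list argument's type, derive the
// element type the lambda parameter `v` will be bound to.
fn lambda_param_type(list_arg: &Ty) -> Ty {
    match list_arg {
        Ty::List(elem) => (**elem).clone(),
        _ => panic!("expected a list"),
    }
}

// Phase 2 (return_field_from_args): DataFusion resolves the lambda body
// against that parameter type (repeat(v, 'a') yields Utf8 for an Int32 v),
// and the UDF only has to wrap the body's type back into a list.
fn return_type(body_ty: Ty) -> Ty {
    Ty::List(Box::new(body_ty))
}
```

The key point is that the UDF never type-checks the body itself: it only declares what the parameters look like, and receives the body's resolved type back.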

return_field_from_args method
pub struct LambdaReturnFieldArgs<'a> {
    /// The data types of the arguments to the function
    ///
    /// If argument `i` to the function is a lambda, it will be the field returned by the
    /// lambda when executed with the arguments returned from `LambdaUDF::lambdas_parameters`
    ///
    /// For example, with `array_transform([1], v -> v == 5)`
    /// this field will be `[Field::new("", DataType::List(DataType::Int32), false), Field::new("", DataType::Boolean, false)]`
    pub arg_fields: &'a [ValueOrLambdaField],
    /// Is argument `i` to the function a scalar (constant)?
    ///
    /// If the argument `i` is not a scalar, it will be None
    ///
    /// For example, if a function is called like `my_function(column_a, 5)`
    /// this field will be `[None, Some(ScalarValue::Int32(Some(5)))]`
    pub scalar_arguments: &'a [Option<&'a ScalarValue>],
}

/// A tagged Field indicating whether it correspond to a value or a lambda argument
#[derive(Clone, Debug)]
pub enum ValueOrLambdaField {
    /// The Field of a ColumnarValue argument
    Value(FieldRef),
    /// The Field of the return of the lambda body when evaluated with the parameters from LambdaUDF::lambdas_parameters
    Lambda(FieldRef),
}

    fn return_field_from_args(
        &self,
        args: datafusion_expr::LambdaReturnFieldArgs,
    ) -> Result<Arc<Field>> {
        let [ValueOrLambdaField::Value(list), ValueOrLambdaField::Lambda(lambda)] =
            take_function_args(self.name(), args.arg_fields)?
        else {
            return exec_err!(
                "{} expects a value followed by a lambda, got {:?}",
                self.name(),
                args
            );
        };

        //TODO: should metadata be copied into the transformed array?

        // lambda is the resulting field of executing the lambda body
        // with the parameters returned in lambdas_parameters
        let field = Arc::new(Field::new(
            Field::LIST_FIELD_DEFAULT_NAME,
            lambda.data_type().clone(),
            lambda.is_nullable(),
        ));

        let return_type = match list.data_type() {
            DataType::List(_) => DataType::List(field),
            DataType::LargeList(_) => DataType::LargeList(field),
            DataType::FixedSizeList(_, size) => DataType::FixedSizeList(field, *size),
            _ => unreachable!(),
        };

        Ok(Arc::new(Field::new("", return_type, list.is_nullable())))
    }
lambdas_parameters method
trait LambdaUDF {
    /// Returns the parameters that any lambda supports
    fn lambdas_parameters(
        &self,
        args: &[ValueOrLambdaParameter],
    ) -> Result<Vec<Option<Vec<Field>>>>;
}

pub enum ValueOrLambdaParameter<'a> {
    /// A columnar value with the given field
    Value(FieldRef),
    /// A lambda
    Lambda,
}

// array_transform implementation

impl LambdaUDF for ArrayTransform {
    fn lambdas_parameters(
        &self,
        args: &[ValueOrLambdaParameter],
    ) -> Result<Vec<Option<Vec<Field>>>> {
        let [ValueOrLambdaParameter::Value(list), ValueOrLambdaParameter::Lambda] =
            args
        else {
            return exec_err!(
                "{} expects a value followed by a lambda, got {:?}",
                self.name(),
                args
            );
        };

        let (field, index_type) = match list.data_type() {
            DataType::List(field) => (field, DataType::Int32),
            DataType::LargeList(field) => (field, DataType::Int64),
            DataType::FixedSizeList(field, _) => (field, DataType::Int32),
            _ => return exec_err!("expected list, got {list}"),
        };

        // we don't need to omit the index in case the lambda doesn't specify it, e.g. array_transform([], v -> v*2),
        // nor check whether the lambda declares more than two parameters, e.g. array_transform([], (v, i, j) -> v+i+j),
        // as datafusion will do that for us
        let value = Field::new("value", field.data_type().clone(), field.is_nullable())
            .with_metadata(field.metadata().clone());
        let index = Field::new("index", index_type, false);

        Ok(vec![None, Some(vec![value, index])])
    }
}

pub struct LambdaExpr {
    /// Parameter names/types already resolved
    pub param_types: Vec<(String, DataType)>,
    /// Expression body, what needs to be evaluated; this potentially can be a UDF
    pub body: Arc<dyn PhysicalExpr>,
}

The current implementation is functional with only the parameter names, without requiring their datatypes, and IMHO it's easier for users that way, in both the expr api and SQL. The expr api could look like this:

    array_transform(
        col("my_array"),
        lambda(
            ["current_value"], // define the parameters name 
            lambda_variable("current_value") * lit(2) // the lambda body
        )
    )

How to parse the ClickHouse syntax is, I believe, a question for the future: either check that the types match the ones returned by LambdaUDF::lambdas_parameters, or cast the parameters to the specified types.

  1. replace null lists that are not empty underneath
  2. fixed size lists with a null sublist
    I'm conflicted on whether we should fix case 1 for them, as it is costly and the user might have prior knowledge to avoid it.

I saw your review in #17220 but thought it mattered only for performance and didn't think about fallible expressions.
Yeah, maybe we can add a helper udf that cleans the list on user demand, to be called before the lambda udf?

For fixed size lists, I think we can replace null sublists with the first non-null sublist of the array; if the non-null one also fails, then it's a user problem.
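That fallback can be sketched standalone: replace each null sublist with the first non-null sublist so the lambda body never evaluates over garbage, and keep a mask so the results for the originally-null slots can be discarded afterwards (simplified over `Vec<Option<...>>` rather than arrow arrays; the function name is hypothetical):

```rust
// Replace null sublists with the first non-null one, remembering which
// slots were null so their results can be masked out after the transform.
fn fill_null_sublists(
    lists: &[Option<Vec<i32>>],
) -> Option<(Vec<Vec<i32>>, Vec<bool>)> {
    // returns None when every sublist is null and no filler exists
    let filler = lists.iter().flatten().next()?.clone();
    let null_mask: Vec<bool> = lists.iter().map(|l| l.is_none()).collect();
    let filled = lists
        .iter()
        .map(|l| l.clone().unwrap_or_else(|| filler.clone()))
        .collect();
    Some((filled, null_mask))
}
```

The mask is what keeps the trick sound: the filler rows are evaluated only to keep the kernel branch-free, and their outputs are nulled out again.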

2. sliced lists, and not computing values outside the sliced data

I also saw this on #17220 but forgot about it; thanks for reminding me, will fix soon.

In every case here, we should answer:

how would the user handle that?
how could we make it easier for the user?
how would we help them avoid forgetting to handle it?

I think the best we can do is to document it, add examples and helper functions, and add a lot of comments in one or a few LambdaUDFs as reference implementations.

@comphead
Copy link
Contributor

Thanks @gstvg, I feel we need some structure here.
I pinged in ASF slack that we are discussing lambda support in this RFC. Also, it would be great for people not to have to read through the comments, but instead have a simple doc on the proposed objects, how they would be parsed/evaluated, and what traits are involved.

I can try to start this doc by going through this RFC, or if you feel more comfortable you can create it, WDYT?

@timsaucer
Copy link
Member

I had a partial implementation for array_transform in #17289 but I wasn't able to get it over the line before I had to work on other things. There is a lot of discussion here so I'm going to try to make some time in the next few days to go over it all, but I am very interested in the directions this goes.

@gstvg
Copy link
Contributor Author

gstvg commented Feb 24, 2026

You are right @comphead, my first implementation was more complex and I thoroughly documented it, but after simplifying it to the current version I thought it was easy to grasp and documented it poorly, which was obviously wrong. Despite not much liking my high-level writing skills, I believe writing the doc is my responsibility (there are a few edge cases and alternative approaches not discussed yet). But I would really appreciate it if you reviewed it. I can push a DOC.md to this branch and we can start a review on it, to avoid adding even more comments here, and then finally update the PR description. WDYT?

