Fix functions with Volatility::Volatile and parameters #13001
alamb merged 1 commit into apache:main
ff4b90c to 4c8ebc4
datafusion/expr/src/udf.rs
Outdated
| not_impl_err!( | ||
| "Function {} does not implement invoke_batch but called", | ||
| self.name() | ||
| ) |
🤔 It seems like the ideal outcome would be for all ScalarUDFs to implement this method (as it covers invoke, invoke_no_args as well).
Would you be open to changing this so it uses a default implementation like this?
| not_impl_err!( | |
| "Function {} does not implement invoke_batch but called", | |
| self.name() | |
| ) | |
| if _args.is_empty() { | |
| self.invoke_no_args(_number_rows) | |
| } else { | |
| self.invoke(_args) | |
| } |
Then the function implementation could decide what to do with that information
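For illustration, the fallback suggested above can be modelled outside DataFusion. This is only a sketch under stated assumptions: `Value`, `SimpleUdf`, `AddOne`, and `RowNumberPlus` are hypothetical stand-ins (not DataFusion's `ColumnarValue` or `ScalarUDFImpl`), and errors are plain strings rather than `DataFusionError`.

```rust
/// Hypothetical stand-in for a columnar value (not DataFusion's type).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Scalar(i64),
    Array(Vec<i64>),
}

/// Hypothetical, simplified model of a scalar UDF trait.
trait SimpleUdf {
    fn name(&self) -> &str;

    /// Legacy entry point for calls with arguments.
    fn invoke(&self, _args: &[Value]) -> Result<Value, String> {
        Err(format!("Function {} does not implement invoke but called", self.name()))
    }

    /// Legacy entry point for calls without arguments.
    fn invoke_no_args(&self, _number_rows: usize) -> Result<Value, String> {
        Err(format!("Function {} does not implement invoke_no_args but called", self.name()))
    }

    /// Single entry point: by default it dispatches to the legacy methods.
    fn invoke_batch(&self, args: &[Value], number_rows: usize) -> Result<Value, String> {
        if args.is_empty() {
            self.invoke_no_args(number_rows)
        } else {
            self.invoke(args)
        }
    }
}

/// A function that only implements `invoke` keeps working via the default.
struct AddOne;

impl SimpleUdf for AddOne {
    fn name(&self) -> &str {
        "add_one"
    }

    fn invoke(&self, args: &[Value]) -> Result<Value, String> {
        match &args[0] {
            Value::Scalar(v) => Ok(Value::Scalar(v + 1)),
            Value::Array(vs) => Ok(Value::Array(vs.iter().map(|v| v + 1).collect())),
        }
    }
}

/// A function that overrides `invoke_batch` can use the row count directly.
struct RowNumberPlus;

impl SimpleUdf for RowNumberPlus {
    fn name(&self) -> &str {
        "row_number_plus"
    }

    fn invoke_batch(&self, args: &[Value], number_rows: usize) -> Result<Value, String> {
        // Use the first scalar argument as a base, defaulting to 0.
        let base = match args.first() {
            Some(Value::Scalar(v)) => *v,
            _ => 0,
        };
        Ok(Value::Array((0..number_rows as i64).map(|i| base + i).collect()))
    }
}
```

With this shape, callers only ever need `invoke_batch`: existing functions fall through to their old methods, while a volatile function can override it to produce one value per row.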
| "+-----------+", // | ||
| "| str |", // | ||
| "+-----------+", // | ||
| "| 1. test_1 |", // |
What is the meaning of the trailing `//`?
Also, the repeated indexes (multiple rows with 1) seem to imply that invoke is run multiple times. Perhaps we could set target_partitions to 1 on the SessionContext so the data isn't repartitioned?
I did not understand what the // are for either, but I left them because this style was used above.
The invoke_batch method is called once. In the logic of the function itself, I index the rows by their contents, so the counter restarts for each distinct value:
1. test_1
2. test_1
3. test_1
4. test_1
1. test_2
2. test_2
3. test_2
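The numbering shown above is consistent with a counter kept per distinct row value. A minimal sketch of that idea, assuming plain string slices instead of Arrow arrays (`add_index_per_value` is a hypothetical helper, not the function from this PR):

```rust
use std::collections::HashMap;

/// Prefix each row with a 1-based counter that is tracked separately
/// for every distinct row value. Hypothetical helper for illustration.
fn add_index_per_value(rows: &[&str]) -> Vec<String> {
    let mut counters: HashMap<&str, usize> = HashMap::new();
    rows.iter()
        .map(|row| {
            // Bump the counter that belongs to this row's content.
            let n = counters.entry(*row).or_insert(0);
            *n += 1;
            format!("{}. {}", n, row)
        })
        .collect()
}
```

A single call over the whole batch therefore yields repeated `1.` prefixes whenever the batch contains more than one distinct value, without the function being invoked more than once.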
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
Outdated
| let output = match self.args.is_empty() { | ||
| true => self.fun.invoke_no_args(batch.num_rows()), | ||
| false => self.fun.invoke(&inputs), | ||
| false => match self.fun.signature().volatility { |
If you modified invoke_batch as above, we could change this code to simply call self.fun.invoke_batch() always
4c8ebc4 to 1e6ad19
| /// | ||
| /// [invoke_no_args]: ScalarUDFImpl::invoke_no_args | ||
| fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue>; | ||
| fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> { |
Nice! -- as a follow on PR I think we should deprecate the other two functions (invoke_no_args and invoke) telling people to use invoke instead
Is this ok with you @jayzhan211 ?
as a follow on PR I think we should deprecate the other two functions (invoke_no_args and invoke) telling people to use invoke instead
did you mean invoke_batch?
yes, it would be great to have only one invoke entry-point
| assert_batches_eq!(expected, &result); | ||
|
|
||
| let result = | ||
| plan_and_collect(&ctx, "select add_index_to_string('test') AS str from t") // with fixed function parameters |
datafusion/expr/src/udf.rs
Outdated
| _args: &[ColumnarValue], | ||
| _number_rows: usize, |
These are not unused, let's remove leading _ from arg names
datafusion/expr/src/udf.rs
Outdated
| /// The function should be used for signatures with [`datafusion_expr_common::signature::Volatility::Volatile`] | ||
| /// and with arguments. |
Only for these?
if yes => the function should be named appropriately (e.g. invoke_volatile)
if no => remove the comment
I removed this comment. This function is always called in the current implementation.
| } | ||
| { |
Nit: consider separating test cases into separate test functions; this would give them descriptive names
These tests are very similar, so they don't break down into two parts well.
| /// Volatile UDF that should be append a different value to each row | ||
| struct AddIndexToStringScalarUDF { |
The volatility is important, let's reflect it in the function name -- it's this function's main purpose, not an attribute it happens to have.
AddIndexToStringVolatileScalarUDF
| Ok(()) | ||
| } | ||
|
|
||
| /// Volatile UDF that should be append a different value to each row |
| /// Volatile UDF that should be append a different value to each row | |
| /// Volatile UDF that should append a different value to each row | |
| #[derive(Debug)] |
| impl std::fmt::Debug for AddIndexToStringScalarUDF { | ||
| fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { | ||
| f.debug_struct("ScalarUDF") | ||
| .field("name", &self.name) | ||
| .field("signature", &self.signature) | ||
| .field("fun", &"<FUNC>") | ||
| .finish() | ||
| } | ||
| } | ||
|
|
Leverage #[derive(Debug)] to ensure all fields are part of the debug output.
| impl std::fmt::Debug for AddIndexToStringScalarUDF { | |
| fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { | |
| f.debug_struct("ScalarUDF") | |
| .field("name", &self.name) | |
| .field("signature", &self.signature) | |
| .field("fun", &"<FUNC>") | |
| .finish() | |
| } | |
| } |
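To illustrate why the derive is the safer choice, here is a small sketch with two hypothetical structs (neither is the struct from this PR). The hand-written impl silently omits a field, while the derived one always prints every field:

```rust
/// Hypothetical struct using the derive: every field shows up in the output.
#[derive(Debug)]
struct DerivedUdf {
    name: String,
    volatile: bool,
}

/// Hypothetical struct with a hand-written impl that has drifted out of date.
struct ManualUdf {
    name: String,
    volatile: bool,
}

impl std::fmt::Debug for ManualUdf {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // `volatile` was added to the struct later and is missing here.
        f.debug_struct("ManualUdf")
            .field("name", &self.name)
            .finish()
    }
}
```

Formatting both with `{:?}` shows `volatile` only for `DerivedUdf`.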
| _ => unimplemented!(), | ||
| }; | ||
| Ok(ColumnarValue::Array( | ||
| Arc::new(StringArray::from(answer)) as ArrayRef |
| Arc::new(StringArray::from(answer)) as ArrayRef | |
| Arc::new(StringArray::from(answer)) |
| } | ||
| _ => unimplemented!(), | ||
| }; | ||
| Ok(ColumnarValue::Array( |
Is it OK for this function to return an array even when it's invoked with only ColumnarValue::Scalar arguments?
Yes, that's fine. If you return a ColumnarValue::Scalar, all rows will have the same result.
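The difference between the two return shapes can be sketched with a simplified model. `Output` and `into_rows` below are hypothetical and only mimic the scalar-versus-array distinction; they are not DataFusion's `ColumnarValue` API:

```rust
/// Hypothetical model of a function result: one value for all rows,
/// or one value per row.
#[derive(Debug, Clone, PartialEq)]
enum Output {
    Scalar(String),
    Array(Vec<String>),
}

impl Output {
    /// Expand the result to one value per row.
    /// A scalar is repeated, so every row gets the same result.
    fn into_rows(self, number_rows: usize) -> Vec<String> {
        match self {
            Output::Scalar(s) => vec![s; number_rows],
            Output::Array(rows) => rows,
        }
    }
}
```

Under this model, a volatile function that must produce a different value for each row has to return the array form, even when all of its inputs are scalars.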
1e6ad19 to 2c68fc9
Co-authored-by: Agaev Huseyn <[email protected]>
Which issue does this PR close?
Closes #13000.
Rationale for this change
These changes will allow functions to be implemented that produce a unique result for each call given the same input.