[SPARK-41434][CONNECT][PYTHON] Initial LambdaFunction implementation
#39068
Conversation
reviewers can refer to the implementation in PySpark
PySpark invokes `UnresolvedNamedLambdaVariable.freshVarName` in the JVM to get a unique variable name:

```scala
object UnresolvedNamedLambdaVariable {
  // Counter to ensure lambda variable names are unique
  private val nextVarNameId = new AtomicInteger(0)

  def freshVarName(name: String): String = {
    s"${name}_${nextVarNameId.getAndIncrement()}"
  }
}
```
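The same counter-based naming idea can be sketched in plain Python (a hypothetical helper for illustration, not the actual client code): a module-level counter guarantees that each generated variable name is unique within the process.

```python
from itertools import count

# Hypothetical Python counterpart of freshVarName: a shared counter
# appended to the base name makes every generated name unique.
_next_var_name_id = count(0)

def fresh_var_name(name: str) -> str:
    return f"{name}_{next(_next_var_name_id)}"
```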
I think, from looking at this the last time, that the reason for the variables is mostly to create unique aliases, not to actually reference them in the plan. The expression itself must be a boolean expression, and the lambda function we pass in is really just an expression transformation.
The failed
Can we see if we can do without this one? This is very much an implementation detail.
will remove it.
Can we just make this an unresolved attribute? And do the heavy lifting in the connect planner?
Unless there is special resolution in Catalyst for this type, +1 to reusing unresolved attribute
Can someone give me a good example of what the variable is supposed to do?
They are literally the argument names in a lambda. For example, in `x => x + 1`, the `UnresolvedNamedLambdaVariable` is `x`.
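The role of the variable can be illustrated with a tiny plain-Python sketch (no Spark dependency; the class and names are hypothetical): the client calls the user's lambda with placeholder objects bound to named variables, and what comes back is an expression tree that references those variables rather than a computed value.

```python
# Hypothetical stand-in for the lambda-variable expression.
class NamedLambdaVariable:
    def __init__(self, name: str):
        self.name = name

    def __add__(self, other):
        # Build a tiny expression node instead of computing a value.
        return ("add", self, other)

f = lambda x: x + 1                # user lambda, as in `x => x + 1`
var = NamedLambdaVariable("x_0")   # the `x` in the lambda
body = f(var)                      # expression tree with `var` as the argument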
I am on the fence about adding this message. We could go the CaseWhen route here as well. The only argument against that are the arguments themselves :).
What would the lambda function look like using the CaseWhen route?
ok, that would be simpler
> What would the lambda function look like using the CaseWhen route?

this one: #38956. We build the expressions in the Connect planner rather than letting FunctionRegistry do it.
actually, we can remove ExpressionString, UnresolvedStar, UnresolvedRegex, and Cast in the same way.
But now I'm not sure whether that is the correct way.
I think this is tricky because it's hard to provide clear guidance on what the right approach is. Generally, the downside of the unresolved function approach is that you're using a magic value that someone has to understand. This knowledge is now embedded in the client and cannot be inferred when looking at the protos.
will add LambdaFunction back in the protos
I think this is tricky because it's hard to provide clear guidance on what the right approach is. Generally, the downside of the unresolved function approach is that you're using a magic value that someone has to understand. This knowledge is now embedded in the client and cannot be inferred when looking at the protos.
Agreed, that is what I feel. We should avoid abusing unresolved functions.
If all arguments must be UnresolvedNamedLambdaVariables, why not define this as

```proto
repeated UnresolvedNamedLambdaVariable arguments = 2;
```
I was hitting something weird when using UnresolvedNamedLambdaVariables here
On the Scala side, a LambdaFunction actually accepts NamedExpressions as arguments; in PySpark, an argument is always an UnresolvedNamedLambdaVariable.
Here we use Expression instead of UnresolvedAttribute. A benefit I can imagine is that if we need to support more types of Expression, we do not need to change the proto.
Shall we simply use `repeated string ...`? If we think about the SQL API `exists(array_col, c -> ...)`, what we need to provide here is the argument name list, which is really just a list of strings.
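To make the point concrete, here is a minimal sketch of what the client would need to ship for `exists(array_col, c -> c > 0)` under the `repeated string` design. The dict is an illustrative stand-in for the proto message, not the real schema:

```python
# Hypothetical payload builder: the lambda message only needs the body
# expression plus the list of argument names.
def lambda_payload(body: str, arg_names):
    return {"function": body, "arguments": list(arg_names)}

payload = lambda_payload("c > 0", ["c"])
```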
In general I'm in favor of repeated string since it's clearer about the intention. I think this function is a good example where we need to put guidance on where the correct resolution of function arguments happens.
yes, that's simpler. At least `repeated string` is enough for the existing PySpark implementation. will update
grundprinzip left a comment
First pass.
Force-pushed from 432286c to 91b290c
This branch deserves its own function, please.
+1 will add it back
There are two interesting issues here:
- When someone submits to the API an expression that does not transform into UnresolvedExpression, this would throw a weird error message about the name parts, but actually it's the type that does not match.
- Why the restriction to single-part names? Is this a Spark limitation?
1. The error message also mentions UnresolvedAttribute.
2. The existing implementation in PySpark only uses single-part names.
python/pyspark/sql/connect/column.py
Adding these assertions here is helpful in the Python client, but the server side does not do the same assertion. If we drop the assertion on ColumnReference, what would happen on the server?
Is the analysis exception not better than the Python assertion?
we also check UnresolvedAttribute on the server side
one line?
done
Force-pushed from f8771a9 to d6aea83
the existing PySpark implementation only uses a single name part, so we check it on the server side
since we support up to 3 parameters, shall we add an assert here to avoid future mistakes?
@cloud-fan Here is the validation on the number of parameters
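The kind of client-side check being discussed can be sketched as follows (a hypothetical helper, mirroring the point above that higher-order functions accept at most three lambda parameters):

```python
import inspect

# Hypothetical arity check: reject lambdas with an unsupported
# number of parameters before building the proto message.
def check_arity(f):
    n = len(inspect.signature(f).parameters)
    if not 1 <= n <= 3:
        raise ValueError(f"lambda must take 1 to 3 arguments, got {n}")
    return n
```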
grundprinzip left a comment
Looks good from my side.
Do we get an analysis exception if more than three arguments are submitted?
There is a validation on the client side already, but I guess we need that on the server side as well?
ok, let me add it in server side
amaliujia left a comment
LGTM
IIUC, we use Column with UnresolvedAttribute to invoke the function, then on the server side we replace UnresolvedAttribute with LambdaVariable.
Shall we use Column with LambdaVariable to invoke the function on the client side directly?
the first commit used UnresolvedNamedLambdaVariable:

```python
arg_cols: List[Column] = []
for arg in arg_names[: len(parameters)]:
    # TODO: How to make sure lambda variable names are unique? RPC for increasing ID?
    _uuid = str(uuid.uuid4()).replace("-", "_")
    arg_cols.append(Column(UnresolvedNamedLambdaVariable([f"{arg}_{_uuid}"])))
result = f(*arg_cols)
if not isinstance(result, Column):
    raise ValueError(f"Callable {f} should return Column, got {type(result)}")
return LambdaFunction(result._expr, [arg._expr for arg in arg_cols])
```

then switched to UnresolvedAttribute according to the comments #39068 (comment)
I don't think this can simplify the client-side implementation, but it does save one proto message. At least we should document this in the proto message LambdaFunction: the function body should use UnresolvedAttribute as arguments to build the query plan.
done
Force-pushed from 568458f to ab49f07
Merged to master.

thank you all for the reviews!
…bda functions

### What changes were proposed in this pull request?
1. #39068 reused the `UnresolvedAttribute` for the `UnresolvedNamedLambdaVariable`, but then `Column('x')` and `UnresolvedNamedLambdaVariable('x')` are mixed up in `lambda x: x + cdf.x` (since we use `x/y/z` as argument names); this PR adds the `UnresolvedNamedLambdaVariable` back to distinguish between `Column('x')` and `UnresolvedNamedLambdaVariable('x')`.
2. The `freshVarName` logic in PySpark was added in #32523 to address a similar issue in PySpark's lambda functions; this PR adds a similar function in the Python client to avoid rewriting the function expression on the server side, which is unnecessary and error-prone.

### Why are the changes needed?
Before this PR, nested lambda functions did not work properly.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Enabled UT and added UT.

Closes #39619 from zhengruifeng/connect_fix_nested_lambda.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
### What changes were proposed in this pull request?
There are 11 lambda functions; this PR adds the basic support for `LambdaFunction` and adds the `exists` function.

### Why are the changes needed?
For API coverage.

### Does this PR introduce any user-facing change?
Yes, new API.

### How was this patch tested?
Added UT.