Conversation

Contributor

@zhengruifeng zhengruifeng commented Dec 15, 2022

What changes were proposed in this pull request?

There are 11 lambda functions in total; this PR adds the basic support for LambdaFunction and adds the exists function.

Why are the changes needed?

for API coverage

Does this PR introduce any user-facing change?

yes, new API

How was this patch tested?

added UT

Contributor Author

@zhengruifeng zhengruifeng Dec 15, 2022

PySpark invokes UnresolvedNamedLambdaVariable.freshVarName in the JVM to get a unique variable name:

import java.util.concurrent.atomic.AtomicInteger

object UnresolvedNamedLambdaVariable {

  // Counter to ensure lambda variable names are unique
  private val nextVarNameId = new AtomicInteger(0)

  def freshVarName(name: String): String = {
    s"${name}_${nextVarNameId.getAndIncrement()}"
  }
}
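
For reference, a minimal sketch (not part of this PR) of how classic, non-Connect PySpark can reach this helper through the Py4J gateway; the session setup is illustrative and _jvm is an internal attribute:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Access the JVM-side companion object via the Py4J gateway (non-Connect PySpark only).
jvm = spark.sparkContext._jvm
fresh = jvm.org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable.freshVarName("x")
print(fresh)  # e.g. "x_0", "x_1", ... depending on the counter state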

Contributor

I think, from looking at this the last time, the reason for the variables is mostly to create unique aliases, not to actually reference them in the plan. The expression itself must be a boolean expression, and the lambda function we pass in is really just an expression transformation.

@zhengruifeng
Contributor Author

The failed ./dev/test-dependencies.sh check is unrelated to this PR.

Contributor

Can we see if we can do without this one? This is very much an implementation detail.

Contributor Author

will remove it.

Contributor

Can we just make this an unresolved attribute? And do the heavy lifting in the connect planner?

Contributor

Unless there is special resolution in Catalyst for this type, +1 to re-use unresolved attribute.

Contributor

Can someone give me a good example of what the variable is supposed to do?

Contributor

They are literally the argument names in a lambda; for example, in x => x + 1, the UnresolvedNamedLambdaVariable is x.
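
As a concrete illustration (a hedged example using the public PySpark API; the DataFrame and column name are assumptions, not taken from this PR):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([([1, 2, 3],)], ["values"])

# In `lambda x: x + 1`, `x` is the lambda argument; Catalyst represents it as an
# UnresolvedNamedLambdaVariable inside the LambdaFunction expression it builds.
df.select(F.transform("values", lambda x: x + 1)).show()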

Contributor

I am on the fence about adding this message. We could go the CaseWhen route here as well. The only argument against that is the arguments themselves :).

Contributor

What would the lambda function look like when using the CaseWhen route?

Contributor Author

ok, that would be simpler

Contributor Author

> What would the lambda function look like when using the CaseWhen route?

This one: #38956. We build the expressions in the Connect planner rather than letting FunctionRegistry do it.

Contributor Author

actually, we can remove ExpressionString, UnresolvedStar, UnresolvedRegex, Cast in the same way.

But now I'm not sure whether it is the correct way.

Contributor

I think this is tricky because it's hard to provide clear guidance on what the right approach is. Generally, the downside of the unresolved function approach is that you're using a magic value that someone has to understand. This knowledge is now embedded in the client and cannot be inferred when looking at the protos.

Contributor Author

will add LambdaFunction back in the protos

Contributor Author

> I think this is tricky because it's hard to provide clear guidance on what the right approach is. Generally, the downside of the unresolved function approach is that you're using a magic value that someone has to understand. This knowledge is now embedded in the client and cannot be inferred when looking at the protos.

Agreed, that is how I feel. We should avoid abusing unresolved functions.

Contributor

If all arguments must be UnresolvedNamedLambdaVariables, why not define this as
repeated UnresolvedNamedLambdaVariables arguments = 2?

Contributor Author

I was hitting something weird if using UnresolvedNamedLambdaVariables here

Contributor Author

On the Scala side, a LambdaFunction actually accepts NamedExpressions as arguments;
in PySpark, an argument is always an UnresolvedNamedLambdaVariable.

Here we use Expression instead of UnresolvedAttribute; a benefit I can imagine is that if we need to support more types of Expression, we do not need to change the proto.

Contributor

Shall we simply use repeated String ...? If we think about the SQL API exists(array_col, c -> ...), what we need to provide here is the argument name list, which is really just a list of strings.
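
For context, a hedged example of both forms (the table, DataFrame and column names are illustrative; on a Spark Connect session the same functions API applies):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([([1, -2, 3],)], ["array_col"])
df.createOrReplaceTempView("tbl")

# SQL higher-order function: the argument name `c` in `c -> c > 0` is just a string.
spark.sql("SELECT exists(array_col, c -> c > 0) FROM tbl").show()

# DataFrame API equivalent; this PR adds the same function to the Connect client.
df.select(F.exists("array_col", lambda c: c > 0)).show()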

Contributor

In general I'm in favor of repeated string since it's clearer about the intention. I think this function is a good example where we need to put guidance on where the correct resolution of function arguments happens.

Contributor Author

Yes, that's simpler. At least repeated string is enough for the existing PySpark implementation. Will update.
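
To make the agreed shape concrete, a toy Python model (not Spark code; the class names only mirror the proto messages discussed above, and the tuple body is a stand-in for a real expression tree):

from dataclasses import dataclass
from typing import Any, List

@dataclass
class UnresolvedAttribute:
    # single-part name, e.g. ["x_0"]; mirrors the proto message of the same name
    name_parts: List[str]

@dataclass
class LambdaFunction:
    function: Any          # the lambda body expression
    arguments: List[str]   # the agreed "repeated string" argument names

# x -> x > 0, with the body referring to the argument via an unresolved attribute
body = ("gt", UnresolvedAttribute(["x_0"]), 0)
fn = LambdaFunction(function=body, arguments=["x_0"])
print(fn)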

Contributor

@grundprinzip grundprinzip left a comment

First pass.

@zhengruifeng zhengruifeng force-pushed the connect_function_lambda branch from 432286c to 91b290c Compare December 16, 2022 07:57
Contributor

This branch deserves its own function, please.

Contributor Author

+1 will add it back

Comment on lines 581 to 586
Contributor

There are two interesting issues here:

  1. When someone submits an expression to the API that does not transform into an UnresolvedExpression, this would throw a weird error message about the name parts, when the real problem is that the type does not match.
  2. Why the restriction to single part names? Is this a Spark limitation?

Contributor Author

1, the error message also mentions UnresolvedAttribute

2, the existing implementation in PySpark only uses single-part names

Comment on lines 554 to 556
Contributor

Adding these assertions here is helpful in the Python client, but the server side does not do the same assertion. If we drop the assertion on ColumnReference, what would happen on the server?

Is the analysis exception not better than the Python assertion?

Contributor Author

We also check UnresolvedAttribute on the server side.

Contributor

one line?

Contributor Author

done

@zhengruifeng zhengruifeng force-pushed the connect_function_lambda branch from f8771a9 to d6aea83 Compare December 19, 2022 04:07
Contributor Author

Existing PySpark only uses single name parts, so we check that on the server side.

Contributor

since we support up to 3 parameters, shall we add an assert here to avoid future mistakes?

Contributor Author

@cloud-fan Here is the validation on the number of parameters.
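
A hedged sketch of the kind of parameter-count check being discussed (names are illustrative, not the actual implementation):

import inspect

def _check_lambda_parameters(f):
    # Spark's higher-order functions accept lambdas with 1, 2 or 3 arguments.
    parameters = inspect.signature(f).parameters
    if len(parameters) not in (1, 2, 3):
        raise ValueError(
            f"Callable {f} must take 1, 2 or 3 arguments, got {len(parameters)}"
        )
    return list(parameters)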

Contributor

@grundprinzip grundprinzip left a comment

Looks good from my side.

Contributor

Do we get an analysis exception if more than three arguments are submitted?

Contributor

There is already a validation on the client side, but I guess we need that on the server side as well?

Contributor Author

ok, let me add it on the server side

Contributor

@amaliujia amaliujia left a comment

LGTM

Contributor

IIUC, we use Column with UnresolvedAttribute to invoke the function, then at the server side, we replace UnresolvedAttribute with LambdaVariable.

Shall we use Column with LambdaVariable to invoke the function in client side directly?

Contributor Author

The first commit used UnresolvedNamedLambdaVariable:

    # fragment of the client-side helper: arg_names, parameters and the user
    # callable f come from the enclosing function
    arg_cols: List[Column] = []
    for arg in arg_names[: len(parameters)]:
        # TODO: How to make sure lambda variable names are unique? RPC for increasing ID?
        _uuid = str(uuid.uuid4()).replace("-", "_")
        arg_cols.append(Column(UnresolvedNamedLambdaVariable([f"{arg}_{_uuid}"])))

    result = f(*arg_cols)

    if not isinstance(result, Column):
        raise ValueError(f"Callable {f} should return Column, got {type(result)}")

    return LambdaFunction(result._expr, [arg._expr for arg in arg_cols])

then switched to UnresolvedAttribute according to the comments #39068 (comment)

Contributor

I don't think this can simplify the client-side implementation, but it does save one proto message. At least we should document this in the proto message LambdaFunction: the function body should use UnresolvedAttribute as arguments to build the query plan.

Contributor Author

done

@zhengruifeng zhengruifeng force-pushed the connect_function_lambda branch from 568458f to ab49f07 Compare December 21, 2022 01:32
@HyukjinKwon
Member

Merged to master.

@zhengruifeng
Contributor Author

thank you all for the reviews!

@zhengruifeng zhengruifeng deleted the connect_function_lambda branch December 21, 2022 03:44
zhengruifeng added a commit that referenced this pull request Jan 17, 2023
…bda functions

### What changes were proposed in this pull request?
1, #39068 reused the `UnresolvedAttribute` for the `UnresolvedNamedLambdaVariable`, but then `Column('x')` and `UnresolvedNamedLambdaVariable('x')` are mixed up in `lambda x: x + cdf.x` (since we use `x/y/z` as argument names); this PR adds the `UnresolvedNamedLambdaVariable` back to distinguish between `Column('x')` and `UnresolvedNamedLambdaVariable('x')` (see the sketch below);

2, the `freshVarName` logic in PySpark was added in #32523 to address a similar issue in PySpark's lambda functions; this PR adds a similar function to the Python client to avoid rewriting the function expression on the server side, which is unnecessary and error-prone.
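
As a hedged illustration of the ambiguity in point 1 (the DataFrame contents and column names are assumptions, not taken from the PR):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
cdf = spark.createDataFrame([(1, [1, 2, 3])], ["x", "values"])

# Inside the lambda, `x` is the lambda argument, while `cdf.x` refers to the
# column named "x". If both were encoded the same way (as UnresolvedAttribute),
# the server could not tell them apart; a dedicated UnresolvedNamedLambdaVariable
# removes that ambiguity.
cdf.select(F.transform("values", lambda x: x + cdf.x)).show()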

### Why are the changes needed?
Before this PR, nested lambda functions don't work properly.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
enabled UT and added UT

Closes #39619 from zhengruifeng/connect_fix_nested_lambda.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>