[SPARK-37504][PYTHON] Pyspark create SparkSession with existed session should not pass static conf #34757
Conversation
```python
    jsparkSession = self._jvm.SparkSession.getDefaultSession().get()
    self._jvm.SparkSession.applyModifiableSettings(jsparkSession, options)
else:
    jsparkSession = self._jvm.SparkSession(self._jsc.sc(), options)
```
Shall we add a short comment here noting that this is the case where static configurations can still be set?
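For context, the suggestion amounts to something like the following; the inline comment wording is illustrative, not the exact text that was committed:

```python
# Illustrative sketch only -- the comment wording below is an assumption.
else:
    # No existing (default) JVM session to reuse, so a brand-new session is
    # created here and static configurations in `options` can still take effect.
    jsparkSession = self._jvm.SparkSession(self._jsc.sc(), options)
```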
Co-authored-by: Hyukjin Kwon <[email protected]>
HyukjinKwon left a comment:
Looks good otherwise
@AngersZhuuuu some tests look like they are failing. Mind taking a look, please?
Seems …
You can add `getattr(getattr(session._jvm, "SparkSession$"), "MODULE$").applyModifiableSettings(...)`. You can check how to access this in Java first, FWIW.
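For readers unfamiliar with this Py4J pattern: a Scala `object` compiles to a class whose name ends in `$`, with the singleton instance held in a static `MODULE$` field, so methods defined on the `SparkSession` companion object have to be reached through those names. A minimal sketch (the session handle and options passed to `applyModifiableSettings` are placeholders, not the exact arguments used in `session.py`):

```python
# Hedged sketch: calling a method defined on Scala's `object SparkSession`
# from PySpark via Py4J. Plain `jvm.SparkSession` resolves the class, so the
# companion object must be reached via the `SparkSession$` class name and its
# static `MODULE$` singleton field.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
jvm = spark._jvm

companion = getattr(getattr(jvm, "SparkSession$"), "MODULE$")

# Placeholder call for illustration -- `jsession` and `joptions` stand in for
# the Java session handle and the options map used in pyspark/sql/session.py.
# companion.applyModifiableSettings(jsession, joptions)
print(companion)  # JavaObject wrapping the SparkSession companion object
```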
Thanks for your help. New knowledge for me.
Merged to master.
```python
    and not self._jvm.SparkSession.getDefaultSession().get().sparkContext().isStopped()
):
    jsparkSession = self._jvm.SparkSession.getDefaultSession().get()
    getattr(getattr(self._jvm, "SparkSession$"), "MODULE$").applyModifiableSettings(
```
@AngersZhuuuu, this actually shows a lot of new warnings (see also #34893). Another reproducer:
```bash
./bin/pyspark --conf spark.executor.memory=8g --conf spark.driver.memory=8g
```
```python
>>> from pyspark.sql.functions import udf
>>> udf(lambda x: x)("a")
21/12/15 14:03:15 WARN SparkSession: Using an existing SparkSession; the static sql configurations will not take effect.
Column<'<lambda>(a)'>
```

There are more places to fix like this:
```
ml/util.py: self._sparkSession = SparkSession.builder.getOrCreate()
sql/column.py: spark = SparkSession.builder.getOrCreate()
sql/context.py: sparkSession = SparkSession.builder.getOrCreate()
sql/readwriter.py: spark = SparkSession.builder.getOrCreate()
sql/readwriter.py: spark = SparkSession.builder.getOrCreate()
sql/session.py: return SparkSession.builder.getOrCreate()
sql/session.py: return SparkSession.builder.getOrCreate()
sql/streaming.py: spark = SparkSession.builder.getOrCreate()
sql/streaming.py: spark = SparkSession.builder.getOrCreate()
sql/udf.py: spark = SparkSession.builder.getOrCreate()
```
If we can't make it in Spark 3.3, I think it may just be safer to revert #34757, #34732 and #34559 for now, because each patch here introduces either:
- Unexpected propagation of static SQL configurations, or
- Too many warnings

Separately, I still feel 8424f55 is inefficient. We don't know which configurations don't take effect, or which configuration it keeps complaining about (see the example above). We should probably at least print out the keys or lower the log level.
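As an aside, and not part of any patch in this thread: until the log message is improved, one way to see which requested configurations actually took effect is to compare them against the live session. A rough sketch with made-up configuration keys and values:

```python
# Rough illustration (the keys/values here are made up): compare what was
# requested against what the existing session actually reports, since the
# current warning does not say which keys were ignored.
from pyspark.sql import SparkSession

requested = {
    "spark.sql.warehouse.dir": "/tmp/my-warehouse",   # static SQL conf
    "spark.sql.shuffle.partitions": "16",             # runtime SQL conf
}

builder = SparkSession.builder
for key, value in requested.items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()

for key, value in requested.items():
    actual = spark.conf.get(key, None)
    if actual != value:
        print(f"{key} did not take effect (requested {value!r}, actual {actual!r})")
```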
@xinrong-databricks actually this is mostly Python-side code. Are you interested in creating a follow-up?
> We should probably at least print out the keys or lower the log level.

+1
Thank you for the heads-up, @HyukjinKwon.
Certainly, I will fix it and keep you updated. Thanks!
Made a PR at #35001 👍
…y set in SparkSession.builder.getOrCreate
### What changes were proposed in this pull request?
This PR proposes to show ignored configurations and hide the warnings for configurations that are already set when invoking `SparkSession.builder.getOrCreate`.
### Why are the changes needed?
Currently, `SparkSession.builder.getOrCreate()` is too noisy even when duplicate configurations are set. Users cannot easily tell which configurations need to be fixed. See the example below:
```bash
./bin/spark-shell --conf spark.abc=abc
```
```scala
import org.apache.spark.sql.SparkSession
spark.sparkContext.setLogLevel("DEBUG")
SparkSession.builder.config("spark.abc", "abc").getOrCreate
```
```
21:04:01.670 [main] WARN org.apache.spark.sql.SparkSession - Using an existing SparkSession; some spark core configurations may not take effect.
```
It is straightforward when there are only a few configurations, but it is difficult for users to figure out which ones are affected when there are many, especially when these configurations are defined in a property file such as 'spark-defaults.conf' maintained separately by system admins in production.
See also #34757 (comment).
### Does this PR introduce _any_ user-facing change?
Yes.
1. Show ignored configurations in debug level logs:
```bash
./bin/spark-shell --conf spark.abc=abc
```
```scala
import org.apache.spark.sql.SparkSession
spark.sparkContext.setLogLevel("DEBUG")
SparkSession.builder
.config("spark.sql.warehouse.dir", "2")
.config("spark.abc", "abcb")
.config("spark.abcd", "abcb4")
.getOrCreate
```
**Before:**
```
21:13:28.360 [main] WARN org.apache.spark.sql.SparkSession - Using an existing SparkSession; the static sql configurations will not take effect.
21:13:28.360 [main] WARN org.apache.spark.sql.SparkSession - Using an existing SparkSession; some spark core configurations may not take effect.
```
**After**:
```
20:34:30.619 [main] WARN org.apache.spark.sql.SparkSession - Using an existing Spark session; only runtime SQL configurations will take effect.
20:34:30.622 [main] DEBUG org.apache.spark.sql.SparkSession - Ignored static SQL configurations:
spark.sql.warehouse.dir=2
20:34:30.623 [main] DEBUG org.apache.spark.sql.SparkSession - Configurations that might not take effect:
spark.abcd=abcb4
spark.abc=abcb
```
2. Do not issue a warning for a configuration that was already explicitly set (with the same value) before, and hide it from the log:
```bash
./bin/spark-shell --conf spark.abc=abc
```
```scala
import org.apache.spark.sql.SparkSession
spark.sparkContext.setLogLevel("DEBUG")
SparkSession.builder.config("spark.abc", "abc").getOrCreate // **Ignore** warnings because it's already set in --conf
SparkSession.builder.config("spark.abc.new", "abc").getOrCreate // **Show** warnings for only configuration newly set.
SparkSession.builder.config("spark.abc.new", "abc").getOrCreate // **Ignore** warnings because it's set ^.
```
**Before**:
```
21:13:56.183 [main] WARN org.apache.spark.sql.SparkSession - Using an existing SparkSession; some spark core configurations may not take effect.
21:13:56.356 [main] WARN org.apache.spark.sql.SparkSession - Using an existing SparkSession; some spark core configurations may not take effect.
21:13:56.476 [main] WARN org.apache.spark.sql.SparkSession - Using an existing SparkSession; some spark core configurations may not take effect.
```
**After:**
```
20:36:36.251 [main] WARN org.apache.spark.sql.SparkSession - Using an existing Spark session; only runtime SQL configurations will take effect.
20:36:36.253 [main] DEBUG org.apache.spark.sql.SparkSession - Configurations that might not take effect:
spark.abc.new=abc
```
3. Do not issue a warning for runtime SQL configurations, and do not list them in the debug log either:
```bash
./bin/spark-shell
```
```scala
import org.apache.spark.sql.SparkSession
spark.sparkContext.setLogLevel("DEBUG")
SparkSession.builder.config("spark.sql.ansi.enabled", "true").getOrCreate // **Ignore** warnings for runtime SQL configurations
SparkSession.builder.config("spark.buffer.size", "1234").getOrCreate // **Show** warnings for Spark core configuration
SparkSession.builder.config("spark.sql.source.specific", "abc").getOrCreate // **Show** warnings for custom runtime options
SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").getOrCreate // **Show** warnings for static SQL configurations
```
**Before**:
```
11:11:40.846 [main] WARN org.apache.spark.sql.SparkSession - Using an existing SparkSession; some spark core configurations may not take effect.
11:11:41.037 [main] WARN org.apache.spark.sql.SparkSession - Using an existing SparkSession; some spark core configurations may not take effect.
11:11:41.167 [main] WARN org.apache.spark.sql.SparkSession - Using an existing SparkSession; some spark core configurations may not take effect.
11:11:41.318 [main] WARN org.apache.spark.sql.SparkSession - Using an existing SparkSession; the static sql configurations will not take effect.
```
**After**:
```
10:39:54.870 [main] WARN org.apache.spark.sql.SparkSession - Using an existing Spark session; only runtime SQL configurations will take effect.
10:39:54.872 [main] DEBUG org.apache.spark.sql.SparkSession - Configurations that might not take effect:
spark.buffer.size=1234
10:39:54.988 [main] WARN org.apache.spark.sql.SparkSession - Using an existing Spark session; only runtime SQL configurations will take effect.
10:39:54.988 [main] DEBUG org.apache.spark.sql.SparkSession - Configurations that might not take effect:
spark.sql.source.specific=abc
10:39:55.107 [main] WARN org.apache.spark.sql.SparkSession - Using an existing Spark session; only runtime SQL configurations will take effect.
10:39:55.108 [main] DEBUG org.apache.spark.sql.SparkSession - Ignored static SQL configurations:
spark.sql.warehouse.dir=xyz
```
Note that there is no behaviour change on session state initialization when configurations are not set. For example:
```scala
import org.apache.spark.sql.SparkSession
spark.sparkContext.setLogLevel("DEBUG")
SparkSession.builder.getOrCreate
```
After this PR, however, session state initialization can be triggered when static SQL configurations are set; previously it was not. This does not introduce a user-facing change or a bug, but it is worth noting.
For runtime SQL configurations, the session state initialization in this code path was introduced at #15295.
### How was this patch tested?
It was manually tested as shown above.
Closes #35001 from HyukjinKwon/SPARK-37727.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
What changes were proposed in this pull request?
In current PySpark, the `getOrCreate` path in `python/pyspark/sql/session.py` passes all of the builder options to the created or existing SparkSession, whereas the Scala code path only applies non-static SQL configurations to an existing session.
This PR makes the PySpark behavior consistent with Scala.
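For reference, a condensed sketch of the existing-session branch after this change, assembled from the diff hunks quoted earlier in this thread; the surrounding control flow and names outside those hunks are assumptions rather than the exact source:

```python
# Condensed sketch of SparkSession.Builder.getOrCreate in
# python/pyspark/sql/session.py after this change (assembled from the diff
# hunks quoted above; surrounding details are assumptions).
default = self._jvm.SparkSession.getDefaultSession()
if default.isDefined() and not default.get().sparkContext().isStopped():
    # Reuse the existing JVM session; only modifiable (non-static) SQL
    # configurations are applied, matching the Scala getOrCreate behavior.
    jsparkSession = default.get()
    getattr(getattr(self._jvm, "SparkSession$"), "MODULE$").applyModifiableSettings(
        jsparkSession, options
    )
else:
    # No usable existing session: a new JVM session is created, so static
    # configurations in `options` can still take effect.
    jsparkSession = self._jvm.SparkSession(self._jsc.sc(), options)
```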
Why are the changes needed?
To keep behavior consistent between PySpark and Scala: when initializing a SparkSession and an existing session is present, only non-static SQL configurations should be overwritten.
Does this PR introduce any user-facing change?
Yes. Users can no longer overwrite static SQL configurations when using PySpark with an existing SparkSession.
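To illustrate the resulting behavior (a hedged sketch; the configuration keys and values below are chosen for demonstration and are not taken from the PR):

```python
# Illustrative sketch: with an existing SparkSession, getOrCreate now applies
# only runtime SQL configurations; static ones are ignored with a warning.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark2 = (
    SparkSession.builder
    .config("spark.sql.warehouse.dir", "/tmp/other-warehouse")  # static SQL conf: ignored
    .config("spark.sql.shuffle.partitions", "16")               # runtime SQL conf: applied
    .getOrCreate()
)

assert spark is spark2  # the existing session is reused
print(spark2.conf.get("spark.sql.shuffle.partitions"))  # 16
```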
How was this patch tested?
Modified UT.