Commit a8f169e

Merge branch 'master' into issues/SPARK-35382/nested_higher_order_functions
2 parents 846ca23 + c0b52da

10 files changed (+160, -67 lines)


.github/workflows/build_and_test.yml

Lines changed: 3 additions & 3 deletions
@@ -89,7 +89,7 @@ jobs:
         id: sync-branch
         run: |
           apache_spark_ref=`git rev-parse HEAD`
-          git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF##*/}
+          git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF#refs/heads/}
           git -c user.name='Apache Spark Test Account' -c user.email='[email protected]' merge --no-commit --progress --squash FETCH_HEAD
           git -c user.name='Apache Spark Test Account' -c user.email='[email protected]' commit -m "Merged commit"
           echo "::set-output name=APACHE_SPARK_REF::$apache_spark_ref"
@@ -186,7 +186,7 @@ jobs:
         id: sync-branch
         run: |
           apache_spark_ref=`git rev-parse HEAD`
-          git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF##*/}
+          git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF#refs/heads/}
           git -c user.name='Apache Spark Test Account' -c user.email='[email protected]' merge --no-commit --progress --squash FETCH_HEAD
           git -c user.name='Apache Spark Test Account' -c user.email='[email protected]' commit -m "Merged commit"
           echo "::set-output name=APACHE_SPARK_REF::$apache_spark_ref"
@@ -262,7 +262,7 @@ jobs:
         id: sync-branch
         run: |
           apache_spark_ref=`git rev-parse HEAD`
-          git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF##*/}
+          git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF#refs/heads/}
           git -c user.name='Apache Spark Test Account' -c user.email='[email protected]' merge --no-commit --progress --squash FETCH_HEAD
           git -c user.name='Apache Spark Test Account' -c user.email='[email protected]' commit -m "Merged commit"
           echo "::set-output name=APACHE_SPARK_REF::$apache_spark_ref"

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 26 additions & 0 deletions
@@ -396,6 +396,8 @@ class SparkContext(config: SparkConf) extends Logging {
     if (!_conf.contains("spark.app.name")) {
       throw new SparkException("An application name must be set in your configuration")
     }
+    // This should be set as early as possible.
+    SparkContext.fillMissingMagicCommitterConfsIfNeeded(_conf)
 
     _driverLogger = DriverLogger(_conf)
 
@@ -2985,6 +2987,30 @@ object SparkContext extends Logging {
     }
     serviceLoaders.headOption
   }
+
+  /**
+   * This is a helper function to complete the missing S3A magic committer configurations
+   * based on a single conf: `spark.hadoop.fs.s3a.bucket.<bucket>.committer.magic.enabled`
+   */
+  private def fillMissingMagicCommitterConfsIfNeeded(conf: SparkConf): Unit = {
+    val magicCommitterConfs = conf
+      .getAllWithPrefix("spark.hadoop.fs.s3a.bucket.")
+      .filter(_._1.endsWith(".committer.magic.enabled"))
+      .filter(_._2.equalsIgnoreCase("true"))
+    if (magicCommitterConfs.nonEmpty) {
+      // Try to enable S3 magic committer if missing
+      conf.setIfMissing("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
+      if (conf.get("spark.hadoop.fs.s3a.committer.magic.enabled").equals("true")) {
+        conf.setIfMissing("spark.hadoop.fs.s3a.committer.name", "magic")
+        conf.setIfMissing("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
+          "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
+        conf.setIfMissing("spark.sql.parquet.output.committer.class",
+          "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
+        conf.setIfMissing("spark.sql.sources.commitProtocolClass",
+          "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
+      }
+    }
+  }
 }
 
 /**
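In effect, a single per-bucket opt-in now fans out into the full set of S3A magic committer confs at SparkContext startup, while setIfMissing lets any value the user set explicitly win. A minimal sketch of the observable behavior, mirroring the c3 case of the SPARK-35383 test in SparkContextSuite below (mybucket is a placeholder bucket name):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("s3a-demo")
      .setMaster("local")
      .set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")
    val sc = new SparkContext(conf)
    // The derived confs were filled in during construction:
    assert(sc.getConf.get("spark.hadoop.fs.s3a.committer.name") == "magic")
    assert(sc.getConf.get("spark.sql.sources.commitProtocolClass") ==
      "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    sc.stop()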

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 3 additions & 0 deletions
@@ -253,6 +253,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String]
         && Try(JavaUtils.byteStringAsBytes(executorMemory)).getOrElse(-1L) <= 0) {
       error("Executor memory must be a positive number")
     }
+    if (driverCores != null && Try(driverCores.toInt).getOrElse(-1) <= 0) {
+      error("Driver cores must be a positive number")
+    }
     if (executorCores != null && Try(executorCores.toInt).getOrElse(-1) <= 0) {
       error("Executor cores must be a positive number")
     }
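This mirrors the executor-cores check just below it: Try(s.toInt).getOrElse(-1) maps an unparsable string to -1, so a single <= 0 comparison rejects malformed and non-positive values alike, and spark-submit now fails fast on inputs like --driver-cores 0. The idiom as a standalone sketch:

    import scala.util.Try

    // Unparsable input becomes -1, so one predicate covers both failure modes.
    def isPositiveInt(s: String): Boolean = Try(s.toInt).getOrElse(-1) > 0

    assert(isPositiveInt("4"))
    assert(!isPositiveInt("0"))
    assert(!isPositiveInt("-2"))
    assert(!isPositiveInt("four"))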

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 48 additions & 0 deletions
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
 import org.json4s.{DefaultFormats, Extraction}
+import org.junit.Assert.{assertEquals, assertFalse}
 import org.scalatest.concurrent.Eventually
 import org.scalatest.matchers.must.Matchers._
 
@@ -1237,6 +1238,53 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
       }
     }
   }
+
+  test("SPARK-35383: Fill missing S3A magic committer configs if needed") {
+    val c1 = new SparkConf().setAppName("s3a-test").setMaster("local")
+    sc = new SparkContext(c1)
+    assertFalse(sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
+
+    resetSparkContext()
+    val c2 = c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "false")
+    sc = new SparkContext(c2)
+    assertFalse(sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
+
+    resetSparkContext()
+    val c3 = c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")
+    sc = new SparkContext(c3)
+    Seq(
+      "spark.hadoop.fs.s3a.committer.magic.enabled" -> "true",
+      "spark.hadoop.fs.s3a.committer.name" -> "magic",
+      "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a" ->
+        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
+      "spark.sql.parquet.output.committer.class" ->
+        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
+      "spark.sql.sources.commitProtocolClass" ->
+        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
+    ).foreach { case (k, v) =>
+      assertEquals(v, sc.getConf.get(k))
+    }
+
+    // Respect a user configuration
+    resetSparkContext()
+    val c4 = c1.clone
+      .set("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
+      .set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")
+    sc = new SparkContext(c4)
+    Seq(
+      "spark.hadoop.fs.s3a.committer.magic.enabled" -> "false",
+      "spark.hadoop.fs.s3a.committer.name" -> null,
+      "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a" -> null,
+      "spark.sql.parquet.output.committer.class" -> null,
+      "spark.sql.sources.commitProtocolClass" -> null
+    ).foreach { case (k, v) =>
+      if (v == null) {
+        assertFalse(sc.getConf.contains(k))
+      } else {
+        assertEquals(v, sc.getConf.get(k))
+      }
+    }
+  }
 }
 
 object SparkContextSuite {
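Taken together, the four contexts cover the decision table: no per-bucket flag (c1) and a per-bucket flag of false (c2) inject nothing; a per-bucket flag of true (c3) fills in all five derived confs; and an explicit user-set spark.hadoop.fs.s3a.committer.magic.enabled=false (c4) suppresses the derived confs even though a per-bucket flag is true.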

docs/monitoring.md

Lines changed: 14 additions & 1 deletion
@@ -1189,7 +1189,20 @@ This is the component with the largest amount of instrumented metrics
   `spark.metrics.executorMetricsSource.enabled` (default is true)
   - This source contains memory-related metrics. A full list of available metrics in this
     namespace can be found in the corresponding entry for the Executor component instance.
-
+
+- namespace=ExecutorAllocationManager
+  - **note:** these metrics are only emitted when using dynamic allocation. Conditional to a configuration
+  parameter `spark.dynamicAllocation.enabled` (default is false)
+  - executors.numberExecutorsToAdd
+  - executors.numberExecutorsPendingToRemove
+  - executors.numberAllExecutors
+  - executors.numberTargetExecutors
+  - executors.numberMaxNeededExecutors
+  - executors.numberExecutorsGracefullyDecommissioned.count
+  - executors.numberExecutorsDecommissionUnfinished.count
+  - executors.numberExecutorsExitedUnexpectedly.count
+  - executors.numberExecutorsKilledByDriver.count
+
 - namespace=plugin.\<Plugin Class Name>
   - Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and
     configured using the Spark plugin API. See "Advanced Instrumentation" below for how to load
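For context, a hedged sketch (not part of the patch) of a session configured so that the ExecutorAllocationManager gauges above are actually registered:

    import org.apache.spark.sql.SparkSession

    // Dynamic allocation is off by default; enabling it activates the
    // executors.* gauges listed above. shuffleTracking is an assumption
    // here, for clusters without an external shuffle service.
    val spark = SparkSession.builder()
      .appName("dynamic-allocation-metrics")
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
      .getOrCreate()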

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ApplyFunctionExpression.scala

Lines changed: 5 additions & 4 deletions
@@ -30,13 +30,14 @@ case class ApplyFunctionExpression(
   override def dataType: DataType = function.resultType()
 
   private lazy val reusedRow = new GenericInternalRow(children.size)
-  private lazy val childrenWithIndex = children.zipWithIndex
 
   /** Returns the result of evaluating this expression on a given input Row */
   override def eval(input: InternalRow): Any = {
-    childrenWithIndex.foreach {
-      case (expr, pos) =>
-        reusedRow.update(pos, expr.eval(input))
+    var i = 0
+    while (i < children.length) {
+      val expr = children(i)
+      reusedRow.update(i, expr.eval(input))
+      i += 1
     }
 
     function.produceResult(reusedRow)
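eval runs once per input row, so constant factors dominate here: the cached childrenWithIndex avoided re-zipping, but every row still paid a closure invocation and a tuple pattern match per child inside foreach. The index-based while loop reads children(i) directly and keeps the hot loop allocation-free and JIT-friendly, which is presumably part of what the regenerated V2FunctionBenchmark results below reflect.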

sql/core/benchmarks/V2FunctionBenchmark-jdk11-results.txt

Lines changed: 24 additions & 24 deletions
@@ -2,43 +2,43 @@ OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1046-azure
 Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
 scalar function (long + long) -> long, result_nullable = true codegen = true:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------------------------------------------
-native_long_add                             16079          16684         619         31.1          32.2       1.0X
-java_long_add_default                       45512          48772         NaN         11.0          91.0       0.4X
-java_long_add_magic                         19506          19672         262         25.6          39.0       0.8X
-java_long_add_static_magic                  18770          18901         164         26.6          37.5       0.9X
-scala_long_add_default                      46895          47662        1136         10.7          93.8       0.3X
-scala_long_add_magic                        19520          19667         188         25.6          39.0       0.8X
+native_long_add                             16015          16309         407         31.2          32.0       1.0X
+java_long_add_default                       48899          49122         352         10.2          97.8       0.3X
+java_long_add_magic                         19169          19302         117         26.1          38.3       0.8X
+java_long_add_static_magic                  18308          18373          57         27.3          36.6       0.9X
+scala_long_add_default                      48773          48922         136         10.3          97.5       0.3X
+scala_long_add_magic                        18372          18422          44         27.2          36.7       0.9X
 
 OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1046-azure
 Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
 scalar function (long + long) -> long, result_nullable = false codegen = true:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------------------------------------------
-native_long_add                             17363          17424          67         28.8          34.7       1.0X
-java_long_add_default                       43884          44592         615         11.4          87.8       0.4X
-java_long_add_magic                         18927          19100         206         26.4          37.9       0.9X
-java_long_add_static_magic                  16854          16918          76         29.7          33.7       1.0X
-scala_long_add_default                      43741          44016         288         11.4          87.5       0.4X
-scala_long_add_magic                        18770          19022         317         26.6          37.5       0.9X
+native_long_add                             16414          16452          41         30.5          32.8       1.0X
+java_long_add_default                       47640          47767         134         10.5          95.3       0.3X
+java_long_add_magic                         18413          18554         139         27.2          36.8       0.9X
+java_long_add_static_magic                  16659          16707          43         30.0          33.3       1.0X
+scala_long_add_default                      47821          47857          48         10.5          95.6       0.3X
+scala_long_add_magic                        18406          18502          99         27.2          36.8       0.9X
 
 OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1046-azure
 Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
 scalar function (long + long) -> long, result_nullable = true codegen = false:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------------------------------------------
-native_long_add                             41825          42237         668         12.0          83.6       1.0X
-java_long_add_default                       53779          53969         175          9.3         107.6       0.8X
-java_long_add_magic                        131478         133225         NaN          3.8         263.0       0.3X
-java_long_add_static_magic                 129304         129754         398          3.9         258.6       0.3X
-scala_long_add_default                      54602          54986         344          9.2         109.2       0.8X
-scala_long_add_magic                       132066         132243         159          3.8         264.1       0.3X
+native_long_add                             36335          36366          27         13.8          72.7       1.0X
+java_long_add_default                       53930          54056         155          9.3         107.9       0.7X
+java_long_add_magic                        126621         127109         471          3.9         253.2       0.3X
+java_long_add_static_magic                 126914         127193         251          3.9         253.8       0.3X
+scala_long_add_default                      55812          55949         141          9.0         111.6       0.7X
+scala_long_add_magic                       127629         127900         420          3.9         255.3       0.3X
 
 OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1046-azure
 Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
 scalar function (long + long) -> long, result_nullable = false codegen = false:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 --------------------------------------------------------------------------------------------------------------------------------------------------------------
-native_long_add                             40817          41093         388         12.2          81.6       1.0X
-java_long_add_default                       54090          54563         425          9.2         108.2       0.8X
-java_long_add_magic                        129341         129867         613          3.9         258.7       0.3X
-java_long_add_static_magic                 127292         127432         218          3.9         254.6       0.3X
-scala_long_add_default                      53397          53670         328          9.4         106.8       0.8X
-scala_long_add_magic                       128455         128541         138          3.9         256.9       0.3X
+native_long_add                             37433          37794         312         13.4          74.9       1.0X
+java_long_add_default                       53629          53946         416          9.3         107.3       0.7X
+java_long_add_magic                        160091         160605         549          3.1         320.2       0.2X
+java_long_add_static_magic                 157228         158430        1372          3.2         314.5       0.2X
+scala_long_add_default                      54026          54197         187          9.3         108.1       0.7X
+scala_long_add_magic                       160926         161351         526          3.1         321.9       0.2X
 
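Read against the Relative column (native_long_add = 1.0X): with codegen on, the magic-method variants run within roughly 0.8-0.9X of native while the default invocation path sits near 0.3X; with codegen off, execution is interpreter-bound and the magic variants fall to roughly 0.2-0.3X. The environment line is unchanged (same JDK 11 VM and Azure CPU), so the movement in the numbers, not their absolute values, is the signal.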