Commit 84d817c

Merge remote-tracking branch 'upstream/master' into spark-34152

2 parents: d4e958a + 2e31e2c
File tree: 129 files changed (+2616 / -824 lines)


.github/workflows/build_and_test.yml

Lines changed: 7 additions & 1 deletion
@@ -61,6 +61,12 @@ jobs:
           excluded-tags: org.apache.spark.tags.SlowHiveTest
           comment: "- other tests"
         # SQL tests
+        - modules: sql
+          java: 8
+          hadoop: hadoop3.2
+          hive: hive2.3
+          included-tags: org.apache.spark.tags.DedicatedJVMTest
+          comment: "- dedicated JVM tests"
         - modules: sql
           java: 8
           hadoop: hadoop3.2
@@ -71,7 +77,7 @@ jobs:
           java: 8
           hadoop: hadoop3.2
           hive: hive2.3
-          excluded-tags: org.apache.spark.tags.ExtendedSQLTest
+          excluded-tags: org.apache.spark.tags.DedicatedJVMTest,org.apache.spark.tags.ExtendedSQLTest
           comment: "- other tests"
     env:
       MODULES_TO_TEST: ${{ matrix.modules }}
.../org/apache/spark/tags/DedicatedJVMTest.java (new file)

Lines changed: 30 additions & 0 deletions

@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.tags;
+
+import org.scalatest.TagAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@TagAnnotation
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD, ElementType.TYPE})
+public @interface DedicatedJVMTest { }
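
Because DedicatedJVMTest has RUNTIME retention and is marked @TagAnnotation, ScalaTest treats it as a test tag when placed on a suite class, which is how the included-tags/excluded-tags entries above select it. A minimal sketch of tagging a suite (the suite name and test body are hypothetical, not from this commit):

package org.apache.spark.sql

import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.tags.DedicatedJVMTest

// Hypothetical suite used only for illustration; the class-level annotation
// lets the CI matrix include or exclude this suite by tag.
@DedicatedJVMTest
class ExampleIsolatedSuite extends AnyFunSuite {
  test("state that must not leak across suites") {
    assert(1 + 1 == 2)
  }
}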

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 3 additions & 2 deletions
@@ -1726,9 +1726,10 @@ package object config {
     ConfigBuilder("spark.eventLog.compression.codec")
       .doc("The codec used to compress event log. By default, Spark provides four codecs: " +
         "lz4, lzf, snappy, and zstd. You can also use fully qualified class names to specify " +
-        "the codec. If this is not given, spark.io.compression.codec will be used.")
+        "the codec.")
       .version("3.0.0")
-      .fallbackConf(IO_COMPRESSION_CODEC)
+      .stringConf
+      .createWithDefault("zstd")

   private[spark] val BUFFER_SIZE =
     ConfigBuilder("spark.buffer.size")

core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala

Lines changed: 2 additions & 2 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.resource
 import org.apache.spark.annotation.{Evolving, Since}

 /**
- * An Executor resource request. This is used in conjunction with the ResourceProfile to
+ * An Executor resource request. This is used in conjunction with the [[ResourceProfile]] to
  * programmatically specify the resources needed for an RDD that will be applied at the
  * stage level.
  *
@@ -39,7 +39,7 @@ import org.apache.spark.annotation.{Evolving, Since}
  *
  * See the configuration and cluster specific docs for more details.
  *
- * Use ExecutorResourceRequests class as a convenience API.
+ * Use [[ExecutorResourceRequests]] class as a convenience API.
  *
  * @param resourceName Name of the resource
  * @param amount Amount requesting
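
A hedged sketch of constructing the request directly with the resourceName/amount parameters documented above; the discovery script path and vendor string are placeholder values:

import org.apache.spark.resource.ExecutorResourceRequest

// Request 2 GPUs per executor; the script path and vendor are hypothetical.
val gpuRequest = new ExecutorResourceRequest(
  "gpu",                    // resourceName
  2,                        // amount per executor
  "/opt/spark/getGpus.sh",  // discoveryScript (placeholder)
  "nvidia.com")             // vendor (placeholder)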

core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala

Lines changed: 12 additions & 0 deletions
@@ -37,12 +37,19 @@ class ExecutorResourceRequests() extends Serializable {

   private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()

+  /**
+   * Returns all the resource requests for the executor.
+   */
   def requests: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap

+  /**
+   * (Java-specific) Returns all the resource requests for the executor.
+   */
   def requestsJMap: JMap[String, ExecutorResourceRequest] = requests.asJava

   /**
    * Specify heap memory. The value specified will be converted to MiB.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for "memory" resource.
    *
    * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
    *               Default unit is MiB if not specified.
@@ -57,6 +64,7 @@ class ExecutorResourceRequests() extends Serializable {
   /**
    * Specify off heap memory. The value specified will be converted to MiB.
    * This value only take effect when MEMORY_OFFHEAP_ENABLED is true.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for "offHeap" resource.
    *
    * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
    *               Default unit is MiB if not specified.
@@ -70,6 +78,7 @@ class ExecutorResourceRequests() extends Serializable {

   /**
    * Specify overhead memory. The value specified will be converted to MiB.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for "memoryOverhead" resource.
    *
    * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
    *               Default unit is MiB if not specified.
@@ -83,6 +92,7 @@ class ExecutorResourceRequests() extends Serializable {

   /**
    * Specify pyspark memory. The value specified will be converted to MiB.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for "pyspark.memory" resource.
    *
    * @param amount Amount of memory. In the same format as JVM memory strings (e.g. 512m, 2g).
    *               Default unit is MiB if not specified.
@@ -96,6 +106,7 @@ class ExecutorResourceRequests() extends Serializable {

   /**
    * Specify number of cores per Executor.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for "cores" resource.
    *
    * @param amount Number of cores to allocate per Executor.
    */
@@ -111,6 +122,7 @@ class ExecutorResourceRequests() extends Serializable {
    * like GPUs are gpu (spark configs spark.executor.resource.gpu.*). If you pass in a resource
    * that the cluster manager doesn't support the result is undefined, it may error or may just
    * be ignored.
+   * This is a convenient API to add [[ExecutorResourceRequest]] for custom resources.
    *
    * @param resourceName Name of the resource.
    * @param amount amount of that resource per executor to use.
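
A short sketch of the convenience API these new comments describe (amounts and the discovery script path are illustrative, not from the commit):

import org.apache.spark.resource.ExecutorResourceRequests

val execReqs = new ExecutorResourceRequests()
  .cores(4)
  .memory("6g")
  .memoryOverhead("1g")
  .resource("gpu", 1, "/opt/spark/getGpus.sh", "nvidia.com") // placeholder script path

// The requests map is keyed by resource name: "cores", "memory", "memoryOverhead", "gpu".
execReqs.requests.keys.foreach(println)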

core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ import org.apache.spark.SparkException
  * Trait used to help executor/worker allocate resources.
  * Please note that this is intended to be used in a single thread.
  */
-trait ResourceAllocator {
+private[spark] trait ResourceAllocator {

   protected def resourceName: String
   protected def resourceAddresses: Seq[String]

core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala

Lines changed: 34 additions & 3 deletions
@@ -35,7 +35,13 @@ import org.apache.spark.util.Utils
  * Resource profile to associate with an RDD. A ResourceProfile allows the user to
  * specify executor and task requirements for an RDD that will get applied during a
  * stage. This allows the user to change the resource requirements between stages.
- * This is meant to be immutable so user can't change it after building.
+ * This is meant to be immutable so user can't change it after building. Users
+ * should use [[ResourceProfileBuilder]] to build it.
+ *
+ * @param executorResources Resource requests for executors. Mapped from the resource
+ *                          name (e.g., cores, memory, CPU) to its specific request.
+ * @param taskResources Resource requests for tasks. Mapped from the resource
+ *                      name (e.g., cores, memory, CPU) to its specific request.
  */
 @Evolving
 @Since("3.1.0")
@@ -53,6 +59,9 @@ class ResourceProfile(
   private var _maxTasksPerExecutor: Option[Int] = None
   private var _coresLimitKnown: Boolean = false

+  /**
+   * A unique id of this ResourceProfile
+   */
   def id: Int = _id

   /**
@@ -242,17 +251,39 @@

 object ResourceProfile extends Logging {
   // task resources
+  /**
+   * built-in task resource: cpus
+   */
   val CPUS = "cpus"
   // Executor resources
   // Make sure add new executor resource in below allSupportedExecutorResources
+  /**
+   * built-in executor resource: cores
+   */
   val CORES = "cores"
+  /**
+   * built-in executor resource: memory
+   */
   val MEMORY = "memory"
+  /**
+   * built-in executor resource: offHeap
+   */
   val OFFHEAP_MEM = "offHeap"
+  /**
+   * built-in executor resource: memoryOverhead
+   */
   val OVERHEAD_MEM = "memoryOverhead"
+  /**
+   * built-in executor resource: pyspark.memory
+   */
   val PYSPARK_MEM = "pyspark.memory"

-  // all supported spark executor resources (minus the custom resources like GPUs/FPGAs)
-  val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM, OFFHEAP_MEM)
+  /**
+   * Return all supported Spark built-in executor resources, custom resources like GPUs/FPGAs
+   * are excluded.
+   */
+  def allSupportedExecutorResources: Array[String] =
+    Array(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM, OFFHEAP_MEM)

   val UNKNOWN_RESOURCE_PROFILE_ID = -1
   val DEFAULT_RESOURCE_PROFILE_ID = 0
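
allSupportedExecutorResources is now a method returning Array[String] rather than a public val, so callers can only read the list. A small sketch, assuming the object and these constants remain publicly visible:

import org.apache.spark.resource.ResourceProfile

val builtIns = ResourceProfile.allSupportedExecutorResources
assert(builtIns.contains(ResourceProfile.MEMORY))
assert(!builtIns.contains("gpu")) // custom resources such as GPUs/FPGAs are excluded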

core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala

Lines changed: 16 additions & 4 deletions
@@ -26,17 +26,19 @@ import org.apache.spark.annotation.{Evolving, Since}


 /**
- * Resource profile builder to build a Resource profile to associate with an RDD.
- * A ResourceProfile allows the user to specify executor and task requirements for an RDD
- * that will get applied during a stage. This allows the user to change the resource
+ * Resource profile builder to build a [[ResourceProfile]] to associate with an RDD.
+ * A [[ResourceProfile]] allows the user to specify executor and task resource requirements
+ * for an RDD that will get applied during a stage. This allows the user to change the resource
  * requirements between stages.
  *
  */
 @Evolving
 @Since("3.1.0")
 class ResourceProfileBuilder() {

+  // Task resource requests specified by users, mapped from resource name to the request.
   private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
+  // Executor resource requests specified by users, mapped from resource name to the request.
   private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()

   def taskResources: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
@@ -54,11 +56,21 @@ class ResourceProfileBuilder() {
     _executorResources.asScala.asJava
   }

+  /**
+   * Add executor resource requests
+   * @param requests The detailed executor resource requests, see [[ExecutorResourceRequests]]
+   * @return This ResourceProfileBuilder
+   */
   def require(requests: ExecutorResourceRequests): this.type = {
     _executorResources.putAll(requests.requests.asJava)
     this
   }

+  /**
+   * Add task resource requests
+   * @param requests The detailed task resource requests, see [[TaskResourceRequest]]
+   * @return This ResourceProfileBuilder
+   */
   def require(requests: TaskResourceRequests): this.type = {
     _taskResources.putAll(requests.requests.asJava)
     this
@@ -80,7 +92,7 @@ class ResourceProfileBuilder() {
     s"task resources: ${_taskResources.asScala.map(pair => s"${pair._1}=${pair._2.toString()}")}"
   }

-  def build: ResourceProfile = {
+  def build(): ResourceProfile = {
     new ResourceProfile(executorResources, taskResources)
   }
 }
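
An end-to-end sketch of the builder flow documented above, combining executor and task requests and attaching the profile to an RDD; sc, the amounts, and a cluster that supports stage-level scheduling are assumed:

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

val execReqs = new ExecutorResourceRequests().cores(4).memory("6g")
val taskReqs = new TaskResourceRequests().cpus(1)

// build() now takes parentheses, matching the diff above.
val profile = new ResourceProfileBuilder()
  .require(execReqs)
  .require(taskReqs)
  .build()

// sc is an existing SparkContext; the profile applies to the stages computing this RDD.
val rdd = sc.parallelize(1 to 100, 4).map(_ * 2).withResources(profile)
rdd.count()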

core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala

Lines changed: 8 additions & 2 deletions
@@ -20,11 +20,17 @@ package org.apache.spark.resource
 import org.apache.spark.annotation.{Evolving, Since}

 /**
- * A task resource request. This is used in conjunction with the ResourceProfile to
+ * A task resource request. This is used in conjunction with the [[ResourceProfile]] to
  * programmatically specify the resources needed for an RDD that will be applied at the
  * stage level.
  *
- * Use TaskResourceRequests class as a convenience API.
+ * Use [[TaskResourceRequests]] class as a convenience API.
+ *
+ * @param resourceName Resource name
+ * @param amount Amount requesting as a Double to support fractional resource requests.
+ *               Valid values are less than or equal to 0.5 or whole numbers. This essentially
+ *               lets you configure X number of tasks to run on a single resource,
+ *               ie amount equals 0.5 translates into 2 tasks per resource address.
  */
 @Evolving
 @Since("3.1.0")
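
A worked sketch of the fractional semantics in the new @param amount doc (resource names and values are illustrative): 0.5 means two tasks share one resource address, while whole numbers request that many addresses per task.

import org.apache.spark.resource.TaskResourceRequest

val halfGpuPerTask = new TaskResourceRequest("gpu", 0.5) // two tasks per GPU address
val twoGpusPerTask = new TaskResourceRequest("gpu", 2.0) // two GPU addresses per task
// A value like 0.7 is invalid: fractions must be <= 0.5, otherwise use whole numbers.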

core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala

Lines changed: 12 additions & 1 deletion
@@ -36,12 +36,19 @@ class TaskResourceRequests() extends Serializable {

   private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()

+  /**
+   * Returns all the resource requests for the task.
+   */
   def requests: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap

+  /**
+   * (Java-specific) Returns all the resource requests for the task.
+   */
   def requestsJMap: JMap[String, TaskResourceRequest] = requests.asJava

   /**
    * Specify number of cpus per Task.
+   * This is a convenient API to add [[TaskResourceRequest]] for cpus.
    *
    * @param amount Number of cpus to allocate per Task.
    */
@@ -52,7 +59,8 @@ class TaskResourceRequests() extends Serializable {
   }

   /**
-   * Amount of a particular custom resource(GPU, FPGA, etc) to use.
+   * Amount of a particular custom resource(GPU, FPGA, etc) to use.
+   * This is a convenient API to add [[TaskResourceRequest]] for custom resources.
    *
    * @param resourceName Name of the resource.
    * @param amount Amount requesting as a Double to support fractional resource requests.
@@ -66,6 +74,9 @@ class TaskResourceRequests() extends Serializable {
     this
   }

+  /**
+   * Add a certain [[TaskResourceRequest]] to the request set.
+   */
   def addRequest(treq: TaskResourceRequest): this.type = {
     _taskResources.put(treq.resourceName, treq)
     this
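
A brief sketch of the convenience methods and addRequest documented above (resource names and amounts are illustrative):

import org.apache.spark.resource.{TaskResourceRequest, TaskResourceRequests}

val taskReqs = new TaskResourceRequests()
  .cpus(2)
  .resource("gpu", 0.25)                          // four tasks share one GPU address
  .addRequest(new TaskResourceRequest("fpga", 1)) // pre-built request, keyed by name

// requests is keyed by resource name: "cpus", "gpu", "fpga".
taskReqs.requests.foreach { case (name, req) => println(s"$name -> ${req.amount}") }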
