Commit 67e9ca1

Migrate executor pod construction to use the new architecture.
1 parent f0ea6d9 commit 67e9ca1

File tree

9 files changed: +386 -373 lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala

Lines changed: 0 additions & 72 deletions
This file was deleted.
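The secret-volume logic removed here (addSecretVolumes and mountSecrets, whose call sites disappear from the other files in this commit) maps naturally onto the new feature-step contract. A hypothetical sketch of that logic recast as a step; the class name MountSecretsFeatureStep and the volume-naming scheme are illustrative assumptions, not part of this commit:

    import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder}

    import org.apache.spark.deploy.k8s.SparkPod
    import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

    // Hypothetical: the deleted MountSecretsBootstrap recast as a feature step.
    private[spark] class MountSecretsFeatureStep(
        secretNamesToMountPaths: Map[String, String]) extends KubernetesFeatureConfigStep {

      override def configurePod(pod: SparkPod): SparkPod = {
        // Add one secret volume per secret to the pod spec.
        val podWithVolumes = secretNamesToMountPaths.keys.foldLeft(new PodBuilder(pod.pod)) {
          (builder, secretName) =>
            builder.editOrNewSpec()
              .addNewVolume()
                .withName(s"$secretName-volume")
                .withNewSecret().withSecretName(secretName).endSecret()
                .endVolume()
              .endSpec()
        }.build()
        // Mount each volume into the main container at its requested path.
        val containerWithMounts = secretNamesToMountPaths.foldLeft(
            new ContainerBuilder(pod.container)) { case (builder, (secretName, mountPath)) =>
          builder.addNewVolumeMount()
            .withName(s"$secretName-volume")
            .withMountPath(mountPath)
            .endVolumeMount()
        }.build()
        SparkPod(podWithVolumes, containerWithMounts)
      }

      override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty

      override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
    }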

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala

Lines changed: 60 additions & 104 deletions
@@ -14,99 +14,63 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.scheduler.cluster.k8s
+package org.apache.spark.deploy.k8s.features
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
 
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
-/**
- * A factory class for bootstrapping and creating executor pods with the given bootstrapping
- * components.
- *
- * @param sparkConf Spark configuration
- * @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto
- *                              user-specified paths into the executor container
- */
-private[spark] class ExecutorPodFactory(
-    sparkConf: SparkConf,
-    mountSecretsBootstrap: Option[MountSecretsBootstrap]) {
-
-  private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
-
-  private val executorLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-    sparkConf,
-    KUBERNETES_EXECUTOR_LABEL_PREFIX)
-  require(
-    !executorLabels.contains(SPARK_APP_ID_LABEL),
-    s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
-  require(
-    !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
-    s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
-      " Spark.")
-  require(
-    !executorLabels.contains(SPARK_ROLE_LABEL),
-    s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
+private[spark] class BasicExecutorFeatureStep(
+    kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
+  extends KubernetesFeatureConfigStep {
 
-  private val executorAnnotations =
-    KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf,
-      KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
-  private val nodeSelector =
-    KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf,
-      KUBERNETES_NODE_SELECTOR_PREFIX)
-
-  private val executorContainerImage = sparkConf
+  // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
+  private val executorExtraClasspath = kubernetesConf.get(EXECUTOR_CLASS_PATH)
+  private val executorContainerImage = kubernetesConf
     .get(EXECUTOR_CONTAINER_IMAGE)
     .getOrElse(throw new SparkException("Must specify the executor container image"))
-  private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
-  private val blockManagerPort = sparkConf
+  private val blockManagerPort = kubernetesConf
+    .sparkConf
     .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
 
-  private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+  private val executorPodNamePrefix = kubernetesConf.appResourceNamePrefix
 
-  private val executorMemoryMiB = sparkConf.get(EXECUTOR_MEMORY)
-  private val executorMemoryString = sparkConf.get(
+  private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
+  private val executorMemoryString = kubernetesConf.get(
     EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
 
-  private val memoryOverheadMiB = sparkConf
+  private val memoryOverheadMiB = kubernetesConf
     .get(EXECUTOR_MEMORY_OVERHEAD)
     .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
       MEMORY_OVERHEAD_MIN_MIB))
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
 
-  private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
-  private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
+  private val executorCores = kubernetesConf.sparkConf.getDouble(
+    "spark.executor.cores", 1)
+  private val executorLimitCores = kubernetesConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
+  private val driverPod = kubernetesConf.roleSpecificConf.driverPod
+
+  private val driverUrl = RpcEndpointAddress(
+    kubernetesConf.sparkConf.get("spark.driver.host"),
+    kubernetesConf.sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
 
-  /**
-   * Configure and construct an executor pod with the given parameters.
-   */
-  def createExecutorPod(
-      executorId: String,
-      applicationId: String,
-      driverUrl: String,
-      executorEnvs: Seq[(String, String)],
-      driverPod: Pod,
-      nodeToLocalTaskCount: Map[String, Int]): Pod = {
-    val name = s"$executorPodNamePrefix-exec-$executorId"
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val name = s"$executorPodNamePrefix-exec-${kubernetesConf.roleSpecificConf.executorId}"
 
     // hostname must be no longer than 63 characters, so take the last 63 characters of the pod
     // name as the hostname. This preserves uniqueness since the end of name contains
     // executorId
     val hostname = name.substring(Math.max(0, name.length - 63))
-    val resolvedExecutorLabels = Map(
-      SPARK_EXECUTOR_ID_LABEL -> executorId,
-      SPARK_APP_ID_LABEL -> applicationId,
-      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
-      executorLabels
     val executorMemoryQuantity = new QuantityBuilder(false)
       .withAmount(s"${executorMemoryMiB}Mi")
       .build()
@@ -122,7 +86,7 @@ private[spark] class ExecutorPodFactory(
         .withValue(cp)
         .build()
     }
-    val executorExtraJavaOptionsEnv = sparkConf
+    val executorExtraJavaOptionsEnv = kubernetesConf
       .get(EXECUTOR_JAVA_OPTIONS)
       .map { opts =>
        val delimitedOpts = Utils.splitCommandString(opts)
@@ -136,10 +100,11 @@
       // Executor backend expects integral value for executor cores, so round it up to an int.
       (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
       (ENV_EXECUTOR_MEMORY, executorMemoryString),
-      (ENV_APPLICATION_ID, applicationId),
+      (ENV_APPLICATION_ID, kubernetesConf.appId),
       // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
       (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
-      (ENV_EXECUTOR_ID, executorId)) ++ executorEnvs)
+      (ENV_EXECUTOR_ID, kubernetesConf.roleSpecificConf.executorId)) ++
+      kubernetesConf.sparkConf.getExecutorEnv)
       .map(env => new EnvVarBuilder()
         .withName(env._1)
         .withValue(env._2)
@@ -161,10 +126,10 @@
         .build()
     }
 
-    val executorContainer = new ContainerBuilder()
+    val executorContainer = new ContainerBuilder(pod.container)
       .withName("executor")
       .withImage(executorContainerImage)
-      .withImagePullPolicy(imagePullPolicy)
+      .withImagePullPolicy(kubernetesConf.imagePullPolicy())
       .withNewResources()
         .addToRequests("memory", executorMemoryQuantity)
         .addToLimits("memory", executorMemoryLimitQuantity)
@@ -174,49 +139,40 @@
       .withPorts(requiredPorts.asJava)
       .addToArgs("executor")
       .build()
-
-    val executorPod = new PodBuilder()
-      .withNewMetadata()
-        .withName(name)
-        .withLabels(resolvedExecutorLabels.asJava)
-        .withAnnotations(executorAnnotations.asJava)
-        .withOwnerReferences()
-        .addNewOwnerReference()
-          .withController(true)
-          .withApiVersion(driverPod.getApiVersion)
-          .withKind(driverPod.getKind)
-          .withName(driverPod.getMetadata.getName)
-          .withUid(driverPod.getMetadata.getUid)
-          .endOwnerReference()
-        .endMetadata()
-      .withNewSpec()
-        .withHostname(hostname)
-        .withRestartPolicy("Never")
-        .withNodeSelector(nodeSelector.asJava)
-        .endSpec()
-      .build()
-
     val containerWithLimitCores = executorLimitCores.map { limitCores =>
       val executorCpuLimitQuantity = new QuantityBuilder(false)
         .withAmount(limitCores)
         .build()
       new ContainerBuilder(executorContainer)
         .editResources()
-        .addToLimits("cpu", executorCpuLimitQuantity)
-        .endResources()
+          .addToLimits("cpu", executorCpuLimitQuantity)
+          .endResources()
         .build()
     }.getOrElse(executorContainer)
-
-    val (maybeSecretsMountedPod, maybeSecretsMountedContainer) =
-      mountSecretsBootstrap.map { bootstrap =>
-        (bootstrap.addSecretVolumes(executorPod), bootstrap.mountSecrets(containerWithLimitCores))
-      }.getOrElse((executorPod, containerWithLimitCores))
-
-
-    new PodBuilder(maybeSecretsMountedPod)
-      .editSpec()
-        .addToContainers(maybeSecretsMountedContainer)
+    val executorPod = new PodBuilder(pod.pod)
+      .editOrNewMetadata()
+        .withName(name)
+        .withLabels(kubernetesConf.roleLabels.asJava)
+        .withAnnotations(kubernetesConf.roleAnnotations.asJava)
+        .withOwnerReferences()
+        .addNewOwnerReference()
+          .withController(true)
+          .withApiVersion(driverPod.getApiVersion)
+          .withKind(driverPod.getKind)
+          .withName(driverPod.getMetadata.getName)
+          .withUid(driverPod.getMetadata.getUid)
+          .endOwnerReference()
+        .endMetadata()
+      .editOrNewSpec()
+        .withHostname(hostname)
+        .withRestartPolicy("Never")
+        .withNodeSelector(kubernetesConf.nodeSelector().asJava)
        .endSpec()
      .build()
+    SparkPod(executorPod, containerWithLimitCores)
   }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }
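
The step above is written against the KubernetesFeatureConfigStep contract that this migration introduces. The trait itself is not part of this diff; inferred from the three overrides shown here, it presumably looks roughly like this sketch:

    package org.apache.spark.deploy.k8s.features

    import io.fabric8.kubernetes.api.model.HasMetadata

    import org.apache.spark.deploy.k8s.SparkPod

    // Sketch inferred from the overrides in BasicExecutorFeatureStep above;
    // the actual trait may declare these members differently.
    private[spark] trait KubernetesFeatureConfigStep {
      // Transform the pod and its main container; steps compose by chaining this call.
      def configurePod(pod: SparkPod): SparkPod

      // Extra Spark properties this step wants set on the configured JVM.
      def getAdditionalPodSystemProperties(): Map[String, String]

      // Extra Kubernetes resources (e.g. ConfigMaps, Services) this step requires.
      def getAdditionalKubernetesResources(): Seq[HasMetadata]
    }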

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 2 additions & 10 deletions
@@ -21,7 +21,7 @@ import java.io.File
 import io.fabric8.kubernetes.client.Config
 
 import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap, SparkKubernetesClientFactory}
+import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
@@ -48,12 +48,6 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       scheduler: TaskScheduler): SchedulerBackend = {
     val executorSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
       sc.conf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
-    val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
-      Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
-    } else {
-      None
-    }
-
     val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
       KUBERNETES_MASTER_INTERNAL_URL,
       Some(sc.conf.get(KUBERNETES_NAMESPACE)),
@@ -62,16 +56,14 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
 
-    val executorPodFactory = new ExecutorPodFactory(sc.conf, mountSecretBootstrap)
-
     val allocatorExecutor = ThreadUtils
       .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
     val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
       "kubernetes-executor-requests")
     new KubernetesClusterSchedulerBackend(
       scheduler.asInstanceOf[TaskSchedulerImpl],
       sc.env.rpcEnv,
-      executorPodFactory,
+      new KubernetesExecutorBuilder,
       kubernetesClient,
       allocatorExecutor,
       requestExecutorsService)
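
The scheduler backend now takes a KubernetesExecutorBuilder in place of the old ExecutorPodFactory. The builder class itself is not shown in this commit; under the feature-step model it would plausibly fold each step's configurePod over an initial empty pod, roughly as in the sketch below (the buildFromFeatures name and the SparkPod.initialPod helper are assumed, not taken from this diff):

    import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
    import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, KubernetesFeatureConfigStep}

    // Sketch of how an executor builder could compose feature steps.
    private[spark] class KubernetesExecutorBuilder {

      def buildFromFeatures(
          kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
        // A single step for now; secrets, local dirs, etc. would be appended
        // to this sequence as further feature steps.
        val features: Seq[KubernetesFeatureConfigStep] =
          Seq(new BasicExecutorFeatureStep(kubernetesConf))
        // Each step receives the pod produced by the previous one.
        features.foldLeft(SparkPod.initialPod()) { (pod, feature) =>
          feature.configurePod(pod)
        }
      }
    }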
