1414 * See the License for the specific language governing permissions and
1515 * limitations under the License.
1616 */
17- package org .apache .spark .scheduler . cluster . k8s
17+ package org .apache .spark .deploy . k8s . features
1818
1919import scala .collection .JavaConverters ._
2020
21- import io .fabric8 .kubernetes .api .model ._
21+ import io .fabric8 .kubernetes .api .model .{ ContainerBuilder , ContainerPortBuilder , EnvVar , EnvVarBuilder , EnvVarSourceBuilder , HasMetadata , PodBuilder , QuantityBuilder }
2222
23- import org .apache .spark .{ SparkConf , SparkException }
24- import org .apache .spark .deploy .k8s .{KubernetesUtils , MountSecretsBootstrap }
23+ import org .apache .spark .SparkException
24+ import org .apache .spark .deploy .k8s .{KubernetesConf , KubernetesExecutorSpecificConf , SparkPod }
2525import org .apache .spark .deploy .k8s .Config ._
2626import org .apache .spark .deploy .k8s .Constants ._
2727import org .apache .spark .internal .config .{EXECUTOR_CLASS_PATH , EXECUTOR_JAVA_OPTIONS , EXECUTOR_MEMORY , EXECUTOR_MEMORY_OVERHEAD }
28+ import org .apache .spark .rpc .RpcEndpointAddress
29+ import org .apache .spark .scheduler .cluster .CoarseGrainedSchedulerBackend
2830import org .apache .spark .util .Utils
2931
30- /**
31- * A factory class for bootstrapping and creating executor pods with the given bootstrapping
32- * components.
33- *
34- * @param sparkConf Spark configuration
35- * @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto
36- * user-specified paths into the executor container
37- */
38- private [spark] class ExecutorPodFactory (
39- sparkConf : SparkConf ,
40- mountSecretsBootstrap : Option [MountSecretsBootstrap ]) {
41-
42- private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH )
43-
44- private val executorLabels = KubernetesUtils .parsePrefixedKeyValuePairs(
45- sparkConf,
46- KUBERNETES_EXECUTOR_LABEL_PREFIX )
47- require(
48- ! executorLabels.contains(SPARK_APP_ID_LABEL ),
49- s " Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark. " )
50- require(
51- ! executorLabels.contains(SPARK_EXECUTOR_ID_LABEL ),
52- s " Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for " +
53- " Spark." )
54- require(
55- ! executorLabels.contains(SPARK_ROLE_LABEL ),
56- s " Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark. " )
32+ private [spark] class BasicExecutorFeatureStep (
33+ kubernetesConf : KubernetesConf [KubernetesExecutorSpecificConf ])
34+ extends KubernetesFeatureConfigStep {
5735
58- private val executorAnnotations =
59- KubernetesUtils .parsePrefixedKeyValuePairs(
60- sparkConf,
61- KUBERNETES_EXECUTOR_ANNOTATION_PREFIX )
62- private val nodeSelector =
63- KubernetesUtils .parsePrefixedKeyValuePairs(
64- sparkConf,
65- KUBERNETES_NODE_SELECTOR_PREFIX )
66-
67- private val executorContainerImage = sparkConf
36+ // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
37+ private val executorExtraClasspath = kubernetesConf.get(EXECUTOR_CLASS_PATH )
38+ private val executorContainerImage = kubernetesConf
6839 .get(EXECUTOR_CONTAINER_IMAGE )
6940 .getOrElse(throw new SparkException (" Must specify the executor container image" ))
70- private val imagePullPolicy = sparkConf.get( CONTAINER_IMAGE_PULL_POLICY )
71- private val blockManagerPort = sparkConf
41+ private val blockManagerPort = kubernetesConf
42+ . sparkConf
7243 .getInt(" spark.blockmanager.port" , DEFAULT_BLOCKMANAGER_PORT )
7344
74- private val executorPodNamePrefix = sparkConf.get( KUBERNETES_EXECUTOR_POD_NAME_PREFIX )
45+ private val executorPodNamePrefix = kubernetesConf.appResourceNamePrefix
7546
76- private val executorMemoryMiB = sparkConf .get(EXECUTOR_MEMORY )
77- private val executorMemoryString = sparkConf .get(
47+ private val executorMemoryMiB = kubernetesConf .get(EXECUTOR_MEMORY )
48+ private val executorMemoryString = kubernetesConf .get(
7849 EXECUTOR_MEMORY .key, EXECUTOR_MEMORY .defaultValueString)
7950
80- private val memoryOverheadMiB = sparkConf
51+ private val memoryOverheadMiB = kubernetesConf
8152 .get(EXECUTOR_MEMORY_OVERHEAD )
8253 .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
8354 MEMORY_OVERHEAD_MIN_MIB ))
8455 private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
8556
86- private val executorCores = sparkConf.getDouble(" spark.executor.cores" , 1 )
87- private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES )
57+ private val executorCores = kubernetesConf.sparkConf.getDouble(
58+ " spark.executor.cores" , 1 )
59+ private val executorLimitCores = kubernetesConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES )
60+ private val driverPod = kubernetesConf.roleSpecificConf.driverPod
61+
62+ private val driverUrl = RpcEndpointAddress (
63+ kubernetesConf.sparkConf.get(" spark.driver.host" ),
64+ kubernetesConf.sparkConf.getInt(" spark.driver.port" , DEFAULT_DRIVER_PORT ),
65+ CoarseGrainedSchedulerBackend .ENDPOINT_NAME ).toString
8866
89- /**
90- * Configure and construct an executor pod with the given parameters.
91- */
92- def createExecutorPod (
93- executorId : String ,
94- applicationId : String ,
95- driverUrl : String ,
96- executorEnvs : Seq [(String , String )],
97- driverPod : Pod ,
98- nodeToLocalTaskCount : Map [String , Int ]): Pod = {
99- val name = s " $executorPodNamePrefix-exec- $executorId"
67+ override def configurePod (pod : SparkPod ): SparkPod = {
68+ val name = s " $executorPodNamePrefix-exec- ${kubernetesConf.roleSpecificConf.executorId}"
10069
10170 // hostname must be no longer than 63 characters, so take the last 63 characters of the pod
10271 // name as the hostname. This preserves uniqueness since the end of name contains
10372 // executorId
10473 val hostname = name.substring(Math .max(0 , name.length - 63 ))
105- val resolvedExecutorLabels = Map (
106- SPARK_EXECUTOR_ID_LABEL -> executorId,
107- SPARK_APP_ID_LABEL -> applicationId,
108- SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE ) ++
109- executorLabels
11074 val executorMemoryQuantity = new QuantityBuilder (false )
11175 .withAmount(s " ${executorMemoryMiB}Mi " )
11276 .build()
@@ -122,7 +86,7 @@ private[spark] class ExecutorPodFactory(
12286 .withValue(cp)
12387 .build()
12488 }
125- val executorExtraJavaOptionsEnv = sparkConf
89+ val executorExtraJavaOptionsEnv = kubernetesConf
12690 .get(EXECUTOR_JAVA_OPTIONS )
12791 .map { opts =>
12892 val delimitedOpts = Utils .splitCommandString(opts)
@@ -136,10 +100,11 @@ private[spark] class ExecutorPodFactory(
136100 // Executor backend expects integral value for executor cores, so round it up to an int.
137101 (ENV_EXECUTOR_CORES , math.ceil(executorCores).toInt.toString),
138102 (ENV_EXECUTOR_MEMORY , executorMemoryString),
139- (ENV_APPLICATION_ID , applicationId ),
103+ (ENV_APPLICATION_ID , kubernetesConf.appId ),
140104 // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
141105 (ENV_SPARK_CONF_DIR , SPARK_CONF_DIR_INTERNAL ),
142- (ENV_EXECUTOR_ID , executorId)) ++ executorEnvs)
106+ (ENV_EXECUTOR_ID , kubernetesConf.roleSpecificConf.executorId)) ++
107+ kubernetesConf.sparkConf.getExecutorEnv)
143108 .map(env => new EnvVarBuilder ()
144109 .withName(env._1)
145110 .withValue(env._2)
@@ -161,10 +126,10 @@ private[spark] class ExecutorPodFactory(
161126 .build()
162127 }
163128
164- val executorContainer = new ContainerBuilder ()
129+ val executorContainer = new ContainerBuilder (pod.container )
165130 .withName(" executor" )
166131 .withImage(executorContainerImage)
167- .withImagePullPolicy(imagePullPolicy)
132+ .withImagePullPolicy(kubernetesConf. imagePullPolicy() )
168133 .withNewResources()
169134 .addToRequests(" memory" , executorMemoryQuantity)
170135 .addToLimits(" memory" , executorMemoryLimitQuantity)
@@ -174,49 +139,40 @@ private[spark] class ExecutorPodFactory(
174139 .withPorts(requiredPorts.asJava)
175140 .addToArgs(" executor" )
176141 .build()
177-
178- val executorPod = new PodBuilder ()
179- .withNewMetadata()
180- .withName(name)
181- .withLabels(resolvedExecutorLabels.asJava)
182- .withAnnotations(executorAnnotations.asJava)
183- .withOwnerReferences()
184- .addNewOwnerReference()
185- .withController(true )
186- .withApiVersion(driverPod.getApiVersion)
187- .withKind(driverPod.getKind)
188- .withName(driverPod.getMetadata.getName)
189- .withUid(driverPod.getMetadata.getUid)
190- .endOwnerReference()
191- .endMetadata()
192- .withNewSpec()
193- .withHostname(hostname)
194- .withRestartPolicy(" Never" )
195- .withNodeSelector(nodeSelector.asJava)
196- .endSpec()
197- .build()
198-
199142 val containerWithLimitCores = executorLimitCores.map { limitCores =>
200143 val executorCpuLimitQuantity = new QuantityBuilder (false )
201144 .withAmount(limitCores)
202145 .build()
203146 new ContainerBuilder (executorContainer)
204147 .editResources()
205- .addToLimits(" cpu" , executorCpuLimitQuantity)
206- .endResources()
148+ .addToLimits(" cpu" , executorCpuLimitQuantity)
149+ .endResources()
207150 .build()
208151 }.getOrElse(executorContainer)
209-
210- val (maybeSecretsMountedPod, maybeSecretsMountedContainer) =
211- mountSecretsBootstrap.map { bootstrap =>
212- (bootstrap.addSecretVolumes(executorPod), bootstrap.mountSecrets(containerWithLimitCores))
213- }.getOrElse((executorPod, containerWithLimitCores))
214-
215-
216- new PodBuilder (maybeSecretsMountedPod)
217- .editSpec()
218- .addToContainers(maybeSecretsMountedContainer)
152+ val executorPod = new PodBuilder (pod.pod)
153+ .editOrNewMetadata()
154+ .withName(name)
155+ .withLabels(kubernetesConf.roleLabels.asJava)
156+ .withAnnotations(kubernetesConf.roleAnnotations.asJava)
157+ .withOwnerReferences()
158+ .addNewOwnerReference()
159+ .withController(true )
160+ .withApiVersion(driverPod.getApiVersion)
161+ .withKind(driverPod.getKind)
162+ .withName(driverPod.getMetadata.getName)
163+ .withUid(driverPod.getMetadata.getUid)
164+ .endOwnerReference()
165+ .endMetadata()
166+ .editOrNewSpec()
167+ .withHostname(hostname)
168+ .withRestartPolicy(" Never" )
169+ .withNodeSelector(kubernetesConf.nodeSelector().asJava)
219170 .endSpec()
220171 .build()
172+ SparkPod (executorPod, containerWithLimitCores)
221173 }
174+
175+ override def getAdditionalPodSystemProperties (): Map [String , String ] = Map .empty
176+
177+ override def getAdditionalKubernetesResources (): Seq [HasMetadata ] = Seq .empty
222178}
0 commit comments