Skip to content

Commit 5960d24

Browse files
Dmitriy Drinfeld authored and Jim Kleckner committed
[SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
This is a backport of #29533 from master. It includes shockdm/pull/1, which has been squashed, and the import review comment has been incorporated. It has also been rebased to branch-2.4 and addresses review comments.
1 parent fa1b476 commit 5960d24

File tree

3 files changed

+93
-35
lines changed

3 files changed

+93
-35
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
package org.apache.spark.deploy.k8s.submit
1818

1919
import java.io.StringWriter
20+
import java.net.HttpURLConnection.HTTP_GONE
2021
import java.util.{Collections, UUID}
2122
import java.util.Properties
2223

2324
import io.fabric8.kubernetes.api.model._
24-
import io.fabric8.kubernetes.client.KubernetesClient
25+
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch}
26+
import io.fabric8.kubernetes.client.Watcher.Action
2527
import scala.collection.mutable
28+
import scala.util.control.Breaks._
2629
import scala.util.control.NonFatal
2730

2831
import org.apache.spark.SparkConf
@@ -133,29 +136,37 @@ private[spark] class Client(
133136
.endVolume()
134137
.endSpec()
135138
.build()
136-
Utils.tryWithResource(
137-
kubernetesClient
138-
.pods()
139-
.withName(resolvedDriverPod.getMetadata.getName)
140-
.watch(watcher)) { _ =>
141-
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
142-
try {
143-
val otherKubernetesResources =
144-
resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
145-
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
146-
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
147-
} catch {
148-
case NonFatal(e) =>
149-
kubernetesClient.pods().delete(createdDriverPod)
150-
throw e
151-
}
152139

153-
if (waitForAppCompletion) {
154-
logInfo(s"Waiting for application $appName to finish...")
155-
watcher.awaitCompletion()
156-
logInfo(s"Application $appName finished.")
157-
} else {
158-
logInfo(s"Deployed Spark application $appName into Kubernetes.")
140+
val driverPodName = resolvedDriverPod.getMetadata.getName
141+
var watch: Watch = null
142+
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
143+
try {
144+
val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
145+
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
146+
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
147+
} catch {
148+
case NonFatal(e) =>
149+
kubernetesClient.pods().delete(createdDriverPod)
150+
throw e
151+
}
152+
val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":")
153+
breakable {
154+
while (true) {
155+
val podWithName = kubernetesClient
156+
.pods()
157+
.withName(driverPodName)
158+
159+
watcher.reset()
160+
161+
watch = podWithName.watch(watcher)
162+
163+
watcher.eventReceived(Action.MODIFIED, podWithName.get())
164+
165+
if(watcher.watchOrStop(sId)) {
166+
logInfo(s"Stop watching as the pod has completed.")
167+
watch.close()
168+
break
169+
}
159170
}
160171
}
161172
}
@@ -230,7 +241,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
230241
val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
231242
val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
232243

233-
val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
244+
val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId,
245+
loggingInterval,
246+
waitForAppCompletion)
234247

235248
Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
236249
master,

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.spark.deploy.k8s.submit
1818

19+
import java.net.HttpURLConnection.HTTP_GONE
1920
import java.util.concurrent.{CountDownLatch, TimeUnit}
2021

2122
import scala.collection.JavaConverters._
@@ -28,8 +29,10 @@ import org.apache.spark.SparkException
2829
import org.apache.spark.internal.Logging
2930
import org.apache.spark.util.ThreadUtils
3031

32+
3133
private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
32-
def awaitCompletion(): Unit
34+
def watchOrStop(submissionId: String): Boolean
35+
def reset(): Unit
3336
}
3437

3538
/**
@@ -42,13 +45,20 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
4245
*/
4346
private[k8s] class LoggingPodStatusWatcherImpl(
4447
appId: String,
45-
maybeLoggingInterval: Option[Long])
48+
maybeLoggingInterval: Option[Long],
49+
waitForCompletion: Boolean)
4650
extends LoggingPodStatusWatcher with Logging {
4751

52+
private var podCompleted = false
53+
54+
private var resourceTooOldReceived: Boolean = false
55+
4856
private val podCompletedFuture = new CountDownLatch(1)
57+
4958
// start timer for periodic logging
5059
private val scheduler =
5160
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
61+
5262
private val logRunnable: Runnable = new Runnable {
5363
override def run() = logShortStatus()
5464
}
@@ -77,9 +87,18 @@ private[k8s] class LoggingPodStatusWatcherImpl(
7787
}
7888
}
7989

90+
override def reset(): Unit = {
91+
resourceTooOldReceived = false
92+
}
93+
8094
override def onClose(e: KubernetesClientException): Unit = {
8195
logDebug(s"Stopping watching application $appId with last-observed phase $phase")
82-
closeWatch()
96+
if (e != null && e.getCode==HTTP_GONE) {
97+
resourceTooOldReceived = true
98+
logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e")
99+
} else {
100+
closeWatch()
101+
}
83102
}
84103

85104
private def logShortStatus() = {
@@ -97,6 +116,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
97116
private def closeWatch(): Unit = {
98117
podCompletedFuture.countDown()
99118
scheduler.shutdown()
119+
podCompleted = true
100120
}
101121

102122
private def formatPodState(pod: Pod): String = {
@@ -134,13 +154,6 @@ private[k8s] class LoggingPodStatusWatcherImpl(
134154
}.mkString("")
135155
}
136156

137-
override def awaitCompletion(): Unit = {
138-
podCompletedFuture.await()
139-
logInfo(pod.map { p =>
140-
s"Container final statuses:\n\n${containersDescription(p)}"
141-
}.getOrElse("No containers were found in the driver pod."))
142-
}
143-
144157
private def containersDescription(p: Pod): String = {
145158
p.getStatus.getContainerStatuses.asScala.map { status =>
146159
Seq(
@@ -177,4 +190,34 @@ private[k8s] class LoggingPodStatusWatcherImpl(
177190
private def formatTime(time: String): String = {
178191
if (time != null || time != "") time else "N/A"
179192
}
193+
194+
override def watchOrStop(sId: String): Boolean = if (waitForCompletion) {
195+
logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
196+
val interval = maybeLoggingInterval
197+
198+
synchronized {
199+
while (!podCompleted && !resourceTooOldReceived) {
200+
wait(interval.get)
201+
logDebug(s"Application status for $appId (phase: $phase)")
202+
}
203+
}
204+
205+
if(podCompleted) {
206+
logInfo(
207+
pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
208+
.getOrElse("No containers were found in the driver pod."))
209+
logInfo(s"Application ${appId} with submission ID $sId finished")
210+
} else {
211+
logInfo(s"Got HTTP Gone code, resource version changed in k8s api. Creating a new watcher.")
212+
}
213+
214+
logInfo(s"Watcher has stopped, pod completed status: ${podCompleted}")
215+
216+
podCompleted
217+
} else {
218+
logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes")
219+
logInfo(s"It seems we end up here, because we never want to wait for completion...")
220+
// Always act like the application has completed since we don't want to wait for app completion
221+
true
222+
}
180223
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
151151
createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
152152
when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE)
153153
when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
154+
when(loggingPodStatusWatcher.watchOrStop(kubernetesConf.namespace() + ":" + POD_NAME))
155+
.thenReturn(true)
154156
doReturn(resourceList)
155157
.when(kubernetesClient)
156158
.resourceList(createdResourcesArgumentCaptor.capture())
@@ -205,6 +207,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
205207
loggingPodStatusWatcher,
206208
KUBERNETES_RESOURCE_PREFIX)
207209
submissionClient.run()
208-
verify(loggingPodStatusWatcher).awaitCompletion()
210+
verify(loggingPodStatusWatcher).watchOrStop("default:driver")
209211
}
210212
}

0 commit comments

Comments (0)