Commit fd4c1fa

Author: Marcelo Vanzin (committed)

Test uniqueness.
1 parent 92eb16d commit fd4c1fa

3 files changed (+34, −14 lines)


resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 4 additions & 3 deletions
@@ -27,10 +27,11 @@ import org.apache.commons.codec.binary.Hex
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 private[spark] object KubernetesUtils extends Logging {
 
+  private val systemClock = new SystemClock()
   private lazy val RNG = new SecureRandom()
 
   /**
@@ -218,13 +219,13 @@ private[spark] object KubernetesUtils extends Logging {
    * This avoids using a UUID for uniqueness (too long), and relying solely on the current time
    * (not unique enough).
    */
-  def uniqueID(): String = {
+  def uniqueID(clock: Clock = systemClock): String = {
    val random = new Array[Byte](3)
    synchronized {
      RNG.nextBytes(random)
    }
 
-    val time = java.lang.Long.toHexString(System.currentTimeMillis() & 0xFFFFFFFFFFL)
+    val time = java.lang.Long.toHexString(clock.getTimeMillis() & 0xFFFFFFFFFFL)
    Hex.encodeHexString(random) + time
  }
 
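For reference, a minimal sketch of what the new clock parameter enables (assuming test code living under the org.apache.spark package, since KubernetesUtils and ManualClock are private[spark]; this snippet is not part of the commit):

  import org.apache.spark.deploy.k8s.KubernetesUtils
  import org.apache.spark.util.ManualClock

  // Freeze the clock so every call sees the same timestamp.
  val clock = new ManualClock(42L)
  val ids = (1 to 5).map(_ => KubernetesUtils.uniqueID(clock = clock))

  // Each ID is 6 hex chars from SecureRandom plus the hex of (42 & 0xFFFFFFFFFFL),
  // so all IDs end in "2a" yet still differ (with overwhelming probability) in
  // their random prefix.
  assert(ids.forall(_.endsWith("2a")))
  assert(ids.toSet.size == ids.size)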

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala

Lines changed: 5 additions & 2 deletions
@@ -23,8 +23,11 @@ import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
 import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.{Clock, SystemClock}
 
-private[spark] class DriverServiceFeatureStep(kubernetesConf: KubernetesDriverConf)
+private[spark] class DriverServiceFeatureStep(
+    kubernetesConf: KubernetesDriverConf,
+    clock: Clock = new SystemClock())
   extends KubernetesFeatureConfigStep with Logging {
   import DriverServiceFeatureStep._
 
@@ -39,7 +42,7 @@ private[spark] class DriverServiceFeatureStep(kubernetesConf: KubernetesDriverCo
   private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
     preferredServiceName
   } else {
-    val randomServiceId = KubernetesUtils.uniqueID()
+    val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
     val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
     logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
       s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala

Lines changed: 25 additions & 9 deletions
@@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s.features
 
 import scala.collection.JavaConverters._
 
+import com.google.common.net.InternetDomainName
 import io.fabric8.kubernetes.api.model.Service
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -26,6 +27,7 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 import org.apache.spark.internal.config._
+import org.apache.spark.util.ManualClock
 
 class DriverServiceFeatureStepSuite extends SparkFunSuite {
 
@@ -90,23 +92,37 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
     assert(additionalProps(DRIVER_BLOCK_MANAGER_PORT.key) === DEFAULT_BLOCKMANAGER_PORT.toString)
   }
 
-  test("Long prefixes should switch to using a generated name.") {
+  test("Long prefixes should switch to using a generated unique name.") {
     val sparkConf = new SparkConf(false)
       .set(KUBERNETES_NAMESPACE, "my-namespace")
     val kconf = KubernetesTestConf.createDriverConf(
       sparkConf = sparkConf,
       resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
       labels = DRIVER_LABELS)
-    val configurationStep = new DriverServiceFeatureStep(kconf)
+    val clock = new ManualClock()
 
-    val driverService = configurationStep
-      .getAdditionalKubernetesResources()
-      .head
-      .asInstanceOf[Service]
-    assert(!driverService.getMetadata.getName.startsWith(kconf.resourceNamePrefix))
+    // Ensure that multiple services created at the same time generate unique names.
+    val services = (1 to 10).map { _ =>
+      val configurationStep = new DriverServiceFeatureStep(kconf, clock = clock)
+      val serviceName = configurationStep
+        .getAdditionalKubernetesResources()
+        .head
+        .asInstanceOf[Service]
+        .getMetadata
+        .getName
 
-    val additionalProps = configurationStep.getAdditionalPodSystemProperties()
-    assert(!additionalProps(DRIVER_HOST_ADDRESS.key).startsWith(kconf.resourceNamePrefix))
+      val hostAddress = configurationStep
+        .getAdditionalPodSystemProperties()(DRIVER_HOST_ADDRESS.key)
+
+      (serviceName -> hostAddress)
+    }.toMap
+
+    assert(services.size === 10)
+    services.foreach { case (name, address) =>
+      assert(!name.startsWith(kconf.resourceNamePrefix))
+      assert(!address.startsWith(kconf.resourceNamePrefix))
+      assert(InternetDomainName.isValid(address))
+    }
   }
 
   test("Disallow bind address and driver host to be set explicitly.") {
