Commit 6cd3ced

Stavros Kontopoulos committed: add pv tests
1 parent 7857c6d · commit 6cd3ced

7 files changed: +261 -29 lines changed

examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala

Lines changed: 10 additions & 2 deletions

@@ -22,6 +22,9 @@ import java.io.File

 import scala.io.Source._

+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.SparkSession

 /**
@@ -107,6 +110,13 @@ object DFSReadWriteTest {

     println("Writing local file to DFS")
     val dfsFilename = s"$dfsDirPath/dfs_read_write_test"
+
+    // delete file if exists
+    val fs = FileSystem.get(spark.sessionState.newHadoopConf())
+    if (fs.exists(new Path(dfsFilename))) {
+      fs.delete(new Path(dfsFilename), true)
+    }
+
     val fileRDD = spark.sparkContext.parallelize(fileContents)
     fileRDD.saveAsTextFile(dfsFilename)

@@ -123,15 +133,13 @@ object DFSReadWriteTest {
       .sum

     spark.stop()
-
     if (localWordCount == dfsWordCount) {
       println(s"Success! Local Word Count $localWordCount and " +
         s"DFS Word Count $dfsWordCount agree.")
     } else {
       println(s"Failure! Local Word Count $localWordCount " +
         s"and DFS Word Count $dfsWordCount disagree.")
     }
-
   }
 }
 // scalastyle:on println
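Note: saveAsTextFile fails if its target path already exists, so the change above clears the output location before writing, which is what makes the example rerunnable against a persistent volume. A minimal standalone sketch of the same pattern (the output path and fresh Configuration here are hypothetical stand-ins):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object DeleteIfExistsSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical output location; DFSReadWriteTest derives it from its args.
    val outputPath = new Path("/tmp/dfs_read_write_test")
    val fs = FileSystem.get(new Configuration())
    if (fs.exists(outputPath)) {
      // recursive = true, since saveAsTextFile writes a directory of part files
      fs.delete(outputPath, true)
    }
  }
}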

resource-managers/kubernetes/integration-tests/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -29,7 +29,7 @@
     <download-maven-plugin.version>1.3.0</download-maven-plugin.version>
     <exec-maven-plugin.version>1.4.0</exec-maven-plugin.version>
     <extraScalaTestArgs></extraScalaTestArgs>
-    <kubernetes-client.version>4.1.0</kubernetes-client.version>
+    <kubernetes-client.version>4.1.2</kubernetes-client.version>
     <scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>
     <scalatest-maven-plugin.version>1.0</scalatest-maven-plugin.version>
     <sbt.project.name>kubernetes-integration-tests</sbt.project.name>

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala

Lines changed: 31 additions & 4 deletions

@@ -40,7 +40,7 @@ import org.apache.spark.internal.config._

 class KubernetesSuite extends SparkFunSuite
   with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
-  with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite
+  with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite
   with Logging with Eventually with Matchers {

   import KubernetesSuite._
@@ -170,6 +170,29 @@ class KubernetesSuite extends SparkFunSuite
       isJVM)
   }

+  protected def runDFSReadWriteAndVerifyCompletion(
+      wordCount: Int,
+      appResource: String = containerLocalSparkDistroExamplesJar,
+      driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+      executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
+      appArgs: Array[String] = Array.empty[String],
+      appLocator: String = appLocator,
+      isJVM: Boolean = true,
+      interval: Option[PatienceConfiguration.Interval] = None): Unit = {
+    runSparkApplicationAndVerifyCompletion(
+      appResource,
+      SPARK_DFS_READ_WRITE_TEST,
+      Seq(s"Success! Local Word Count $wordCount and " +
+        s"DFS Word Count $wordCount agree."),
+      appArgs,
+      driverPodChecker,
+      executorPodChecker,
+      appLocator,
+      isJVM,
+      None,
+      interval)
+  }
+
   protected def runSparkRemoteCheckAndVerifyCompletion(
       appResource: String = containerLocalSparkDistroExamplesJar,
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
@@ -233,7 +256,8 @@ class KubernetesSuite extends SparkFunSuite
       executorPodChecker: Pod => Unit,
       appLocator: String,
       isJVM: Boolean,
-      pyFiles: Option[String] = None): Unit = {
+      pyFiles: Option[String] = None,
+      interval: Option[PatienceConfiguration.Interval] = None): Unit = {
     val appArguments = SparkAppArguments(
       mainAppResource = appResource,
       mainClass = mainClass,
@@ -273,10 +297,12 @@ class KubernetesSuite extends SparkFunSuite
         }
       }
     })
-    Eventually.eventually(TIMEOUT, INTERVAL) { execPods.values.nonEmpty should be (true) }
+
+    val patienceInterval = interval.getOrElse(INTERVAL)
+    Eventually.eventually(TIMEOUT, patienceInterval) { execPods.values.nonEmpty should be (true) }
     execWatcher.close()
     execPods.values.foreach(executorPodChecker(_))
-    Eventually.eventually(TIMEOUT, INTERVAL) {
+    Eventually.eventually(TIMEOUT, patienceInterval) {
       expectedLogOnCompletion.foreach { e =>
         assert(kubernetesTestComponents.kubernetesClient
           .pods()
@@ -375,6 +401,7 @@ class KubernetesSuite extends SparkFunSuite
 private[spark] object KubernetesSuite {
   val k8sTestTag = Tag("k8s")
   val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
+  val SPARK_DFS_READ_WRITE_TEST = "org.apache.spark.examples.DFSReadWriteTest"
   val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
   val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"
   val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
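The new interval parameter lets a single test slow down ScalaTest's polling without changing the suite-wide default. A minimal sketch of the getOrElse pattern used above; the TIMEOUT value matches this diff, while the default INTERVAL shown here is a hypothetical stand-in (the real one is defined elsewhere in KubernetesSuite):

import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Milliseconds, Minutes, Span}

object PatienceSketch {
  val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
  // Hypothetical suite-wide default polling interval.
  val INTERVAL = PatienceConfiguration.Interval(Span(100, Milliseconds))

  def waitForCondition(interval: Option[PatienceConfiguration.Interval]): Unit = {
    // Fall back to the suite default when a test does not override the interval.
    val patienceInterval = interval.getOrElse(INTERVAL)
    Eventually.eventually(TIMEOUT, patienceInterval) {
      // re-evaluated every patienceInterval until it passes or TIMEOUT elapses
      assert(System.currentTimeMillis() > 0)
    }
  }

  def main(args: Array[String]): Unit =
    waitForCondition(Some(PatienceConfiguration.Interval(Span(10, Milliseconds))))
}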

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala

Lines changed: 2 additions & 1 deletion

@@ -21,6 +21,7 @@ import java.util.UUID

 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer

 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import org.scalatest.concurrent.Eventually
@@ -124,7 +125,7 @@ private[spark] object SparkAppLauncher extends Logging {
       appConf.toStringArray :+ appArguments.mainAppResource

     if (appArguments.appArgs.nonEmpty) {
-      commandLine += appArguments.appArgs.mkString(" ")
+      commandLine ++= appArguments.appArgs.to[ArrayBuffer]
     }
     logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
     ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
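This commandLine fix matters for multi-argument apps like DFSReadWriteTest: += with mkString(" ") appended all app args as a single argv token, whereas ++= keeps each one a separate token. A small sketch of the difference (the paths are hypothetical):

import scala.collection.mutable.ArrayBuffer

object AppArgsSketch {
  def main(args: Array[String]): Unit = {
    val appArgs = Array("/opt/spark/pv-tests/input.txt", "/opt/spark/pv-tests")

    val joined = ArrayBuffer("spark-submit")
    joined += appArgs.mkString(" ")   // one token: "path1 path2"

    val separate = ArrayBuffer("spark-submit")
    separate ++= appArgs              // two tokens: "path1", "path2"

    println(joined.length)   // 2 -> the app would see a single, bogus argument
    println(separate.length) // 3 -> the app sees both arguments, as intended
  }
}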
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala (new file)

Lines changed: 189 additions & 0 deletions

@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.io.{File, PrintWriter}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.storage.StorageClassBuilder
+import org.scalatest.Tag
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Milliseconds, Span}
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+
+private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
+  import PVTestsSuite._
+
+  private def setupLocalStorage(): Unit = {
+    val scBuilder = new StorageClassBuilder()
+      .withKind("StorageClass")
+      .withApiVersion("storage.k8s.io/v1")
+      .withNewMetadata()
+        .withName(STORAGE_NAME)
+      .endMetadata()
+      .withProvisioner("kubernetes.io/no-provisioner")
+      .withVolumeBindingMode("WaitForFirstConsumer")
+
+    val pvBuilder = new PersistentVolumeBuilder()
+      .withKind("PersistentVolume")
+      .withApiVersion("v1")
+      .withNewMetadata()
+        .withName("test-local-pv")
+      .endMetadata()
+      .withNewSpec()
+        .withCapacity(Map("storage" -> new QuantityBuilder().withAmount("1Gi").build()).asJava)
+        .withAccessModes("ReadWriteOnce")
+        .withPersistentVolumeReclaimPolicy("Retain")
+        .withStorageClassName("test-local-storage")
+        .withLocal(new LocalVolumeSourceBuilder().withPath(VM_PATH).build())
+        .withNewNodeAffinity()
+          .withNewRequired()
+            .withNodeSelectorTerms(new NodeSelectorTermBuilder()
+              .withMatchExpressions(new NodeSelectorRequirementBuilder()
+                .withKey("kubernetes.io/hostname")
+                .withOperator("In")
+                .withValues("minikube").build()).build())
+          .endRequired()
+        .endNodeAffinity()
+      .endSpec()
+
+    val pvcBuilder = new PersistentVolumeClaimBuilder()
+      .withKind("PersistentVolumeClaim")
+      .withApiVersion("v1")
+      .withNewMetadata()
+        .withName(PVC_NAME)
+      .endMetadata()
+      .withNewSpec()
+        .withAccessModes("ReadWriteOnce")
+        .withStorageClassName("test-local-storage")
+        .withResources(new ResourceRequirementsBuilder()
+          .withRequests(Map("storage" -> new QuantityBuilder()
+            .withAmount("1Gi").build()).asJava).build())
+      .endSpec()
+
+    kubernetesTestComponents
+      .kubernetesClient
+      .storage()
+      .storageClasses()
+      .create(scBuilder.build())
+
+    kubernetesTestComponents
+      .kubernetesClient
+      .persistentVolumes()
+      .create(pvBuilder.build())
+
+    kubernetesTestComponents
+      .kubernetesClient
+      .persistentVolumeClaims()
+      .create(pvcBuilder.build())
+  }
+
+  private def deleteLocalStorage(): Unit = {
+    kubernetesTestComponents
+      .kubernetesClient
+      .persistentVolumeClaims()
+      .withName(PVC_NAME)
+      .delete()
+
+    kubernetesTestComponents
+      .kubernetesClient
+      .persistentVolumes()
+      .withName(PV_NAME)
+      .delete()
+
+    kubernetesTestComponents
+      .kubernetesClient
+      .storage()
+      .storageClasses()
+      .withName(STORAGE_NAME)
+      .delete()
+  }
+
+  private def checkPVs(pod: Pod, file: String) = {
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      implicit val podName: String = pod.getMetadata.getName
+      implicit val components: KubernetesTestComponents = kubernetesTestComponents
+      val contents = Utils.executeCommand("cat", s"$CONTAINER_MOUNT_PATH/$file")
+      assert(contents.toString.trim.equals(FILE_CONTENTS))
+    }
+  }
+
+  private def createTempFile(): String = {
+    val filename = try {
+      val f = File.createTempFile("tmp", ".txt", new File(HOST_PATH))
+      f.deleteOnExit()
+      new PrintWriter(f) {
+        try {
+          write(FILE_CONTENTS)
+        } finally {
+          close()
+        }
+      }
+      f.getName
+    } catch {
+      case e: Exception => e.printStackTrace(); throw e;
+    }
+    filename
+  }
+
+  test("Test PVs with local storage", k8sTestTag, MinikubeTag) {
+    sparkAppConf
+      .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path",
+        CONTAINER_MOUNT_PATH)
+      .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName",
+        PVC_NAME)
+      .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path",
+        CONTAINER_MOUNT_PATH)
+      .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName",
+        PVC_NAME)
+    val file = createTempFile()
+    try {
+      setupLocalStorage()
+      runDFSReadWriteAndVerifyCompletion(
+        FILE_CONTENTS.split(" ").length,
+        driverPodChecker = (driverPod: Pod) => {
+          doBasicDriverPodCheck(driverPod)
+          checkPVs(driverPod, file)
+        },
+        executorPodChecker = (executorPod: Pod) => {
+          doBasicExecutorPodCheck(executorPod)
+          checkPVs(executorPod, file)
+        },
+        appArgs = Array(s"$CONTAINER_MOUNT_PATH/$file", s"$CONTAINER_MOUNT_PATH"),
+        interval = Some(PV_TESTS_INTERVAL)
+      )
+    } finally {
+      // make sure this always run
+      deleteLocalStorage()
+    }
+  }
+}
+
+private[spark] object PVTestsSuite {
+  val MinikubeTag = Tag("minikube")
+  val STORAGE_NAME = "test-local-storage"
+  val PV_NAME = "test-local-pv"
+  val PVC_NAME = "test-local-pvc"
+  val CONTAINER_MOUNT_PATH = "/opt/spark/pv-tests"
+  val HOST_PATH = sys.env.getOrElse("PVC_TESTS_HOST_PATH", "/tmp")
+  val VM_PATH = sys.env.getOrElse("PVC_TESTS_VM_PATH", "/tmp")
+  val FILE_CONTENTS = "test PVs"
+  val PV_TESTS_INTERVAL = PatienceConfiguration.Interval(Span(10, Milliseconds))
+}
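For orientation, the test hands DFSReadWriteTest two arguments that both resolve to the PVC mount, so the word-count round trip exercises the volume for reads and writes. A sketch of that wiring (the temp file name is a hypothetical example of what createTempFile returns):

object PVTestArgsSketch {
  val CONTAINER_MOUNT_PATH = "/opt/spark/pv-tests" // where the PVC is mounted in driver and executors
  val tempFileName = "tmp1234.txt"                 // hypothetical name returned by createTempFile

  // appArgs handed to org.apache.spark.examples.DFSReadWriteTest:
  val appArgs: Array[String] = Array(
    s"$CONTAINER_MOUNT_PATH/$tempFileName", // "local" input file, read through the PV mount
    CONTAINER_MOUNT_PATH)                   // output directory, written back through the same mount

  def main(args: Array[String]): Unit = appArgs.foreach(println)
}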

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala

Lines changed: 6 additions & 21 deletions

@@ -83,33 +83,18 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
   private def checkSecrets(pod: Pod): Unit = {
     Eventually.eventually(TIMEOUT, INTERVAL) {
       implicit val podName: String = pod.getMetadata.getName
-      val env = executeCommand("env")
+      implicit val components: KubernetesTestComponents = kubernetesTestComponents
+      val env = Utils.executeCommand("env")
       assert(env.toString.contains(ENV_SECRET_VALUE_1))
       assert(env.toString.contains(ENV_SECRET_VALUE_2))
-      val fileUsernameContents = executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
-      val filePasswordContents = executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
+      val fileUsernameContents = Utils
+        .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
+      val filePasswordContents = Utils
+        .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
       assert(fileUsernameContents.toString.trim.equals(ENV_SECRET_VALUE_1))
       assert(filePasswordContents.toString.trim.equals(ENV_SECRET_VALUE_2))
     }
   }
-
-  private def executeCommand(cmd: String*)(implicit podName: String): String = {
-    val out = new ByteArrayOutputStream()
-    val watch = kubernetesTestComponents
-      .kubernetesClient
-      .pods()
-      .withName(podName)
-      .readingInput(System.in)
-      .writingOutput(out)
-      .writingError(System.err)
-      .withTTY()
-      .exec(cmd.toArray: _*)
-    // wait to get some result back
-    Thread.sleep(1000)
-    watch.close()
-    out.flush()
-    out.toString()
-  }
 }

 private[spark] object SecretsTestsSuite {
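The per-suite executeCommand helper removed above is now called as Utils.executeCommand from both SecretsTestsSuite and PVTestsSuite. Its new home (presumably the seventh changed file, whose diff is not shown on this page) plausibly keeps the removed body and threads the client through an implicit KubernetesTestComponents. A hedged sketch of that assumed shape:

import java.io.ByteArrayOutputStream

// Assumed shape of the shared helper; the actual Utils change is not in the diffs above.
object Utils {
  def executeCommand(cmd: String*)(
      implicit podName: String,
      kubernetesTestComponents: KubernetesTestComponents): String = {
    val out = new ByteArrayOutputStream()
    val watch = kubernetesTestComponents
      .kubernetesClient
      .pods()
      .withName(podName)
      .readingInput(System.in)
      .writingOutput(out)
      .writingError(System.err)
      .withTTY()
      .exec(cmd.toArray: _*)
    // wait to get some result back
    Thread.sleep(1000)
    watch.close()
    out.flush()
    out.toString()
  }
}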
