
Commit 4b96add

juliuszsompolski authored and hvanhovell committed
[SPARK-44872][CONNECT] Server testing infra and ReattachableExecuteSuite
### What changes were proposed in this pull request?

Add `SparkConnectServerTest` with infra to test a real server with a real client in the same process, but communicating over RPC. Add `ReattachableExecuteSuite` with some tests for reattachable execute.

Two bugs were found by the tests:
* Fix a bug in `SparkConnectExecutionManager.createExecuteHolder` when attempting to resubmit an operation that was deemed abandoned. This bug is benign in reattachable execute, because reattachable execute would first send a ReattachExecute, which would be handled correctly in SparkConnectReattachExecuteHandler. For non-reattachable execute (disabled or old client), this is also a very unlikely scenario, because the retrying mechanism should be able to resubmit before the query is declared abandoned, and hence get an INVALID_HANDLE.OPERATION_ALREADY_EXISTS. This bug can manifest only if a non-reattachable execution is retried with so much delay that the operation was already declared abandoned.
* In `ExecuteGrpcResponseSender` there was an assertion that assumed that if `sendResponse` did not send, it was because the deadline was reached. But it can also be because of an interrupt. This would have resulted in an interrupt surfacing an assertion error instead of CURSOR_DISCONNECTED in testing. Outside of testing, assertions are not enabled, so this was not a problem there.

### Why are the changes needed?

Testing of reattachable execute.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tests added.

Closes #42560 from juliuszsompolski/sc-reattachable-tests.

Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
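Below is a minimal, hypothetical sketch (not code from this commit) of how the server-side test hooks added in this patch (`setAllRPCsDeadline`, `interruptAllRPCs`, `listExecuteHolders` on `SparkConnectExecutionManager`) could be driven from a suite like `ReattachableExecuteSuite` to simulate a disconnected response stream; the helper name and the way the manager is obtained are illustrative assumptions.

```scala
import org.apache.spark.sql.connect.service.SparkConnectExecutionManager

// Hypothetical helper, not part of this commit: force every active ExecuteGrpcResponseSender
// to stop streaming, which is the situation a reattachable client must recover from.
def forceStreamDisconnect(manager: SparkConnectExecutionManager): Unit = {
  // Make all senders treat their deadline as already reached ...
  manager.setAllRPCsDeadline(System.currentTimeMillis())
  // ... or interrupt them outright; a reattachable client should then observe
  // CURSOR_DISCONNECTED and recover by sending ReattachExecute.
  manager.interruptAllRPCs()
  // The executions themselves stay registered, so a reattach can find them.
  assert(manager.listExecuteHolders.nonEmpty)
}
```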
1 parent d8298bf commit 4b96add

File tree

12 files changed: +735 / -28 lines


connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CloseableIterator.scala

Lines changed: 16 additions & 6 deletions
@@ -27,6 +27,20 @@ private[sql] trait CloseableIterator[E] extends Iterator[E] with AutoCloseable {
   }
 }
 
+private[sql] abstract class WrappedCloseableIterator[E] extends CloseableIterator[E] {
+
+  def innerIterator: Iterator[E]
+
+  override def next(): E = innerIterator.next()
+
+  override def hasNext(): Boolean = innerIterator.hasNext
+
+  override def close(): Unit = innerIterator match {
+    case it: CloseableIterator[E] => it.close()
+    case _ => // nothing
+  }
+}
+
 private[sql] object CloseableIterator {
 
   /**
@@ -35,12 +49,8 @@ private[sql] object CloseableIterator {
   def apply[T](iterator: Iterator[T]): CloseableIterator[T] = iterator match {
     case closeable: CloseableIterator[T] => closeable
     case _ =>
-      new CloseableIterator[T] {
-        override def next(): T = iterator.next()
-
-        override def hasNext(): Boolean = iterator.hasNext
-
-        override def close() = { /* empty */ }
+      new WrappedCloseableIterator[T] {
+        override def innerIterator = iterator
       }
   }
 }
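For context, here is a small self-contained sketch of the wrapping pattern introduced above, with renamed types so it does not collide with Spark's `private[sql]` classes; it illustrates why `close()` pattern-matches on the inner iterator.

```scala
trait MyCloseableIterator[E] extends Iterator[E] with AutoCloseable

abstract class MyWrappedCloseableIterator[E] extends MyCloseableIterator[E] {
  def innerIterator: Iterator[E]
  override def next(): E = innerIterator.next()
  override def hasNext: Boolean = innerIterator.hasNext
  // Closing the wrapper closes the inner iterator only if it is itself closeable.
  override def close(): Unit = innerIterator match {
    case it: MyCloseableIterator[E] => it.close()
    case _ => // a plain iterator has nothing to release
  }
}

// Wrapping a plain Scala iterator yields a closeable view whose close() is a no-op.
val inner = Iterator(1, 2, 3)
val wrapped = new MyWrappedCloseableIterator[Int] {
  override def innerIterator: Iterator[Int] = inner
}
assert(wrapped.toList == List(1, 2, 3))
wrapped.close() // safe: inner is not a MyCloseableIterator
```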

connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ import io.grpc.ManagedChannel
 
 import org.apache.spark.connect.proto._
 
-private[client] class CustomSparkConnectBlockingStub(
+private[connect] class CustomSparkConnectBlockingStub(
     channel: ManagedChannel,
     retryPolicy: GrpcRetryHandler.RetryPolicy) {
 

connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala

Lines changed: 15 additions & 3 deletions
@@ -18,6 +18,7 @@ package org.apache.spark.sql.connect.client
 
 import java.util.UUID
 
+import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 import io.grpc.{ManagedChannel, StatusRuntimeException}
@@ -50,7 +51,7 @@ class ExecutePlanResponseReattachableIterator(
     request: proto.ExecutePlanRequest,
     channel: ManagedChannel,
     retryPolicy: GrpcRetryHandler.RetryPolicy)
-    extends CloseableIterator[proto.ExecutePlanResponse]
+    extends WrappedCloseableIterator[proto.ExecutePlanResponse]
     with Logging {
 
   val operationId = if (request.hasOperationId) {
@@ -86,14 +87,25 @@ class ExecutePlanResponseReattachableIterator(
   // True after ResultComplete message was seen in the stream.
   // Server will always send this message at the end of the stream, if the underlying iterator
   // finishes without producing one, another iterator needs to be reattached.
-  private var resultComplete: Boolean = false
+  // Visible for testing.
+  private[connect] var resultComplete: Boolean = false
 
   // Initial iterator comes from ExecutePlan request.
   // Note: This is not retried, because no error would ever be thrown here, and GRPC will only
   // throw error on first iter.hasNext() or iter.next()
-  private var iter: Option[java.util.Iterator[proto.ExecutePlanResponse]] =
+  // Visible for testing.
+  private[connect] var iter: Option[java.util.Iterator[proto.ExecutePlanResponse]] =
     Some(rawBlockingStub.executePlan(initialRequest))
 
+  override def innerIterator: Iterator[proto.ExecutePlanResponse] = iter match {
+    case Some(it) => it.asScala
+    case None =>
+      // The iterator is only unset for short moments while retry exception is thrown.
+      // It should only happen in the middle of internal processing. Since this iterator is not
+      // thread safe, no-one should be accessing it at this moment.
+      throw new IllegalStateException("innerIterator unset")
+  }
+
   override def next(): proto.ExecutePlanResponse = synchronized {
     // hasNext will trigger reattach in case the stream completed without resultComplete
     if (!hasNext()) {
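The new `innerIterator` bridges the Java iterator returned by the gRPC blocking stub into Scala's `Iterator`, which is what the `scala.collection.JavaConverters._` import added above is for. A small standalone sketch of that conversion (the list is just stand-in data):

```scala
import java.util.{Arrays => JArrays}
import scala.collection.JavaConverters._

// A gRPC blocking stub hands back a java.util.Iterator; asScala adapts it to scala.Iterator
// so it can back a CloseableIterator.
val javaIter: java.util.Iterator[String] = JArrays.asList("a", "b").iterator()
val scalaIter: Iterator[String] = javaIter.asScala
assert(scalaIter.toList == List("a", "b"))
```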

connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala

Lines changed: 4 additions & 1 deletion
@@ -43,7 +43,10 @@ private[client] object GrpcExceptionConverter extends JsonUtils {
   }
 
   def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = {
-    new CloseableIterator[T] {
+    new WrappedCloseableIterator[T] {
+
+      override def innerIterator: Iterator[T] = iter
+
       override def hasNext: Boolean = {
         convert {
           iter.hasNext
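`convertIterator` now reuses `WrappedCloseableIterator` while still funneling every call through the exception converter. A generic sketch of that wrap-and-convert pattern, with `convert` as a stand-in for `GrpcExceptionConverter.convert`:

```scala
// Stand-in for the real convert: rethrow transport errors as a friendlier exception type.
def convert[A](body: => A): A =
  try body
  catch { case e: RuntimeException => throw new IllegalStateException("converted error", e) }

// Wrap an iterator so both hasNext and next go through the converter.
def convertIterator[T](inner: Iterator[T]): Iterator[T] = new Iterator[T] {
  override def hasNext: Boolean = convert { inner.hasNext }
  override def next(): T = convert { inner.next() }
}

val converted = convertIterator(Iterator(1, 2, 3))
assert(converted.toList == List(1, 2, 3))
```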

connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala

Lines changed: 3 additions & 1 deletion
@@ -48,11 +48,13 @@ private[sql] class GrpcRetryHandler(
   * The type of the response.
   */
  class RetryIterator[T, U](request: T, call: T => CloseableIterator[U])
-      extends CloseableIterator[U] {
+      extends WrappedCloseableIterator[U] {
 
    private var opened = false // we only retry if it fails on first call when using the iterator
    private var iter = call(request)
 
+   override def innerIterator: Iterator[U] = iter
+
    private def retryIter[V](f: Iterator[U] => V) = {
      if (!opened) {
        opened = true
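`RetryIterator` only re-issues the underlying call while nothing has been consumed yet; after the first element, retrying would replay data. A standalone sketch of that retry-on-first-use idea (names and retry policy are illustrative, not Spark's `GrpcRetryHandler`):

```scala
class FirstCallRetryIterator[T, U](request: T, call: T => Iterator[U], maxRetries: Int = 3)
    extends Iterator[U] {

  private var opened = false // once the stream has been touched, failures are no longer retried
  private var iter = call(request)

  private def retryOnFirstUse[V](f: Iterator[U] => V): V = {
    if (!opened) {
      opened = true
      var attempt = 0
      var result: Option[V] = None
      while (result.isEmpty) {
        try {
          result = Some(f(iter))
        } catch {
          case _: RuntimeException if attempt < maxRetries =>
            attempt += 1
            iter = call(request) // safe to re-issue: nothing was consumed from the old stream
        }
      }
      result.get
    } else {
      f(iter)
    }
  }

  override def hasNext: Boolean = retryOnFirstUse(_.hasNext)
  override def next(): U = retryOnFirstUse(_.next())
}
```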

connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala

Lines changed: 13 additions & 4 deletions
@@ -47,6 +47,9 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
 
   private var interrupted = false
 
+  // Time at which this sender should finish if the response stream is not finished by then.
+  private var deadlineTimeMillis = Long.MaxValue
+
   // Signal to wake up when grpcCallObserver.isReady()
   private val grpcCallObserverReadySignal = new Object
 
@@ -65,6 +68,12 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
     executionObserver.notifyAll()
   }
 
+  // For testing
+  private[connect] def setDeadline(deadlineMs: Long) = executionObserver.synchronized {
+    deadlineTimeMillis = deadlineMs
+    executionObserver.notifyAll()
+  }
+
   def run(lastConsumedStreamIndex: Long): Unit = {
     if (executeHolder.reattachable) {
       // In reattachable execution we use setOnReadyHandler and grpcCallObserver.isReady to control
@@ -150,7 +159,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
     var finished = false
 
     // Time at which this sender should finish if the response stream is not finished by then.
-    val deadlineTimeMillis = if (!executeHolder.reattachable) {
+    deadlineTimeMillis = if (!executeHolder.reattachable) {
       Long.MaxValue
     } else {
       val confSize =
@@ -232,8 +241,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
           assert(finished == false)
         } else {
           // If it wasn't sent, time deadline must have been reached before stream became available,
-          // will exit in the enxt loop iterattion.
-          assert(deadlineLimitReached)
+          // or it was intterupted. Will exit in the next loop iterattion.
+          assert(deadlineLimitReached || interrupted)
         }
       } else if (streamFinished) {
         // Stream is finished and all responses have been sent
@@ -301,7 +310,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
         val sleepStart = System.nanoTime()
         var sleepEnd = 0L
         // Conditions for exiting the inner loop
-        // 1. was detached
+        // 1. was interrupted
         // 2. grpcCallObserver is ready to send more data
         // 3. time deadline is reached
         while (!interrupted &&
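The fixed assertion reflects that the send loop can wake up either because the deadline passed or because the sender was interrupted. A minimal sketch of that wait loop's exit conditions, using a plain monitor (the parameter names are illustrative, not the actual `ExecuteGrpcResponseSender` fields):

```scala
// Wait until one of three things happens: the sender is interrupted, the gRPC observer
// becomes ready, or the deadline passes. The caller must re-check which condition fired;
// "did not send" can now mean either the deadline was reached or an interrupt happened.
def awaitReadyOrDeadline(
    lock: Object,
    interrupted: () => Boolean,
    isReady: () => Boolean,
    deadlineTimeMillis: Long): Unit = lock.synchronized {
  var remaining = deadlineTimeMillis - System.currentTimeMillis()
  while (!interrupted() && !isReady() && remaining > 0) {
    lock.wait(remaining) // woken by notifyAll() from interrupt() or setDeadline(), or by timeout
    remaining = deadlineTimeMillis - System.currentTimeMillis()
  }
}
```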

connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala

Lines changed: 7 additions & 1 deletion
@@ -73,11 +73,16 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
   /** The index of the last response produced by execution. */
   private var lastProducedIndex: Long = 0 // first response will have index 1
 
+  // For testing
+  private[connect] var releasedUntilIndex: Long = 0
+
   /**
    * Highest response index that was consumed. Keeps track of it to decide which responses needs
    * to be cached, and to assert that all responses are consumed.
+   *
+   * Visible for testing.
    */
-  private var highestConsumedIndex: Long = 0
+  private[connect] var highestConsumedIndex: Long = 0
 
   /**
    * Consumer that waits for available responses. There can be only one at a time, @see
@@ -284,6 +289,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
       responses.remove(i)
       i -= 1
     }
+    releasedUntilIndex = index
   }
 
   /**

connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala

Lines changed: 10 additions & 0 deletions
@@ -183,6 +183,16 @@ private[connect] class ExecuteHolder(
     }
   }
 
+  // For testing.
+  private[connect] def setGrpcResponseSendersDeadline(deadlineMs: Long) = synchronized {
+    grpcResponseSenders.foreach(_.setDeadline(deadlineMs))
+  }
+
+  // For testing
+  private[connect] def interruptGrpcResponseSenders() = synchronized {
+    grpcResponseSenders.foreach(_.interrupt())
+  }
+
   /**
    * For a short period in ExecutePlan after creation and until runGrpcResponseSender is called,
    * there is no attached response sender, but yet we start with lastAttachedRpcTime = None, so we

connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala

Lines changed: 29 additions & 11 deletions
@@ -71,15 +71,14 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
     // The latter is to prevent double execution when a client retries execution, thinking it
     // never reached the server, but in fact it did, and already got removed as abandoned.
     if (executions.get(executeHolder.key).isDefined) {
-      if (getAbandonedTombstone(executeHolder.key).isDefined) {
-        throw new SparkSQLException(
-          errorClass = "INVALID_HANDLE.OPERATION_ABANDONED",
-          messageParameters = Map("handle" -> executeHolder.operationId))
-      } else {
-        throw new SparkSQLException(
-          errorClass = "INVALID_HANDLE.OPERATION_ALREADY_EXISTS",
-          messageParameters = Map("handle" -> executeHolder.operationId))
-      }
+      throw new SparkSQLException(
+        errorClass = "INVALID_HANDLE.OPERATION_ALREADY_EXISTS",
+        messageParameters = Map("handle" -> executeHolder.operationId))
+    }
+    if (getAbandonedTombstone(executeHolder.key).isDefined) {
+      throw new SparkSQLException(
+        errorClass = "INVALID_HANDLE.OPERATION_ABANDONED",
+        messageParameters = Map("handle" -> executeHolder.operationId))
     }
     sessionHolder.addExecuteHolder(executeHolder)
     executions.put(executeHolder.key, executeHolder)
@@ -141,12 +140,17 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
     abandonedTombstones.asMap.asScala.values.toBuffer.toSeq
   }
 
-  private[service] def shutdown(): Unit = executionsLock.synchronized {
+  private[connect] def shutdown(): Unit = executionsLock.synchronized {
     scheduledExecutor.foreach { executor =>
      executor.shutdown()
      executor.awaitTermination(1, TimeUnit.MINUTES)
    }
    scheduledExecutor = None
+    executions.clear()
+    abandonedTombstones.invalidateAll()
+    if (!lastExecutionTime.isDefined) {
+      lastExecutionTime = Some(System.currentTimeMillis())
+    }
   }
 
   /**
@@ -188,7 +192,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
     executions.values.foreach { executeHolder =>
       executeHolder.lastAttachedRpcTime match {
         case Some(detached) =>
-          if (detached + timeout < nowMs) {
+          if (detached + timeout <= nowMs) {
             toRemove += executeHolder
           }
         case _ => // execution is active
@@ -206,4 +210,18 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
     }
     logInfo("Finished periodic run of SparkConnectExecutionManager maintenance.")
   }
+
+  // For testing.
+  private[connect] def setAllRPCsDeadline(deadlineMs: Long) = executionsLock.synchronized {
+    executions.values.foreach(_.setGrpcResponseSendersDeadline(deadlineMs))
+  }
+
+  // For testing.
+  private[connect] def interruptAllRPCs() = executionsLock.synchronized {
+    executions.values.foreach(_.interruptGrpcResponseSenders())
+  }
+
+  private[connect] def listExecuteHolders = executionsLock.synchronized {
+    executions.values.toBuffer.toSeq
+  }
 }
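The reordering above is the first bug fix from the description: previously the tombstone was only consulted when a live execution with the same key existed, so a resubmission of an operation that had already been declared abandoned (removed from `executions`, present only in the tombstone cache) passed both checks and was re-registered. A condensed sketch of the corrected guard order (simplified signatures, not the actual Spark types):

```scala
// Reject duplicates first, then reject operations already declared abandoned; only a truly
// new operation key falls through and gets registered.
def checkResubmission(
    key: String,
    executions: Map[String, AnyRef],
    abandonedTombstones: Set[String]): Unit = {
  if (executions.contains(key)) {
    throw new IllegalStateException(s"INVALID_HANDLE.OPERATION_ALREADY_EXISTS: $key")
  }
  if (abandonedTombstones.contains(key)) {
    throw new IllegalStateException(s"INVALID_HANDLE.OPERATION_ABANDONED: $key")
  }
}
```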
