Skip to content

Commit e7002c1

Browse files
author
jinxing
committed
[SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker multiple times because of askWithRetry
1 parent f27e024 commit e7002c1

File tree

2 files changed

+37
-4
lines changed

2 files changed

+37
-4
lines changed

core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.rpc
1919

2020
import scala.concurrent.Future
2121
import scala.reflect.ClassTag
22+
import scala.util.control.NonFatal
2223

2324
import org.apache.spark.{SparkConf, SparkException}
2425
import org.apache.spark.internal.Logging
@@ -63,8 +64,38 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
6364
def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
6465

6566
/**
66-
* Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
67-
* timeout, or throw a SparkException if this fails even after the default number of retries.
67+
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
68+
* default timeout, throw an exception if this fails.
69+
*
70+
* Note: this is a blocking action which may cost a lot of time, so don't call it in a message
71+
* loop of [[RpcEndpoint]].
72+
73+
* @param message the message to send
74+
* @tparam T type of the reply message
75+
* @return the reply message from the corresponding [[RpcEndpoint]]
76+
*/
77+
def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
78+
79+
/**
80+
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
81+
* specified timeout, throw an exception if this fails.
82+
*
83+
* Note: this is a blocking action which may cost a lot of time, so don't call it in a message
84+
* loop of [[RpcEndpoint]].
85+
*
86+
* @param message the message to send
87+
* @param timeout the timeout duration
88+
* @tparam T type of the reply message
89+
* @return the reply message from the corresponding [[RpcEndpoint]]
90+
*/
91+
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
92+
val future = ask[T](message, timeout)
93+
timeout.awaitResult(future)
94+
}
95+
96+
/**
97+
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
98+
* default timeout, throw a SparkException if this fails even after the default number of retries.
6899
* The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
69100
* method retries, the message handling in the receiver side should be idempotent.
70101
*
@@ -75,10 +106,11 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
75106
* @tparam T type of the reply message
76107
* @return the reply message from the corresponding [[RpcEndpoint]]
77108
*/
109+
@deprecated("use 'askSync' instead.", "2.1.0")
78110
def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)
79111

80112
/**
81-
* Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a
113+
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
82114
* specified timeout, throw a SparkException if this fails even after the specified number of
83115
* retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
84116
* retries, the message handling in the receiver side should be idempotent.
@@ -91,6 +123,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
91123
* @tparam T type of the reply message
92124
* @return the reply message from the corresponding [[RpcEndpoint]]
93125
*/
126+
@deprecated("use 'askSync' instead.", "2.1.0")
94127
def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
95128
// TODO: Consider removing multiple attempts
96129
var attempts = 0

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ private[streaming] class ReceiverSupervisorImpl(
159159
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
160160
val numRecords = blockStoreResult.numRecords
161161
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
162-
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
162+
trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))
163163
logDebug(s"Reported block $blockId")
164164
}
165165

0 commit comments

Comments
 (0)