@@ -19,6 +19,7 @@ package org.apache.spark.rpc
 
 import scala.concurrent.Future
 import scala.reflect.ClassTag
+import scala.util.control.NonFatal
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
@@ -63,8 +64,38 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
   def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
 
   /**
-   * Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
-   * timeout, or throw a SparkException if this fails even after the default number of retries.
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * default timeout, or throw an exception if this fails.
+   *
+   * Note: this is a blocking action which may take a long time, so don't call it in a message
+   * loop of [[RpcEndpoint]].
+   *
+   * @param message the message to send
+   * @tparam T type of the reply message
+   * @return the reply message from the corresponding [[RpcEndpoint]]
+   */
+  def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * specified timeout, or throw an exception if this fails.
+   *
+   * Note: this is a blocking action which may take a long time, so don't call it in a message
+   * loop of [[RpcEndpoint]].
+   *
+   * @param message the message to send
+   * @param timeout the timeout duration
+   * @tparam T type of the reply message
+   * @return the reply message from the corresponding [[RpcEndpoint]]
+   */
+  def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
+    val future = ask[T](message, timeout)
+    timeout.awaitResult(future)
+  }
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * default timeout, or throw a SparkException if this fails even after the default number of retries.
   * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
   * method retries, the message handling in the receiver side should be idempotent.
   *
@@ -75,10 +106,11 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
   * @tparam T type of the reply message
   * @return the reply message from the corresponding [[RpcEndpoint]]
   */
+  @deprecated("use 'askSync' instead.", "2.1.0")
   def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)
 
   /**
-   * Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
   * specified timeout, throw a SparkException if this fails even after the specified number of
   * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
   * retries, the message handling in the receiver side should be idempotent.
@@ -91,6 +123,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
   * @tparam T type of the reply message
   * @return the reply message from the corresponding [[RpcEndpoint]]
   */
+  @deprecated("use 'askSync' instead.", "2.1.0")
   def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
     // TODO: Consider removing multiple attempts
     var attempts = 0
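
For context, here is a minimal sketch of how the new `askSync` pairs with an endpoint's `receiveAndReply`. The `GetCount`/`Count` messages, `CountEndpoint`, the `countFor` helper, and the `"spark.example.askTimeout"` conf key are all hypothetical, invented only for illustration; just `askSync`, `RpcTimeout`, and the `RpcEndpoint`/`RpcEndpointRef` APIs come from the patch and the surrounding code. These classes are `private[spark]`, so a snippet like this only compiles inside Spark's own packages.

```scala
import scala.concurrent.duration._

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv, RpcTimeout}

// Hypothetical request/reply messages, used only for this sketch.
case class GetCount(key: String)
case class Count(value: Long)

// A hypothetical endpoint. It replies asynchronously via context.reply; per the
// scaladoc above, it must not call askSync from inside this handler, because
// blocking here would stall the endpoint's own message loop.
class CountEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  private val counts = Map("answer" -> 42L).withDefaultValue(0L)

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case GetCount(key) => context.reply(Count(counts(key)))
  }
}

// Caller side: askSync blocks until the reply arrives or the timeout fires, and
// surfaces failures as exceptions instead of retrying like askWithRetry did.
def countFor(ref: RpcEndpointRef, key: String): Long = {
  val timeout = new RpcTimeout(10.seconds, "spark.example.askTimeout")
  ref.askSync[Count](GetCount(key), timeout).value
}
```

The no-argument overload, `ref.askSync[Count](GetCount(key))`, does the same but with the ref's `defaultAskTimeout`. Dropping the retry loop is also why the caller's handler no longer needs to be idempotent, which `askWithRetry`'s scaladoc still requires of receivers.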