Skip to content

Commit 09da07c

Browse files
authored
Merge pull request #5638 from square/jwilson.1208.degraded_connections
Degrade connections after a timeout
2 parents 15b35c6 + b5efb41 commit 09da07c

File tree

6 files changed

+243
-57
lines changed

6 files changed

+243
-57
lines changed

mockwebserver/src/main/java/okhttp3/mockwebserver/MockWebServer.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,7 @@ class MockWebServer : ExternalResource(), Closeable {
526526
}
527527

528528
if (socketPolicy === STALL_SOCKET_AT_START) {
529+
dispatchBookkeepingRequest(sequenceNumber, socket)
529530
return // Ignore the socket until the server is shut down!
530531
}
531532

okhttp/src/main/java/okhttp3/internal/connection/RealConnection.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,7 @@ class RealConnection(
614614

615615
val http2Connection = this.http2Connection
616616
if (http2Connection != null) {
617-
return !http2Connection.isShutdown
617+
return http2Connection.isHealthy(System.nanoTime())
618618
}
619619

620620
if (doExtensiveChecks) {

okhttp/src/main/java/okhttp3/internal/http2/Http2Connection.kt

Lines changed: 94 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
7676
/** http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-5.1.1 */
7777
internal var nextStreamId = if (builder.client) 3 else 2
7878

79-
@get:Synchronized var isShutdown = false
80-
internal set
79+
private var isShutdown = false
8180

8281
/** For scheduling everything asynchronous. */
8382
private val taskRunner = builder.taskRunner
@@ -94,8 +93,16 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
9493
/** User code to run in response to push promise events. */
9594
private val pushObserver: PushObserver = builder.pushObserver
9695

97-
/** True if we have sent a ping that is still awaiting a reply. */
98-
private var awaitingPong = false
96+
// Total number of pings send and received of the corresponding types. All guarded by this.
97+
private var intervalPingsSent = 0L
98+
private var intervalPongsReceived = 0L
99+
private var degradedPingsSent = 0L
100+
private var degradedPongsReceived = 0L
101+
private var awaitPingsSent = 0L
102+
private var awaitPongsReceived = 0L
103+
104+
/** Consider this connection to be unhealthy if a degraded pong isn't received by this time. */
105+
private var degradedPongDeadlineNs = 0L
99106

100107
/** Settings we communicate to the peer. */
101108
val okHttpSettings = Settings().apply {
@@ -142,8 +149,21 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
142149
if (builder.pingIntervalMillis != 0) {
143150
val pingIntervalNanos = TimeUnit.MILLISECONDS.toNanos(builder.pingIntervalMillis.toLong())
144151
writerQueue.schedule("$connectionName ping", pingIntervalNanos) {
145-
writePing(false, 0, 0)
146-
return@schedule pingIntervalNanos
152+
val failDueToMissingPong = synchronized(this@Http2Connection) {
153+
if (intervalPongsReceived < intervalPingsSent) {
154+
return@synchronized true
155+
} else {
156+
intervalPingsSent++
157+
return@synchronized false
158+
}
159+
}
160+
if (failDueToMissingPong) {
161+
failConnection(null)
162+
return@schedule -1L
163+
} else {
164+
writePing(false, INTERVAL_PING, 0)
165+
return@schedule pingIntervalNanos
166+
}
147167
}
148168
}
149169
}
@@ -351,18 +371,6 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
351371
payload1: Int,
352372
payload2: Int
353373
) {
354-
if (!reply) {
355-
val failedDueToMissingPong: Boolean
356-
synchronized(this) {
357-
failedDueToMissingPong = awaitingPong
358-
awaitingPong = true
359-
}
360-
if (failedDueToMissingPong) {
361-
failConnection(null)
362-
return
363-
}
364-
}
365-
366374
try {
367375
writer.ping(reply, payload1, payload2)
368376
} catch (e: IOException) {
@@ -373,14 +381,23 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
373381
/** For testing: sends a ping and waits for a pong. */
374382
@Throws(InterruptedException::class)
375383
fun writePingAndAwaitPong() {
376-
writePing(false, 0x4f4b6f6b /* "OKok" */, -0xf607257 /* donut */)
384+
writePing()
377385
awaitPong()
378386
}
379387

380-
/** For testing: waits until `requiredPongCount` pings have been received from the peer. */
388+
/** For testing: sends a ping to be awaited with [awaitPong]. */
389+
@Throws(InterruptedException::class)
390+
fun writePing() {
391+
synchronized(this) {
392+
awaitPingsSent++
393+
}
394+
writePing(false, AWAIT_PING, 0x4f4b6f6b /* "OKok" */)
395+
}
396+
397+
/** For testing: awaits a pong. */
381398
@Synchronized @Throws(InterruptedException::class)
382399
fun awaitPong() {
383-
while (awaitingPong) {
400+
while (awaitPongsReceived < awaitPingsSent) {
384401
wait()
385402
}
386403
}
@@ -499,6 +516,42 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
499516
}
500517
}
501518

519+
@Synchronized
520+
fun isHealthy(nowNs: Long): Boolean {
521+
if (isShutdown) return false
522+
523+
// A degraded pong is overdue.
524+
if (degradedPongsReceived < degradedPingsSent && nowNs >= degradedPongDeadlineNs) return false
525+
526+
return true
527+
}
528+
529+
/**
530+
* HTTP/2 can have both stream timeouts (due to a problem with a single stream) and connection
531+
* timeouts (due to a problem with the transport). When a stream times out we don't know whether
532+
* the problem impacts just one stream or the entire connection.
533+
*
534+
* To differentiate the two cases we ping the server when a stream times out. If the overall
535+
* connection is fine the ping will receive a pong; otherwise it won't.
536+
*
537+
* The deadline to respond to this ping attempts to limit the cost of being wrong. If it is too
538+
* long, streams created while we await the pong will reuse broken connections and inevitably
539+
* fail. If it is too short, slow connections will be marked as failed and extra TCP and TLS
540+
* handshakes will be required.
541+
*
542+
* The deadline is currently hardcoded. We may make this configurable in the future!
543+
*/
544+
internal fun sendDegradedPingLater() {
545+
synchronized(this) {
546+
if (degradedPongsReceived < degradedPingsSent) return // Already awaiting a degraded pong.
547+
degradedPingsSent++
548+
degradedPongDeadlineNs = System.nanoTime() + DEGRADED_PONG_TIMEOUT_NS
549+
}
550+
writerQueue.execute("$connectionName ping") {
551+
writePing(false, DEGRADED_PING, 0)
552+
}
553+
}
554+
502555
class Builder(
503556
/** True if this peer initiated the connection; false if this peer accepted the connection. */
504557
internal var client: Boolean,
@@ -728,8 +781,21 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
728781
) {
729782
if (ack) {
730783
synchronized(this@Http2Connection) {
731-
awaitingPong = false
732-
this@Http2Connection.notifyAll()
784+
when (payload1) {
785+
INTERVAL_PING -> {
786+
intervalPongsReceived++
787+
}
788+
DEGRADED_PING -> {
789+
degradedPongsReceived++
790+
}
791+
AWAIT_PING -> {
792+
awaitPongsReceived++
793+
this@Http2Connection.notifyAll()
794+
}
795+
else -> {
796+
// Ignore an unexpected pong.
797+
}
798+
}
733799
}
734800
} else {
735801
// Send a reply to a client ping if this is a server and vice versa.
@@ -926,5 +992,10 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {
926992
set(Settings.INITIAL_WINDOW_SIZE, DEFAULT_INITIAL_WINDOW_SIZE)
927993
set(Settings.MAX_FRAME_SIZE, Http2.INITIAL_MAX_FRAME_SIZE)
928994
}
995+
996+
const val INTERVAL_PING = 1
997+
const val DEGRADED_PING = 2
998+
const val AWAIT_PING = 3
999+
const val DEGRADED_PONG_TIMEOUT_NS = 1_000_000_000 // 1 second.
9291000
}
9301001
}

okhttp/src/main/java/okhttp3/internal/http2/Http2Stream.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,7 @@ class Http2Stream internal constructor(
670670
internal inner class StreamTimeout : AsyncTimeout() {
671671
override fun timedOut() {
672672
closeLater(ErrorCode.CANCEL)
673+
connection.sendDegradedPingLater()
673674
}
674675

675676
override fun newTimeoutException(cause: IOException?): IOException {

0 commit comments

Comments
 (0)