
feature: Add graceful shutdown support for Netty server on SIGTERM #4137

Merged: xerial merged 9 commits into main from claude/netty-graceful-shutdown-sigterm-nnp54 on Jan 9, 2026

Conversation

@xerial (Member) commented Jan 8, 2026

This adds support for gracefully shutting down the Netty server when
receiving SIGTERM signals, which is essential for Kubernetes deployments.

Changes:

  • Add shutdownQuietPeriodSeconds and shutdownTimeoutSeconds config options
  • Add registerShutdownHook option to automatically handle SIGTERM/SIGINT
  • Add withGracefulShutdown(), withShutdownHook(), withShutdownQuietPeriod(),
    withShutdownTimeout() builder methods
  • Update stop() to use configurable grace periods instead of immediate shutdown
  • Add JVM shutdown hook registration for automatic signal handling
  • Add tests for the new graceful shutdown functionality

Usage:
  Netty.server
    .withGracefulShutdown(quietPeriodSeconds = 5, timeoutSeconds = 30)
    .withShutdownHook  // Enable automatic SIGTERM handling
    .design
    .build { server => server.awaitTermination() }
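The withShutdownHook option relies on a JVM shutdown hook, which the JVM runs on SIGTERM/SIGINT before exiting. A minimal sketch of that mechanism (the class and method names here are hypothetical illustrations, not the PR's actual code):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the server; illustrates the hook mechanism only
class StoppableServer {
  private val stopped = new AtomicBoolean(false)

  // The thread the JVM runs on SIGTERM/SIGINT before exiting
  private val hook = new Thread(new Runnable {
    def run(): Unit = stop()
  }, "server-shutdown-hook")

  // compareAndSet guards against a double stop: once via the hook,
  // once via an explicit stop() call racing with JVM shutdown
  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      println("server stopped gracefully")
    }
  }

  def registerShutdownHook(): Unit =
    Runtime.getRuntime.addShutdownHook(hook)

  // Returns true if the hook was still registered
  def unregisterShutdownHook(): Boolean =
    Runtime.getRuntime.removeShutdownHook(hook)
}
```

Unregistering the hook from inside an explicit stop() call avoids the hook firing a second, redundant stop while the JVM is already shutting down.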

claude added 2 commits January 8, 2026 03:27
This adds support for gracefully shutting down the Netty server when
receiving SIGTERM signals, which is essential for Kubernetes deployments.

Changes:
- Add shutdownQuietPeriodSeconds and shutdownTimeoutSeconds config options
- Add registerShutdownHook option to automatically handle SIGTERM/SIGINT
- Add withGracefulShutdown(), withShutdownHook(), withShutdownQuietPeriod(),
  withShutdownTimeout() builder methods
- Update stop() to use configurable grace periods instead of immediate shutdown
- Add JVM shutdown hook registration for automatic signal handling
- Add tests for the new graceful shutdown functionality

Usage:
  Netty.server
    .withGracefulShutdown(quietPeriodSeconds = 5, timeoutSeconds = 30)
    .withShutdownHook  // Enable automatic SIGTERM handling
    .design
    .build { server => server.awaitTermination() }

This adds the ability to track active connections and in-flight requests
(including streaming responses like SSE) during graceful shutdown.

Changes:
- Add NettyConnectionTracker class with ChannelGroup and AtomicLong counter
- Track active connections using Netty's ChannelGroup
- Track in-flight requests including SSE streaming responses
- Add activeConnectionCount, activeRequestCount, hasActiveRequests to NettyServer
- Wait for active requests to complete during shutdown with configurable timeout
- Log active connection/request counts during shutdown for debugging

Usage:
  server.activeConnectionCount  // Number of connected clients
  server.activeRequestCount     // Number of in-flight requests
  server.hasActiveRequests      // Whether any requests are being processed

During shutdown, the server will:
1. Stop accepting new connections
2. Wait for active requests (including streaming) to complete
3. Log progress and timeout warnings
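The counting and wait-for-completion behavior described above can be sketched without any Netty dependency. This is a simplified model under the assumptions in the commit message (it is not the PR's actual NettyConnectionTracker, and it omits the ChannelGroup used for connection tracking):

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

// Simplified in-flight request tracker (connection tracking omitted)
class RequestTracker(pollIntervalMs: Long = 10) {
  private val activeRequests = new AtomicLong(0)

  def requestStarted(): Unit   = activeRequests.incrementAndGet()
  def requestCompleted(): Unit = activeRequests.decrementAndGet()
  def activeRequestCount: Long = activeRequests.get()
  def hasActiveRequests: Boolean = activeRequests.get() > 0

  // Poll until all requests complete or the deadline passes.
  // Returns true if the tracker drained before the timeout.
  def awaitCompletion(timeout: Long, unit: TimeUnit): Boolean = {
    val deadlineNanos = System.nanoTime() + unit.toNanos(timeout)
    while (hasActiveRequests) {
      val remainingNanos = deadlineNanos - System.nanoTime()
      if (remainingNanos <= 0) return false
      // Sleep no longer than the remaining time (but at least 1ms,
      // so a tiny timeout cannot turn into a busy loop)
      val sleepMs = math.min(pollIntervalMs,
        math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos)))
      Thread.sleep(sleepMs)
    }
    true
  }
}
```

In the real handler, requestStarted()/requestCompleted() would be called when a request enters the pipeline and when its response (including the end of an SSE stream) is written.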
@gemini-code-assist (Contributor) commented
Summary of Changes

Hello @xerial, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Netty server's resilience and operational stability by implementing a robust graceful shutdown mechanism. It allows the server to respond to termination signals (like SIGTERM in Kubernetes) by first stopping new connections, then patiently waiting for ongoing requests to complete within defined quiet periods and timeouts, before finally shutting down. This ensures that client requests are not abruptly interrupted, leading to a smoother and more reliable service termination process.

Highlights

  • Graceful Shutdown Configuration: Introduced configurable graceful shutdown parameters (shutdownQuietPeriodSeconds, shutdownTimeoutSeconds) for the Netty server, allowing for a controlled termination process.
  • Automatic Shutdown Hook: Added an option (registerShutdownHook) to automatically register a JVM shutdown hook, enabling the Netty server to gracefully handle SIGTERM/SIGINT signals, crucial for containerized deployments like Kubernetes.
  • Connection and Request Tracking: Implemented a NettyConnectionTracker to monitor active client connections and in-flight requests, including streaming responses, ensuring all ongoing work can complete before shutdown.
  • Refactored Server Stop Logic: The server's stop() method has been refactored to incorporate the graceful shutdown sequence: stopping new connections, waiting for active requests to complete, and then gracefully shutting down Netty's event loop groups.
  • Fluent API for Configuration: Provided new builder methods (withGracefulShutdown, withShutdownHook, withShutdownQuietPeriod, withShutdownTimeout) in NettyServerConfig for easy and fluent configuration of the new shutdown features.
  • Comprehensive Testing: Added new unit tests (NettyGracefulShutdownTest, NettyConnectionTrackingTest) to validate the functionality and configuration of the graceful shutdown and connection tracking mechanisms.



@gemini-code-assist Bot left a comment

Code Review

This pull request introduces a crucial feature for graceful shutdown of the Netty server, which is particularly important for containerized environments like Kubernetes. The changes are well-structured, with new configuration options, builder methods, and a dedicated NettyConnectionTracker. The implementation of the shutdown logic in NettyServer.stop() is a significant improvement over the previous immediate shutdown. I've identified a few critical issues related to request tracking and timeout handling that could prevent the graceful shutdown from working as expected. I've also suggested an improvement to one of the new tests to make it more robust and effective. Overall, this is a great addition, and with these fixes, it will be a very solid feature.

Comment on lines +66 to +67:

    // Flag to track if this is a streaming response (which completes asynchronously)
    var isStreaming = false

high

The isStreaming flag is assigned but its value is never read, which indicates it's unused and can be removed. There's also a related issue where a request-tracking leak can occur if the dispatcher returns an empty Rx stream, because the OnCompletion handler doesn't account for this scenario.

Both problems can be resolved by replacing isStreaming with a new flag, such as onNextCalled, to correctly track the request lifecycle and prevent leaks.

Suggested change:

-     // Flag to track if this is a streaming response (which completes asynchronously)
-     var isStreaming = false
+     // Flag to track if OnNext has been called to handle Rx.empty case
+     var onNextCalled = false

Comment on lines 472 to 518:

      if (stopped.compareAndSet(false, true)) {
-       info(s"Stopping ${config.name} server at ${localAddress}")
-       workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS)
-       bossGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS)
+       val activeConns = connectionTracker.activeConnectionCount
+       val activeReqs  = connectionTracker.activeRequestCount
+       info(
+         s"Stopping ${config.name} server at ${localAddress} " +
+           s"(activeConnections=$activeConns, activeRequests=$activeReqs, " +
+           s"quietPeriod=${config.shutdownQuietPeriodSeconds}s, timeout=${config.shutdownTimeoutSeconds}s)"
+       )
+
+       // Unregister shutdown hook to prevent double-stop during JVM shutdown
+       unregisterShutdownHook()
+
+       // Close the server channel first to stop accepting new connections
+       channelFuture.foreach(_.close().await(config.shutdownTimeoutSeconds, TimeUnit.SECONDS))
+
+       // Wait for active requests (including streaming responses) to complete
+       if (connectionTracker.hasActiveRequests) {
+         info(s"Waiting for ${connectionTracker.activeRequestCount} active requests to complete...")
+         val completed = connectionTracker.awaitCompletion(config.shutdownTimeoutSeconds, TimeUnit.SECONDS)
+         if (completed) {
+           info("All active requests completed")
+         } else {
+           warn(s"Timeout waiting for active requests, ${connectionTracker.activeRequestCount} requests still pending")
+         }
+       }
+
+       // Gracefully shutdown worker group first to complete in-flight requests
+       val workerFuture = workerGroup.shutdownGracefully(
+         config.shutdownQuietPeriodSeconds,
+         config.shutdownTimeoutSeconds,
+         TimeUnit.SECONDS
+       )
+       workerFuture.await(config.shutdownTimeoutSeconds, TimeUnit.SECONDS)
+
+       // Then shutdown boss group
+       val bossFuture = bossGroup.shutdownGracefully(
+         config.shutdownQuietPeriodSeconds,
+         config.shutdownTimeoutSeconds,
+         TimeUnit.SECONDS
+       )
+       bossFuture.await(config.shutdownTimeoutSeconds, TimeUnit.SECONDS)

        // Close the HTTP logger
        httpLogger.close()
-       channelFuture.foreach(_.close().await(1, TimeUnit.SECONDS))

        info(s"${config.name} server stopped")
      }

high

The current implementation of the stop method does not correctly enforce the overall shutdownTimeoutSeconds. Each blocking await call uses the full timeout value, which can cause the total shutdown time to be much longer than intended (e.g., up to 3x shutdownTimeoutSeconds or more). This can be problematic in environments that expect a timely shutdown.

To fix this, you should calculate a single deadline at the start of the stop method and then use the remaining time for each subsequent blocking operation. This ensures the entire shutdown process respects the configured timeout.

    if (stopped.compareAndSet(false, true)) {
      val activeConns = connectionTracker.activeConnectionCount
      val activeReqs = connectionTracker.activeRequestCount
      info(
        s"Stopping ${config.name} server at ${localAddress} " +
          s"(activeConnections=$activeConns, activeRequests=$activeReqs, " +
          s"quietPeriod=${config.shutdownQuietPeriodSeconds}s, timeout=${config.shutdownTimeoutSeconds}s)"
      )

      // Unregister shutdown hook to prevent double-stop during JVM shutdown
      unregisterShutdownHook()

      val deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(config.shutdownTimeoutSeconds)
      def remainingNanos: Long = math.max(0, deadline - System.nanoTime())

      // Close the server channel first to stop accepting new connections
      channelFuture.foreach(_.close().await(remainingNanos, TimeUnit.NANOSECONDS))

      // Wait for active requests (including streaming responses) to complete
      if (connectionTracker.hasActiveRequests) {
        info(s"Waiting for ${connectionTracker.activeRequestCount} active requests to complete...")
        val completed = connectionTracker.awaitCompletion(remainingNanos, TimeUnit.NANOSECONDS)
        if (completed) {
          info("All active requests completed")
        } else {
          warn(s"Timeout waiting for active requests, ${connectionTracker.activeRequestCount} requests still pending")
        }
      }

      // Gracefully shutdown worker group first to complete in-flight requests
      val workerFuture = workerGroup.shutdownGracefully(
        TimeUnit.SECONDS.toNanos(config.shutdownQuietPeriodSeconds),
        remainingNanos,
        TimeUnit.NANOSECONDS
      )
      workerFuture.await(remainingNanos, TimeUnit.NANOSECONDS)

      // Then shutdown boss group
      val bossFuture = bossGroup.shutdownGracefully(
        TimeUnit.SECONDS.toNanos(config.shutdownQuietPeriodSeconds),
        remainingNanos,
        TimeUnit.NANOSECONDS
      )
      bossFuture.await(remainingNanos, TimeUnit.NANOSECONDS)

      // Close the HTTP logger
      httpLogger.close()

      info(s"${config.name} server stopped")
    }
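The single-deadline pattern in the suggestion above can be exercised in isolation. This is a hedged sketch under the same assumptions (a helper class invented for illustration, not part of the actual patch):

```scala
import java.util.concurrent.TimeUnit

// One deadline is computed up front; every blocking step then awaits
// with the remaining budget, so sequential steps cannot each consume
// the full timeout and the total shutdown time stays bounded.
class ShutdownDeadline(timeoutSeconds: Long) {
  private val deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds)

  // Never negative: once the deadline passes, the remaining budget is zero
  def remainingNanos: Long = math.max(0L, deadline - System.nanoTime())
}
```

Each of the three blocking steps in stop() (server channel close, worker group shutdown, boss group shutdown) would then await with remainingNanos and TimeUnit.NANOSECONDS instead of the full configured timeout.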

Comment on lines +153 to +168:

    test("request tracking with simple HTTP request") {
      val router = RxRouter.of[SimpleApi]
      val config = Netty.server
        .withRouter(router)
        .noLogging

      config.designWithSyncClient.build[SyncClient] { client =>
        // Make a request
        val response = client.send(Http.GET("/hello"))
        response.status shouldBe HttpStatus.Ok_200

        // After request completes, count should return to 0
        // (there may be a small delay for async completion)
        Thread.sleep(100)
      }
    }

medium

This test is named request tracking with simple HTTP request, but it doesn't actually perform any assertions on the request tracking counters. This means a key part of the graceful shutdown functionality is not being verified. The use of Thread.sleep is also not ideal as it can lead to flaky tests.

To make this test effective, you should get a reference to the NettyServer instance and assert that activeRequestCount is zero after the request completes. Using eventually from AirSpec will make the assertion more robust against timing variations in the asynchronous counter updates.

  test("request tracking with simple HTTP request") {
    val router = RxRouter.of[SimpleApi]
    val config = Netty.server
      .withRouter(router)
      .noLogging

    config.designWithSyncClient.build[NettyServer, SyncClient] { (server, client) =>
      // Initially, the request count should be zero
      server.activeRequestCount shouldBe 0

      // Make a request
      val response = client.send(Http.GET("/hello"))
      response.status shouldBe HttpStatus.Ok_200

      // After the request completes, the count should return to 0.
      // Use `eventually` to handle the asynchronous nature of request completion tracking.
      eventually {
        server.activeRequestCount shouldBe 0
      }
    }
  }

- Fix Rx.empty request tracking leak by tracking if OnNext was called
- Use single deadline in stop() to ensure total shutdown respects timeout
- Improve test to assert on request tracking counters
- Use idiomatic Scala tail recursion instead of return statements

Co-Authored-By: Claude Opus 4.5 <[email protected]>
@xerial (Member, Author) commented Jan 8, 2026

/gemini reveiw

@gemini-code-assist (Contributor) replied

It looks like there might be a typo in your command. Did you mean to type /gemini review? The command /gemini review will perform a code review for the current pull request in its current state.

- Remove unused closeAllConnections() method and ChannelGroupFuture import
- Fix potential busy loop by ensuring minimum 1ms sleep interval
- Fix SSE pattern match exhaustivity warning with catch-all case
- Remove redundant SyncClient binding in test
- Add validation for shutdown config values (require non-negative/positive)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
@xerial (Member, Author) commented Jan 8, 2026

/gemini review

@gemini-code-assist Bot left a comment

Code Review

This pull request adds excellent support for graceful shutdown to the Netty server, a critical feature for modern deployment environments like Kubernetes. The implementation is robust, covering configuration, request tracking with NettyConnectionTracker, and a well-orchestrated shutdown sequence that respects timeouts. The inclusion of a JVM shutdown hook is a thoughtful addition. My review includes a fix for a potential timing bug in the request completion waiting logic and a small refactoring suggestion to improve code maintainability.

Comment on lines +278 to +304:

    def sleepUnlessInterrupted(): Boolean = {
      try {
        Thread.sleep(math.min(pollIntervalMs, math.max(1L, unit.toMillis(timeout))))
        true
      } catch {
        case _: InterruptedException =>
          Thread.currentThread().interrupt()
          false
      }
    }

    @scala.annotation.tailrec
    def loop(): Boolean = {
      if (!hasActiveRequests) {
        true
      } else {
        val remainingNanos = deadlineNanos - System.nanoTime()
        if (remainingNanos <= 0) {
          warn(s"Timeout waiting for ${activeRequests.get()} active requests to complete")
          false
        } else if (!sleepUnlessInterrupted()) {
          false
        } else {
          loop()
        }
      }
    }

high

The awaitCompletion method has a subtle bug in its polling loop. The sleepUnlessInterrupted function calculates the sleep duration based on the initial timeout parameter, not the dynamically updated remaining time. This can cause the method to block for longer than the intended timeout, especially when the remaining time is less than the pollIntervalMs. The sleep duration should be calculated using the remainingNanos within the loop to ensure the timeout is strictly respected.

    def sleepUnlessInterrupted(remainingNanos: Long): Boolean = {
      try {
        val sleepMillis = math.min(pollIntervalMs, math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos)))
        Thread.sleep(sleepMillis)
        true
      } catch {
        case _: InterruptedException =>
          Thread.currentThread().interrupt()
          false
      }
    }

    @scala.annotation.tailrec
    def loop(): Boolean = {
      if (!hasActiveRequests) {
        true
      } else {
        val remainingNanos = deadlineNanos - System.nanoTime()
        if (remainingNanos <= 0) {
          warn(s"Timeout waiting for ${activeRequests.get()} active requests to complete")
          false
        } else if (!sleepUnlessInterrupted(remainingNanos)) {
          false
        } else {
          loop()
        }
      }
    }

Comment on lines 129 to 147:

      val c = RxRunner.run(resp.events) {
        case OnNext(e: ServerSentEvent) =>
          val event = e.toContent
          val buf = Unpooled.copiedBuffer(event.getBytes("UTF-8"))
          ctx.writeAndFlush(new DefaultHttpContent(buf))
-       case _ =>
+       case OnNext(_) =>
+         // Ignore non-ServerSentEvent values
+         ()
+       case OnError(_) =>
+         // Stream error - mark request as completed and close
+         connectionTracker.requestCompleted()
+         val f = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
+         f.addListener(ChannelFutureListener.CLOSE)
+       case OnCompletion =>
+         // Stream completed - mark request as completed and close
+         connectionTracker.requestCompleted()
+         val f = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
+         f.addListener(ChannelFutureListener.CLOSE)
      }

medium

There's some duplicated code for handling OnError and OnCompletion for SSE streams. Both cases perform the same actions: mark the request as completed and close the channel. This logic can be extracted into a local helper function to improve readability and maintainability.

            def completeSseStream(): Unit = {
              connectionTracker.requestCompleted()
              val f = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
              f.addListener(ChannelFutureListener.CLOSE)
            }

            val c = RxRunner.run(resp.events) {
              case OnNext(e: ServerSentEvent) =>
                val event = e.toContent
                val buf   = Unpooled.copiedBuffer(event.getBytes("UTF-8"))
                ctx.writeAndFlush(new DefaultHttpContent(buf))
              case OnNext(_) =>
                // Ignore non-ServerSentEvent values
                ()
              case OnError(_) =>
                // Stream error - mark request as completed and close
                completeSseStream()
              case OnCompletion =>
                // Stream completed - mark request as completed and close
                completeSseStream()
            }

xerial and others added 5 commits January 8, 2026 20:07
The sleep duration should be calculated using the dynamically updated
remainingNanos within the loop, not the initial timeout parameter.
This ensures the timeout is strictly respected.

Co-Authored-By: Claude Opus 4.5 <[email protected]>

- Use channel attribute to track if request is in-flight
- In exceptionCaught, decrement request count if request was in-flight
- Catch all Exception types (not just RPCException) to prevent leaks
- Use markCompleted() helper to safely clear in-flight flag and decrement

This prevents activeRequests from leaking when:
1. Non-RPCException is thrown before RxRunner is set up
2. exceptionCaught is called during mid-request disconnect/pipeline error

Co-Authored-By: Claude Opus 4.5 <[email protected]>

- Fixes NPE when exceptionCaught is called before any request was processed
  (attribute unset, getAndSet returns null, unboxing fails)
- Fixes undercount with HTTP/1.1 pipelining by using a counter instead of
  boolean flag (multiple requests can overlap on same connection)
- Added 'completed' flag to prevent double-completion within same request

Co-Authored-By: Claude Opus 4.5 <[email protected]>

Removed per-request tracking (NettyConnectionTracker) and simplified the
implementation to rely on Netty's shutdownGracefully which already handles:
- Stopping new connection acceptance
- Waiting for in-flight tasks during quiet period
- Completing tasks within timeout

Changes:
- Remove NettyConnectionTracker class entirely
- Revert NettyRequestHandler to original shape (no tracking)
- Simplify stop() to: close server channel, shutdownGracefully worker/boss
- Remove connection/request counting tests
- Keep only config knobs and shutdown hook functionality

This significantly reduces complexity while achieving the same goal of
handling SIGTERM gracefully for Kubernetes deployments.

Co-Authored-By: Claude Opus 4.5 <[email protected]>

- Changed stop() to use a single deadline across all shutdown steps
  (channel close, worker group, boss group) to ensure total shutdown
  time respects the configured timeout
- Removed unused imports from test file (RxRouter, TimeUnit, etc.)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
@xerial xerial enabled auto-merge (squash) January 9, 2026 17:49
@xerial xerial merged commit c229ecf into main Jan 9, 2026
23 checks passed
@xerial xerial deleted the claude/netty-graceful-shutdown-sigterm-nnp54 branch January 9, 2026 17:53