@@ -396,6 +396,28 @@ ConnectionManagerImpl::ActiveStream::~ActiveStream() {
396396 ASSERT (state_.filter_call_state_ == 0 );
397397}
398398
399+ void ConnectionManagerImpl::ActiveStream::resetIdleTimer () {
400+ if (idle_timer_ != nullptr ) {
401+ // TODO(htuch): If this shows up in performance profiles, optimize by only
402+ // updating a timestamp here and doing periodic checks for idle timeouts
403+ // instead, or reducing the accuracy of timers.
404+ idle_timer_->enableTimer (idle_timeout_ms_);
405+ }
406+ }
407+
408+ void ConnectionManagerImpl::ActiveStream::onIdleTimeout () {
409+ connection_manager_.stats_ .named_ .downstream_rq_idle_timeout_ .inc ();
410+ // If headers have not been sent to the user, send a 408.
411+ if (response_headers_ != nullptr ) {
412+ // TODO(htuch): We could send trailers here with an x-envoy timeout header
413+ // or gRPC status code, and/or set H2 RST_STREAM error.
414+ connection_manager_.doEndStream (*this );
415+ } else {
416+ sendLocalReply (Grpc::Common::hasGrpcContentType (*request_headers_), Http::Code::RequestTimeout,
417+ " stream timeout" , nullptr );
418+ }
419+ }
420+
399421void ConnectionManagerImpl::ActiveStream::addStreamDecoderFilterWorker (
400422 StreamDecoderFilterSharedPtr filter, bool dual_filter) {
401423 ActiveStreamDecoderFilterPtr wrapper (new ActiveStreamDecoderFilter (*this , filter, dual_filter));
@@ -579,6 +601,16 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers,
579601 // Allow non websocket requests to go through websocket enabled routes.
580602 }
581603
604+ if (cached_route_.value ()) {
605+ const Router::RouteEntry* route_entry = cached_route_.value ()->routeEntry ();
606+ if (route_entry != nullptr && route_entry->idleTimeout ()) {
607+ idle_timeout_ms_ = route_entry->idleTimeout ().value ();
608+ idle_timer_ = connection_manager_.read_callbacks_ ->connection ().dispatcher ().createTimer (
609+ [this ]() -> void { onIdleTimeout (); });
610+ resetIdleTimer ();
611+ }
612+ }
613+
582614 // Check if tracing is enabled at all.
583615 if (connection_manager_.config_ .tracingConfig ()) {
584616 traceRequest ();
@@ -702,6 +734,8 @@ void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, boo
702734
703735void ConnectionManagerImpl::ActiveStream::decodeData (ActiveStreamDecoderFilter* filter,
704736 Buffer::Instance& data, bool end_stream) {
737+ resetIdleTimer ();
738+
705739 // If a response is complete or a reset has been sent, filters do not care about further body
706740 // data. Just drop it.
707741 if (state_.local_complete_ ) {
@@ -750,6 +784,7 @@ void ConnectionManagerImpl::ActiveStream::addDecodedData(ActiveStreamDecoderFilt
750784}
751785
752786void ConnectionManagerImpl::ActiveStream::decodeTrailers (HeaderMapPtr&& trailers) {
787+ resetIdleTimer ();
753788 maybeEndDecode (true );
754789 request_trailers_ = std::move (trailers);
755790 decodeTrailers (nullptr , *request_trailers_);
@@ -846,6 +881,7 @@ void ConnectionManagerImpl::ActiveStream::sendLocalReply(
846881
847882void ConnectionManagerImpl::ActiveStream::encode100ContinueHeaders (
848883 ActiveStreamEncoderFilter* filter, HeaderMap& headers) {
884+ resetIdleTimer ();
849885 ASSERT (connection_manager_.config_ .proxy100Continue ());
850886 // Make sure commonContinue continues encode100ContinueHeaders.
851887 has_continue_headers_ = true ;
@@ -882,6 +918,8 @@ void ConnectionManagerImpl::ActiveStream::encode100ContinueHeaders(
882918
883919void ConnectionManagerImpl::ActiveStream::encodeHeaders (ActiveStreamEncoderFilter* filter,
884920 HeaderMap& headers, bool end_stream) {
921+ resetIdleTimer ();
922+
885923 std::list<ActiveStreamEncoderFilterPtr>::iterator entry = commonEncodePrefix (filter, end_stream);
886924 std::list<ActiveStreamEncoderFilterPtr>::iterator continue_data_entry = encoder_filters_.end ();
887925
@@ -1019,6 +1057,7 @@ void ConnectionManagerImpl::ActiveStream::addEncodedData(ActiveStreamEncoderFilt
10191057
10201058void ConnectionManagerImpl::ActiveStream::encodeData (ActiveStreamEncoderFilter* filter,
10211059 Buffer::Instance& data, bool end_stream) {
1060+ resetIdleTimer ();
10221061 std::list<ActiveStreamEncoderFilterPtr>::iterator entry = commonEncodePrefix (filter, end_stream);
10231062 for (; entry != encoder_filters_.end (); entry++) {
10241063 ASSERT (!(state_.filter_call_state_ & FilterCallState::EncodeData));
@@ -1042,6 +1081,7 @@ void ConnectionManagerImpl::ActiveStream::encodeData(ActiveStreamEncoderFilter*
10421081
10431082void ConnectionManagerImpl::ActiveStream::encodeTrailers (ActiveStreamEncoderFilter* filter,
10441083 HeaderMap& trailers) {
1084+ resetIdleTimer ();
10451085 std::list<ActiveStreamEncoderFilterPtr>::iterator entry = commonEncodePrefix (filter, true );
10461086 for (; entry != encoder_filters_.end (); entry++) {
10471087 ASSERT (!(state_.filter_call_state_ & FilterCallState::EncodeTrailers));
0 commit comments