Skip to content

[xDS] fix TSAN failure in callback-based LRS impl#39806

Closed
markdroth wants to merge 1 commit intogrpc:masterfrom
markdroth:lrs_server_callback_fix
Closed

[xDS] fix TSAN failure in callback-based LRS impl#39806
markdroth wants to merge 1 commit intogrpc:masterfrom
markdroth:lrs_server_callback_fix

Conversation

@markdroth
Copy link
Copy Markdown
Member

This fixes a bug introduced in #39736 whereby the LRS server may call StartWrite() after cancellation has already called Finish(), thus leading to a TSAN failure.

This is arguably a deficiency in the callback API. The write should definitely fail, but it shouldn't cause a TSAN problem. But we can consider that as part of a separate effort.

Example stack trace of the TSAN failure:

WARNING: ThreadSanitizer: data race (pid=17)
  Read of size 1 at 0x72680002dd80 by thread T5:
    #0 grpc::internal::CallbackBidiHandler::ServerCallbackReaderWriterImpl::Finish(grpc::Status) /proc/self/cwd/include/grpcpp/impl/server_callback_handlers.h:742:18 (xds_cluster_end2end_test@poller=epoll1+0x32896b2)
    #1 grpc::ServerBidiReactor::Finish(grpc::Status) /proc/self/cwd/include/grpcpp/support/server_callback.h:414:13 (xds_cluster_end2end_test@poller=epoll1+0x328dbb4)
    #2 grpc::testing::LrsServiceImpl::Reactor::OnCancel() /proc/self/cwd/test/cpp/end2end/xds/xds_server.cc:494:3 (xds_cluster_end2end_test@poller=epoll1+0x32c6ec1)
    #3 grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0::operator()() const /proc/self/cwd/src/cpp/server/server_callback.cc:38:14 (xds_cluster_end2end_test@poller=epoll1+0x3b4755b)
    #4 void std::__invoke_impl(std::__invoke_other, grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (xds_cluster_end2end_test@poller=epoll1+0x3b474d5)
    #5 std::__invoke_result::type std::__invoke(grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (xds_cluster_end2end_test@poller=epoll1+0x3b47485)
    #6 std::invoke_result::type std::invoke(grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:81:14 (xds_cluster_end2end_test@poller=epoll1+0x3b47435)
    #7 void absl::lts_20250127::internal_any_invocable::InvokeR(grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:132:3 (xds_cluster_end2end_test@poller=epoll1+0x3b473d5)
    #8 void absl::lts_20250127::internal_any_invocable::LocalInvoker(absl::lts_20250127::internal_any_invocable::TypeErasedState*) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:310:10 (xds_cluster_end2end_test@poller=epoll1+0x3b472f2)
    #9 absl::lts_20250127::internal_any_invocable::Impl::operator()() /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:868:1 (xds_cluster_end2end_test@poller=epoll1+0x4aa14ef)
    #10 grpc_event_engine::experimental::SelfDeletingClosure::Run() /proc/self/cwd/./src/core/lib/event_engine/common_closures.h:54:5 (xds_cluster_end2end_test@poller=epoll1+0x56a78ad)
    #11 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::Step() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:552:14 (xds_cluster_end2end_test@poller=epoll1+0x56a4663)
    #12 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::ThreadBody() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:515:10 (xds_cluster_end2end_test@poller=epoll1+0x56a3d47)
    #13 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::operator()(void*) const /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:277:17 (xds_cluster_end2end_test@poller=epoll1+0x56a5049)
    #14 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::__invoke(void*) /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:275:7 (xds_cluster_end2end_test@poller=epoll1+0x56a4fe9)
    #15 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::operator()(void*) const /proc/self/cwd/src/core/util/posix/thd.cc:145:11 (xds_cluster_end2end_test@poller=epoll1+0x58479f0)
    #16 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::__invoke(void*) /proc/self/cwd/src/core/util/posix/thd.cc:115:9 (xds_cluster_end2end_test@poller=epoll1+0x5847819)

  Previous write of size 1 at 0x72680002dd80 by thread T4:
    #0 grpc::internal::CallbackBidiHandler::ServerCallbackReaderWriterImpl::Write(envoy::service::load_stats::v3::LoadStatsResponse const*, grpc::WriteOptions) /proc/self/cwd/include/grpcpp/impl/server_callback_handlers.h:790:38 (xds_cluster_end2end_test@poller=epoll1+0x3289e85)
    #1 grpc::ServerBidiReactor::StartWrite(envoy::service::load_stats::v3::LoadStatsResponse const*, grpc::WriteOptions) /proc/self/cwd/include/grpcpp/support/server_callback.h:350:13 (xds_cluster_end2end_test@poller=epoll1+0x32ef139)
    #2 grpc::ServerBidiReactor::StartWrite(envoy::service::load_stats::v3::LoadStatsResponse const*) /proc/self/cwd/include/grpcpp/support/server_callback.h:328:5 (xds_cluster_end2end_test@poller=epoll1+0x32cb6df)
    #3 grpc::testing::LrsServiceImpl::Reactor::OnReadDone(bool) /proc/self/cwd/test/cpp/end2end/xds/xds_server.cc:461:5 (xds_cluster_end2end_test@poller=epoll1+0x32c6535)
    #4 grpc::internal::CallbackBidiHandler::ServerCallbackReaderWriterImpl::SetupReactor(grpc::ServerBidiReactor*)::'lambda0'(bool)::operator()(bool) const /proc/self/cwd/include/grpcpp/impl/server_callback_handlers.h:839:22 (xds_cluster_end2end_test@poller=epoll1+0x328f196)
    #5 std::_Function_handler::ServerCallbackReaderWriterImpl::SetupReactor(grpc::ServerBidiReactor*)::'lambda0'(bool)>::_M_invoke(std::_Any_data const&, bool&&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:300:2 (xds_cluster_end2end_test@poller=epoll1+0x328ef1a)
    #6 std::function::operator()(bool) const /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:688:14 (xds_cluster_end2end_test@poller=epoll1+0x3279bb8)
    #7 grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'()::operator()() const /proc/self/cwd/include/grpcpp/support/callback_common.h:230:11 (xds_cluster_end2end_test@poller=epoll1+0x3279ae5)
    #8 void std::__invoke_impl(std::__invoke_other, grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'() const&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (xds_cluster_end2end_test@poller=epoll1+0x3279a65)
    #9 std::__invoke_result::type std::__invoke(grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'() const&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (xds_cluster_end2end_test@poller=epoll1+0x3279a15)
    #10 std::invoke_result::type std::invoke(grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'() const&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:81:14 (xds_cluster_end2end_test@poller=epoll1+0x32799c5)
    #11 void absl::lts_20250127::functional_internal::InvokeObject(absl::lts_20250127::functional_internal::VoidPtr) /proc/self/cwd/external/com_google_absl/absl/functional/internal/function_ref.h:78:7 (xds_cluster_end2end_test@poller=epoll1+0x327996d)
    #12 absl::lts_20250127::FunctionRef::operator()() const /proc/self/cwd/external/com_google_absl/absl/functional/function_ref.h:132:12 (xds_cluster_end2end_test@poller=epoll1+0x3bd0221)
    #13 void grpc::GlobalCallbackHook::CatchingCallback&>(absl::lts_20250127::FunctionRef&) /proc/self/cwd/include/grpcpp/support/global_callback_hook.h:41:5 (xds_cluster_end2end_test@poller=epoll1+0x3bd01a1)
    #14 grpc::DefaultGlobalCallbackHook::RunCallback(grpc_call*, absl::lts_20250127::FunctionRef) /proc/self/cwd/include/grpcpp/support/global_callback_hook.h:50:5 (xds_cluster_end2end_test@poller=epoll1+0x3bd011d)
    #15 grpc::internal::CallbackWithSuccessTag::Run(bool) /proc/self/cwd/include/grpcpp/support/callback_common.h:227:32 (xds_cluster_end2end_test@poller=epoll1+0x3279715)
    #16 grpc::internal::CallbackWithSuccessTag::StaticRun(grpc_completion_queue_functor*, int) /proc/self/cwd/include/grpcpp/support/callback_common.h:212:47 (xds_cluster_end2end_test@poller=epoll1+0x3279329)
    #17 cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0::operator()() const /proc/self/cwd/src/core/lib/surface/completion_queue.cc:913:9 (xds_cluster_end2end_test@poller=epoll1+0x545e2e3)
    #18 void std::__invoke_impl(std::__invoke_other, cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (xds_cluster_end2end_test@poller=epoll1+0x545e205)
    #19 std::__invoke_result::type std::__invoke(cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (xds_cluster_end2end_test@poller=epoll1+0x545e1b5)
    #20 std::invoke_result::type std::invoke(cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:81:14 (xds_cluster_end2end_test@poller=epoll1+0x545e165)
    #21 void absl::lts_20250127::internal_any_invocable::InvokeR(cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:132:3 (xds_cluster_end2end_test@poller=epoll1+0x545e115)
    #22 void absl::lts_20250127::internal_any_invocable::RemoteInvoker(absl::lts_20250127::internal_any_invocable::TypeErasedState*) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:368:10 (xds_cluster_end2end_test@poller=epoll1+0x545df6d)
    #23 absl::lts_20250127::internal_any_invocable::Impl::operator()() /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:868:1 (xds_cluster_end2end_test@poller=epoll1+0x4aa14ef)
    #24 grpc_event_engine::experimental::SelfDeletingClosure::Run() /proc/self/cwd/./src/core/lib/event_engine/common_closures.h:54:5 (xds_cluster_end2end_test@poller=epoll1+0x56a78ad)
    #25 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::Step() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:552:14 (xds_cluster_end2end_test@poller=epoll1+0x56a4663)
    #26 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::ThreadBody() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:515:10 (xds_cluster_end2end_test@poller=epoll1+0x56a3d47)
    #27 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::operator()(void*) const /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:277:17 (xds_cluster_end2end_test@poller=epoll1+0x56a5049)
    #28 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::__invoke(void*) /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:275:7 (xds_cluster_end2end_test@poller=epoll1+0x56a4fe9)
    #29 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::operator()(void*) const /proc/self/cwd/src/core/util/posix/thd.cc:145:11 (xds_cluster_end2end_test@poller=epoll1+0x58479f0)
    #30 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::__invoke(void*) /proc/self/cwd/src/core/util/posix/thd.cc:115:9 (xds_cluster_end2end_test@poller=epoll1+0x5847819)

@copybara-service copybara-service bot closed this in ff2b097 Jun 9, 2025
@markdroth markdroth deleted the lrs_server_callback_fix branch June 9, 2025 22:12
anniefrchz pushed a commit to anniefrchz/grpc that referenced this pull request Jun 25, 2025
This fixes a bug introduced in grpc#39736 whereby the LRS server may call `StartWrite()` after cancellation has already called `Finish()`, thus leading to a TSAN failure.

This is arguably a deficiency in the callback API.  The write should definitely fail, but it shouldn't cause a TSAN problem.  But we can consider that as part of a separate effort.

Example stack trace of the TSAN failure:

```
WARNING: ThreadSanitizer: data race (pid=17)
  Read of size 1 at 0x72680002dd80 by thread T5:
    #0 grpc::internal::CallbackBidiHandler::ServerCallbackReaderWriterImpl::Finish(grpc::Status) /proc/self/cwd/include/grpcpp/impl/server_callback_handlers.h:742:18 (xds_cluster_end2end_test@poller=epoll1+0x32896b2)
    #1 grpc::ServerBidiReactor::Finish(grpc::Status) /proc/self/cwd/include/grpcpp/support/server_callback.h:414:13 (xds_cluster_end2end_test@poller=epoll1+0x328dbb4)
    #2 grpc::testing::LrsServiceImpl::Reactor::OnCancel() /proc/self/cwd/test/cpp/end2end/xds/xds_server.cc:494:3 (xds_cluster_end2end_test@poller=epoll1+0x32c6ec1)
    #3 grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0::operator()() const /proc/self/cwd/src/cpp/server/server_callback.cc:38:14 (xds_cluster_end2end_test@poller=epoll1+0x3b4755b)
    #4 void std::__invoke_impl(std::__invoke_other, grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (xds_cluster_end2end_test@poller=epoll1+0x3b474d5)
    #5 std::__invoke_result::type std::__invoke(grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (xds_cluster_end2end_test@poller=epoll1+0x3b47485)
    #6 std::invoke_result::type std::invoke(grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:81:14 (xds_cluster_end2end_test@poller=epoll1+0x3b47435)
    #7 void absl::lts_20250127::internal_any_invocable::InvokeR(grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:132:3 (xds_cluster_end2end_test@poller=epoll1+0x3b473d5)
    #8 void absl::lts_20250127::internal_any_invocable::LocalInvoker(absl::lts_20250127::internal_any_invocable::TypeErasedState*) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:310:10 (xds_cluster_end2end_test@poller=epoll1+0x3b472f2)
    #9 absl::lts_20250127::internal_any_invocable::Impl::operator()() /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:868:1 (xds_cluster_end2end_test@poller=epoll1+0x4aa14ef)
    #10 grpc_event_engine::experimental::SelfDeletingClosure::Run() /proc/self/cwd/./src/core/lib/event_engine/common_closures.h:54:5 (xds_cluster_end2end_test@poller=epoll1+0x56a78ad)
    #11 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::Step() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:552:14 (xds_cluster_end2end_test@poller=epoll1+0x56a4663)
    #12 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::ThreadBody() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:515:10 (xds_cluster_end2end_test@poller=epoll1+0x56a3d47)
    #13 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::operator()(void*) const /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:277:17 (xds_cluster_end2end_test@poller=epoll1+0x56a5049)
    #14 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::__invoke(void*) /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:275:7 (xds_cluster_end2end_test@poller=epoll1+0x56a4fe9)
    #15 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::operator()(void*) const /proc/self/cwd/src/core/util/posix/thd.cc:145:11 (xds_cluster_end2end_test@poller=epoll1+0x58479f0)
    #16 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::__invoke(void*) /proc/self/cwd/src/core/util/posix/thd.cc:115:9 (xds_cluster_end2end_test@poller=epoll1+0x5847819)

  Previous write of size 1 at 0x72680002dd80 by thread T4:
    #0 grpc::internal::CallbackBidiHandler::ServerCallbackReaderWriterImpl::Write(envoy::service::load_stats::v3::LoadStatsResponse const*, grpc::WriteOptions) /proc/self/cwd/include/grpcpp/impl/server_callback_handlers.h:790:38 (xds_cluster_end2end_test@poller=epoll1+0x3289e85)
    #1 grpc::ServerBidiReactor::StartWrite(envoy::service::load_stats::v3::LoadStatsResponse const*, grpc::WriteOptions) /proc/self/cwd/include/grpcpp/support/server_callback.h:350:13 (xds_cluster_end2end_test@poller=epoll1+0x32ef139)
    #2 grpc::ServerBidiReactor::StartWrite(envoy::service::load_stats::v3::LoadStatsResponse const*) /proc/self/cwd/include/grpcpp/support/server_callback.h:328:5 (xds_cluster_end2end_test@poller=epoll1+0x32cb6df)
    #3 grpc::testing::LrsServiceImpl::Reactor::OnReadDone(bool) /proc/self/cwd/test/cpp/end2end/xds/xds_server.cc:461:5 (xds_cluster_end2end_test@poller=epoll1+0x32c6535)
    #4 grpc::internal::CallbackBidiHandler::ServerCallbackReaderWriterImpl::SetupReactor(grpc::ServerBidiReactor*)::'lambda0'(bool)::operator()(bool) const /proc/self/cwd/include/grpcpp/impl/server_callback_handlers.h:839:22 (xds_cluster_end2end_test@poller=epoll1+0x328f196)
    #5 std::_Function_handler::ServerCallbackReaderWriterImpl::SetupReactor(grpc::ServerBidiReactor*)::'lambda0'(bool)>::_M_invoke(std::_Any_data const&, bool&&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:300:2 (xds_cluster_end2end_test@poller=epoll1+0x328ef1a)
    #6 std::function::operator()(bool) const /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:688:14 (xds_cluster_end2end_test@poller=epoll1+0x3279bb8)
    #7 grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'()::operator()() const /proc/self/cwd/include/grpcpp/support/callback_common.h:230:11 (xds_cluster_end2end_test@poller=epoll1+0x3279ae5)
    #8 void std::__invoke_impl(std::__invoke_other, grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'() const&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (xds_cluster_end2end_test@poller=epoll1+0x3279a65)
    #9 std::__invoke_result::type std::__invoke(grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'() const&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (xds_cluster_end2end_test@poller=epoll1+0x3279a15)
    #10 std::invoke_result::type std::invoke(grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'() const&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:81:14 (xds_cluster_end2end_test@poller=epoll1+0x32799c5)
    #11 void absl::lts_20250127::functional_internal::InvokeObject(absl::lts_20250127::functional_internal::VoidPtr) /proc/self/cwd/external/com_google_absl/absl/functional/internal/function_ref.h:78:7 (xds_cluster_end2end_test@poller=epoll1+0x327996d)
    #12 absl::lts_20250127::FunctionRef::operator()() const /proc/self/cwd/external/com_google_absl/absl/functional/function_ref.h:132:12 (xds_cluster_end2end_test@poller=epoll1+0x3bd0221)
    #13 void grpc::GlobalCallbackHook::CatchingCallback&>(absl::lts_20250127::FunctionRef&) /proc/self/cwd/include/grpcpp/support/global_callback_hook.h:41:5 (xds_cluster_end2end_test@poller=epoll1+0x3bd01a1)
    #14 grpc::DefaultGlobalCallbackHook::RunCallback(grpc_call*, absl::lts_20250127::FunctionRef) /proc/self/cwd/include/grpcpp/support/global_callback_hook.h:50:5 (xds_cluster_end2end_test@poller=epoll1+0x3bd011d)
    #15 grpc::internal::CallbackWithSuccessTag::Run(bool) /proc/self/cwd/include/grpcpp/support/callback_common.h:227:32 (xds_cluster_end2end_test@poller=epoll1+0x3279715)
    #16 grpc::internal::CallbackWithSuccessTag::StaticRun(grpc_completion_queue_functor*, int) /proc/self/cwd/include/grpcpp/support/callback_common.h:212:47 (xds_cluster_end2end_test@poller=epoll1+0x3279329)
    #17 cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0::operator()() const /proc/self/cwd/src/core/lib/surface/completion_queue.cc:913:9 (xds_cluster_end2end_test@poller=epoll1+0x545e2e3)
    #18 void std::__invoke_impl(std::__invoke_other, cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (xds_cluster_end2end_test@poller=epoll1+0x545e205)
    #19 std::__invoke_result::type std::__invoke(cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (xds_cluster_end2end_test@poller=epoll1+0x545e1b5)
    #20 std::invoke_result::type std::invoke(cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:81:14 (xds_cluster_end2end_test@poller=epoll1+0x545e165)
    #21 void absl::lts_20250127::internal_any_invocable::InvokeR(cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:132:3 (xds_cluster_end2end_test@poller=epoll1+0x545e115)
    #22 void absl::lts_20250127::internal_any_invocable::RemoteInvoker(absl::lts_20250127::internal_any_invocable::TypeErasedState*) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:368:10 (xds_cluster_end2end_test@poller=epoll1+0x545df6d)
    #23 absl::lts_20250127::internal_any_invocable::Impl::operator()() /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:868:1 (xds_cluster_end2end_test@poller=epoll1+0x4aa14ef)
    #24 grpc_event_engine::experimental::SelfDeletingClosure::Run() /proc/self/cwd/./src/core/lib/event_engine/common_closures.h:54:5 (xds_cluster_end2end_test@poller=epoll1+0x56a78ad)
    #25 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::Step() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:552:14 (xds_cluster_end2end_test@poller=epoll1+0x56a4663)
    #26 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::ThreadBody() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:515:10 (xds_cluster_end2end_test@poller=epoll1+0x56a3d47)
    #27 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::operator()(void*) const /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:277:17 (xds_cluster_end2end_test@poller=epoll1+0x56a5049)
    #28 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::__invoke(void*) /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:275:7 (xds_cluster_end2end_test@poller=epoll1+0x56a4fe9)
    #29 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::operator()(void*) const /proc/self/cwd/src/core/util/posix/thd.cc:145:11 (xds_cluster_end2end_test@poller=epoll1+0x58479f0)
    #30 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::__invoke(void*) /proc/self/cwd/src/core/util/posix/thd.cc:115:9 (xds_cluster_end2end_test@poller=epoll1+0x5847819)
```

Closes grpc#39806

COPYBARA_INTEGRATE_REVIEW=grpc#39806 from markdroth:lrs_server_callback_fix e10ed85
PiperOrigin-RevId: 769289887
paulosjca pushed a commit to paulosjca/grpc that referenced this pull request Aug 23, 2025
This fixes a bug introduced in grpc#39736 whereby the LRS server may call `StartWrite()` after cancellation has already called `Finish()`, thus leading to a TSAN failure.

This is arguably a deficiency in the callback API.  The write should definitely fail, but it shouldn't cause a TSAN problem.  But we can consider that as part of a separate effort.

Example stack trace of the TSAN failure:

```
WARNING: ThreadSanitizer: data race (pid=17)
  Read of size 1 at 0x72680002dd80 by thread T5:
    #0 grpc::internal::CallbackBidiHandler::ServerCallbackReaderWriterImpl::Finish(grpc::Status) /proc/self/cwd/include/grpcpp/impl/server_callback_handlers.h:742:18 (xds_cluster_end2end_test@poller=epoll1+0x32896b2)
    #1 grpc::ServerBidiReactor::Finish(grpc::Status) /proc/self/cwd/include/grpcpp/support/server_callback.h:414:13 (xds_cluster_end2end_test@poller=epoll1+0x328dbb4)
    #2 grpc::testing::LrsServiceImpl::Reactor::OnCancel() /proc/self/cwd/test/cpp/end2end/xds/xds_server.cc:494:3 (xds_cluster_end2end_test@poller=epoll1+0x32c6ec1)
    #3 grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0::operator()() const /proc/self/cwd/src/cpp/server/server_callback.cc:38:14 (xds_cluster_end2end_test@poller=epoll1+0x3b4755b)
    #4 void std::__invoke_impl(std::__invoke_other, grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (xds_cluster_end2end_test@poller=epoll1+0x3b474d5)
    #5 std::__invoke_result::type std::__invoke(grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (xds_cluster_end2end_test@poller=epoll1+0x3b47485)
    #6 std::invoke_result::type std::invoke(grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:81:14 (xds_cluster_end2end_test@poller=epoll1+0x3b47435)
    #7 void absl::lts_20250127::internal_any_invocable::InvokeR(grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:132:3 (xds_cluster_end2end_test@poller=epoll1+0x3b473d5)
    grpc#8 void absl::lts_20250127::internal_any_invocable::LocalInvoker(absl::lts_20250127::internal_any_invocable::TypeErasedState*) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:310:10 (xds_cluster_end2end_test@poller=epoll1+0x3b472f2)
    grpc#9 absl::lts_20250127::internal_any_invocable::Impl::operator()() /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:868:1 (xds_cluster_end2end_test@poller=epoll1+0x4aa14ef)
    grpc#10 grpc_event_engine::experimental::SelfDeletingClosure::Run() /proc/self/cwd/./src/core/lib/event_engine/common_closures.h:54:5 (xds_cluster_end2end_test@poller=epoll1+0x56a78ad)
    grpc#11 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::Step() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:552:14 (xds_cluster_end2end_test@poller=epoll1+0x56a4663)
    grpc#12 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::ThreadBody() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:515:10 (xds_cluster_end2end_test@poller=epoll1+0x56a3d47)
    grpc#13 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::operator()(void*) const /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:277:17 (xds_cluster_end2end_test@poller=epoll1+0x56a5049)
    grpc#14 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::__invoke(void*) /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:275:7 (xds_cluster_end2end_test@poller=epoll1+0x56a4fe9)
    grpc#15 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::operator()(void*) const /proc/self/cwd/src/core/util/posix/thd.cc:145:11 (xds_cluster_end2end_test@poller=epoll1+0x58479f0)
    grpc#16 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::__invoke(void*) /proc/self/cwd/src/core/util/posix/thd.cc:115:9 (xds_cluster_end2end_test@poller=epoll1+0x5847819)

  Previous write of size 1 at 0x72680002dd80 by thread T4:
    #0 grpc::internal::CallbackBidiHandler::ServerCallbackReaderWriterImpl::Write(envoy::service::load_stats::v3::LoadStatsResponse const*, grpc::WriteOptions) /proc/self/cwd/include/grpcpp/impl/server_callback_handlers.h:790:38 (xds_cluster_end2end_test@poller=epoll1+0x3289e85)
    #1 grpc::ServerBidiReactor::StartWrite(envoy::service::load_stats::v3::LoadStatsResponse const*, grpc::WriteOptions) /proc/self/cwd/include/grpcpp/support/server_callback.h:350:13 (xds_cluster_end2end_test@poller=epoll1+0x32ef139)
    #2 grpc::ServerBidiReactor::StartWrite(envoy::service::load_stats::v3::LoadStatsResponse const*) /proc/self/cwd/include/grpcpp/support/server_callback.h:328:5 (xds_cluster_end2end_test@poller=epoll1+0x32cb6df)
    #3 grpc::testing::LrsServiceImpl::Reactor::OnReadDone(bool) /proc/self/cwd/test/cpp/end2end/xds/xds_server.cc:461:5 (xds_cluster_end2end_test@poller=epoll1+0x32c6535)
    #4 grpc::internal::CallbackBidiHandler::ServerCallbackReaderWriterImpl::SetupReactor(grpc::ServerBidiReactor*)::'lambda0'(bool)::operator()(bool) const /proc/self/cwd/include/grpcpp/impl/server_callback_handlers.h:839:22 (xds_cluster_end2end_test@poller=epoll1+0x328f196)
    #5 std::_Function_handler::ServerCallbackReaderWriterImpl::SetupReactor(grpc::ServerBidiReactor*)::'lambda0'(bool)>::_M_invoke(std::_Any_data const&, bool&&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:300:2 (xds_cluster_end2end_test@poller=epoll1+0x328ef1a)
    #6 std::function::operator()(bool) const /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:688:14 (xds_cluster_end2end_test@poller=epoll1+0x3279bb8)
    #7 grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'()::operator()() const /proc/self/cwd/include/grpcpp/support/callback_common.h:230:11 (xds_cluster_end2end_test@poller=epoll1+0x3279ae5)
    grpc#8 void std::__invoke_impl(std::__invoke_other, grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'() const&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (xds_cluster_end2end_test@poller=epoll1+0x3279a65)
    grpc#9 std::__invoke_result::type std::__invoke(grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'() const&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (xds_cluster_end2end_test@poller=epoll1+0x3279a15)
    grpc#10 std::invoke_result::type std::invoke(grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'() const&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:81:14 (xds_cluster_end2end_test@poller=epoll1+0x32799c5)
    grpc#11 void absl::lts_20250127::functional_internal::InvokeObject(absl::lts_20250127::functional_internal::VoidPtr) /proc/self/cwd/external/com_google_absl/absl/functional/internal/function_ref.h:78:7 (xds_cluster_end2end_test@poller=epoll1+0x327996d)
    grpc#12 absl::lts_20250127::FunctionRef::operator()() const /proc/self/cwd/external/com_google_absl/absl/functional/function_ref.h:132:12 (xds_cluster_end2end_test@poller=epoll1+0x3bd0221)
    grpc#13 void grpc::GlobalCallbackHook::CatchingCallback&>(absl::lts_20250127::FunctionRef&) /proc/self/cwd/include/grpcpp/support/global_callback_hook.h:41:5 (xds_cluster_end2end_test@poller=epoll1+0x3bd01a1)
    grpc#14 grpc::DefaultGlobalCallbackHook::RunCallback(grpc_call*, absl::lts_20250127::FunctionRef) /proc/self/cwd/include/grpcpp/support/global_callback_hook.h:50:5 (xds_cluster_end2end_test@poller=epoll1+0x3bd011d)
    grpc#15 grpc::internal::CallbackWithSuccessTag::Run(bool) /proc/self/cwd/include/grpcpp/support/callback_common.h:227:32 (xds_cluster_end2end_test@poller=epoll1+0x3279715)
    grpc#16 grpc::internal::CallbackWithSuccessTag::StaticRun(grpc_completion_queue_functor*, int) /proc/self/cwd/include/grpcpp/support/callback_common.h:212:47 (xds_cluster_end2end_test@poller=epoll1+0x3279329)
    grpc#17 cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0::operator()() const /proc/self/cwd/src/core/lib/surface/completion_queue.cc:913:9 (xds_cluster_end2end_test@poller=epoll1+0x545e2e3)
    grpc#18 void std::__invoke_impl(std::__invoke_other, cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (xds_cluster_end2end_test@poller=epoll1+0x545e205)
    grpc#19 std::__invoke_result::type std::__invoke(cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (xds_cluster_end2end_test@poller=epoll1+0x545e1b5)
    grpc#20 std::invoke_result::type std::invoke(cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:81:14 (xds_cluster_end2end_test@poller=epoll1+0x545e165)
    grpc#21 void absl::lts_20250127::internal_any_invocable::InvokeR(cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:132:3 (xds_cluster_end2end_test@poller=epoll1+0x545e115)
    grpc#22 void absl::lts_20250127::internal_any_invocable::RemoteInvoker(absl::lts_20250127::internal_any_invocable::TypeErasedState*) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:368:10 (xds_cluster_end2end_test@poller=epoll1+0x545df6d)
    grpc#23 absl::lts_20250127::internal_any_invocable::Impl::operator()() /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:868:1 (xds_cluster_end2end_test@poller=epoll1+0x4aa14ef)
    grpc#24 grpc_event_engine::experimental::SelfDeletingClosure::Run() /proc/self/cwd/./src/core/lib/event_engine/common_closures.h:54:5 (xds_cluster_end2end_test@poller=epoll1+0x56a78ad)
    grpc#25 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::Step() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:552:14 (xds_cluster_end2end_test@poller=epoll1+0x56a4663)
    grpc#26 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::ThreadBody() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:515:10 (xds_cluster_end2end_test@poller=epoll1+0x56a3d47)
    grpc#27 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::operator()(void*) const /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:277:17 (xds_cluster_end2end_test@poller=epoll1+0x56a5049)
    grpc#28 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::__invoke(void*) /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:275:7 (xds_cluster_end2end_test@poller=epoll1+0x56a4fe9)
    grpc#29 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::operator()(void*) const /proc/self/cwd/src/core/util/posix/thd.cc:145:11 (xds_cluster_end2end_test@poller=epoll1+0x58479f0)
    grpc#30 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::__invoke(void*) /proc/self/cwd/src/core/util/posix/thd.cc:115:9 (xds_cluster_end2end_test@poller=epoll1+0x5847819)
```

Closes grpc#39806

COPYBARA_INTEGRATE_REVIEW=grpc#39806 from markdroth:lrs_server_callback_fix e10ed85
PiperOrigin-RevId: 769289887
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants