2929#include " src/core/ext/filters/fault_injection/service_config_parser.h"
3030#include " src/core/lib/channel/channel_stack.h"
3131#include " src/core/lib/channel/status_util.h"
32+ #include " src/core/lib/gprpp/capture.h"
3233#include " src/core/lib/promise/sleep.h"
3334#include " src/core/lib/promise/try_seq.h"
3435#include " src/core/lib/service_config/service_config_call_data.h"
@@ -62,6 +63,33 @@ inline bool UnderFraction(const uint32_t numerator,
6263 return random_number < numerator;
6364}
6465
66+ // Tracks an active faults lifetime.
67+ // Increments g_active_faults when created, and decrements it when destroyed.
68+ class FaultHandle {
69+ public:
70+ explicit FaultHandle (bool active) : active_(active) {
71+ if (active) {
72+ g_active_faults.fetch_add (1 , std::memory_order_relaxed);
73+ }
74+ }
75+ ~FaultHandle () {
76+ if (active_) {
77+ g_active_faults.fetch_sub (1 , std::memory_order_relaxed);
78+ }
79+ }
80+ FaultHandle (const FaultHandle&) = delete ;
81+ FaultHandle& operator =(const FaultHandle&) = delete ;
82+ FaultHandle (FaultHandle&& other) noexcept
83+ : active_(absl::exchange(other.active_, false )) {}
84+ FaultHandle& operator =(FaultHandle&& other) noexcept {
85+ std::swap (active_, other.active_ );
86+ return *this ;
87+ }
88+
89+ private:
90+ bool active_;
91+ };
92+
6593} // namespace
6694
6795class FaultInjectionFilter ::InjectionDecision {
@@ -73,15 +101,16 @@ class FaultInjectionFilter::InjectionDecision {
73101 abort_request_(abort_request) {}
74102
75103 std::string ToString () const ;
76- Timestamp DelayUntil () const ;
104+ Timestamp DelayUntil ();
77105 absl::Status MaybeAbort () const ;
78106
79107 private:
80- bool HaveActiveFaultsQuota (bool increment ) const ;
108+ bool HaveActiveFaultsQuota () const ;
81109
82110 uint32_t max_faults_;
83111 Duration delay_time_;
84112 absl::optional<absl::Status> abort_request_;
113+ FaultHandle active_fault_{false };
85114};
86115
87116absl::StatusOr<FaultInjectionFilter> FaultInjectionFilter::Create (
@@ -104,9 +133,12 @@ ArenaPromise<ServerMetadataHandle> FaultInjectionFilter::MakeCallPromise(
104133 gpr_log (GPR_INFO, " chand=%p: Fault injection triggered %s" , this ,
105134 decision.ToString ().c_str ());
106135 }
136+ auto delay = decision.DelayUntil ();
107137 return TrySeq (
108- Sleep (decision.DelayUntil ()),
109- [decision]() { return decision.MaybeAbort (); },
138+ Sleep (delay),
139+ Capture (
140+ [](InjectionDecision* decision) { return decision->MaybeAbort (); },
141+ std::move (decision)),
110142 next_promise_factory (std::move (call_args)));
111143}
112144
@@ -190,25 +222,21 @@ FaultInjectionFilter::MakeInjectionDecision(
190222 : absl::nullopt );
191223}
192224
193- bool FaultInjectionFilter::InjectionDecision::HaveActiveFaultsQuota (
194- bool increment) const {
195- if (g_active_faults.load (std::memory_order_acquire) >= max_faults_) {
196- return false ;
197- }
198- if (increment) g_active_faults.fetch_add (1 , std::memory_order_relaxed);
199- return true ;
225+ bool FaultInjectionFilter::InjectionDecision::HaveActiveFaultsQuota () const {
226+ return g_active_faults.load (std::memory_order_acquire) < max_faults_;
200227}
201228
202- Timestamp FaultInjectionFilter::InjectionDecision::DelayUntil () const {
203- if (delay_time_ != Duration::Zero () && HaveActiveFaultsQuota (true )) {
229+ Timestamp FaultInjectionFilter::InjectionDecision::DelayUntil () {
230+ if (delay_time_ != Duration::Zero () && HaveActiveFaultsQuota ()) {
231+ active_fault_ = FaultHandle{true };
204232 return ExecCtx::Get ()->Now () + delay_time_;
205233 }
206234 return Timestamp::InfPast ();
207235}
208236
209237absl::Status FaultInjectionFilter::InjectionDecision::MaybeAbort () const {
210238 if (abort_request_.has_value () &&
211- (delay_time_ != Duration::Zero () || HaveActiveFaultsQuota (false ))) {
239+ (delay_time_ != Duration::Zero () || HaveActiveFaultsQuota ())) {
212240 return abort_request_.value ();
213241 }
214242 return absl::OkStatus ();
0 commit comments