@@ -65,7 +65,7 @@ void BatchLogRecordProcessor::OnEmit(std::unique_ptr<Recordable> &&record) noexc
6565 {
6666 // signal the worker thread
6767 synchronization_data_->is_force_wakeup_background_worker .store (true , std::memory_order_release);
68- synchronization_data_->cv .notify_one ();
68+ synchronization_data_->cv .notify_all ();
6969 }
7070}
7171
@@ -79,21 +79,25 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex
7979 // Now wait for the worker thread to signal back from the Export method
8080 std::unique_lock<std::mutex> lk_cv (synchronization_data_->force_flush_cv_m );
8181
82- synchronization_data_->is_force_flush_pending .store (true , std::memory_order_release);
82+ std::uint64_t current_sequence =
83+ synchronization_data_->force_flush_pending_sequence .fetch_add (1 , std::memory_order_release) +
84+ 1 ;
8385 synchronization_data_->force_flush_timeout_us .store (timeout.count (), std::memory_order_release);
84- auto break_condition = [this ]() {
86+ auto break_condition = [this , current_sequence ]() {
8587 if (synchronization_data_->is_shutdown .load () == true )
8688 {
8789 return true ;
8890 }
8991
9092 // Wake up the worker thread once.
91- if (synchronization_data_->is_force_flush_pending .load (std::memory_order_acquire))
93+ if (synchronization_data_->force_flush_pending_sequence .load (std::memory_order_acquire) >
94+ synchronization_data_->force_flush_notified_sequence .load (std::memory_order_acquire))
9295 {
93- synchronization_data_->cv .notify_one ();
96+ synchronization_data_->cv .notify_all ();
9497 }
9598
96- return synchronization_data_->is_force_flush_notified .load (std::memory_order_acquire);
99+ return synchronization_data_->force_flush_notified_sequence .load (std::memory_order_acquire) >=
100+ current_sequence;
97101 };
98102
99103 // Fix timeout to meet requirement of wait_for
@@ -110,35 +114,22 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex
110114 bool result = false ;
111115 while (!result && timeout_steady > std::chrono::steady_clock::duration::zero ())
112116 {
113- // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called
114- // between is_force_flush_pending.load () and force_flush_cv.wait(). We must not wait
115- // for ever
117+ // When force_flush_notified_sequence.compare_exchange_strong(...) and
118+ // force_flush_cv.notify_all () is called between force_flush_pending_sequence.load(...) and
119+ // force_flush_cv.wait(). We must not wait for ever
116120 std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now ();
117- result = synchronization_data_->force_flush_cv .wait_for (lk_cv, scheduled_delay_millis_,
118- break_condition);
119- timeout_steady -= std::chrono::steady_clock::now () - start_timepoint;
120- }
121+ std::chrono::microseconds wait_timeout = scheduled_delay_millis_;
121122
122- // If it's already signaled, we must wait util notified.
123- // We use a spin lock here
124- if (false ==
125- synchronization_data_->is_force_flush_pending .exchange (false , std::memory_order_acq_rel))
126- {
127- for (int retry_waiting_times = 0 ;
128- false == synchronization_data_->is_force_flush_notified .load (std::memory_order_acquire);
129- ++retry_waiting_times)
123+ if (wait_timeout > timeout_steady)
130124 {
131- opentelemetry::common::SpinLockMutex::fast_yield ();
132- if ((retry_waiting_times & 127 ) == 127 )
133- {
134- std::this_thread::yield ();
135- }
125+ wait_timeout = std::chrono::duration_cast<std::chrono::microseconds>(timeout_steady);
136126 }
127+ result = synchronization_data_->force_flush_cv .wait_for (lk_cv, wait_timeout, break_condition);
128+ timeout_steady -= std::chrono::steady_clock::now () - start_timepoint;
137129 }
138130
139- synchronization_data_->is_force_flush_notified .store (false , std::memory_order_release);
140-
141- return result;
131+ return synchronization_data_->force_flush_notified_sequence .load (std::memory_order_acquire) >=
132+ current_sequence;
142133}
143134
144135void BatchLogRecordProcessor::DoBackgroundWork ()
@@ -182,8 +173,8 @@ void BatchLogRecordProcessor::Export()
182173 {
183174 std::vector<std::unique_ptr<Recordable>> records_arr;
184175 size_t num_records_to_export;
185- bool notify_force_flush =
186- synchronization_data_->is_force_flush_pending . exchange ( false , std::memory_order_acq_rel );
176+ std:: uint64_t notify_force_flush =
177+ synchronization_data_->force_flush_pending_sequence . load ( std::memory_order_acquire );
187178 if (notify_force_flush)
188179 {
189180 num_records_to_export = buffer_.size ();
@@ -217,7 +208,7 @@ void BatchLogRecordProcessor::Export()
217208}
218209
219210void BatchLogRecordProcessor::NotifyCompletion (
220- bool notify_force_flush,
211+ std:: uint64_t notify_force_flush,
221212 const std::unique_ptr<LogRecordExporter> &exporter,
222213 const std::shared_ptr<SynchronizationData> &synchronization_data)
223214{
@@ -226,7 +217,8 @@ void BatchLogRecordProcessor::NotifyCompletion(
226217 return ;
227218 }
228219
229- if (notify_force_flush)
220+ if (notify_force_flush >
221+ synchronization_data->force_flush_notified_sequence .load (std::memory_order_acquire))
230222 {
231223 if (exporter)
232224 {
@@ -236,8 +228,15 @@ void BatchLogRecordProcessor::NotifyCompletion(
236228 std::chrono::microseconds::zero ());
237229 exporter->ForceFlush (timeout);
238230 }
239- synchronization_data->is_force_flush_notified .store (true , std::memory_order_release);
240- synchronization_data->force_flush_cv .notify_one ();
231+
232+ std::uint64_t notified_sequence =
233+ synchronization_data->force_flush_notified_sequence .load (std::memory_order_acquire);
234+ while (notify_force_flush > notified_sequence)
235+ {
236+ synchronization_data->force_flush_notified_sequence .compare_exchange_strong (
237+ notified_sequence, notify_force_flush, std::memory_order_acq_rel);
238+ synchronization_data->force_flush_cv .notify_all ();
239+ }
241240 }
242241}
243242
@@ -246,7 +245,8 @@ void BatchLogRecordProcessor::DrainQueue()
246245 while (true )
247246 {
248247 if (buffer_.empty () &&
249- false == synchronization_data_->is_force_flush_pending .load (std::memory_order_acquire))
248+ synchronization_data_->force_flush_pending_sequence .load (std::memory_order_acquire) <=
249+ synchronization_data_->force_flush_notified_sequence .load (std::memory_order_acquire))
250250 {
251251 break ;
252252 }
@@ -285,7 +285,7 @@ bool BatchLogRecordProcessor::Shutdown(std::chrono::microseconds timeout) noexce
285285 if (worker_thread_.joinable ())
286286 {
287287 synchronization_data_->is_force_wakeup_background_worker .store (true , std::memory_order_release);
288- synchronization_data_->cv .notify_one ();
288+ synchronization_data_->cv .notify_all ();
289289 worker_thread_.join ();
290290 }
291291
0 commit comments