Skip to content

Commit a90f88f

Browse files
committed
Merge remote-tracking branch 'remotes/origin/main' into ImproveCompression
2 parents d418b2d + 2b9bff9 commit a90f88f

File tree

4 files changed

+118
-16
lines changed

4 files changed

+118
-16
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ Increment the:
1515

1616
## [Unreleased]
1717

18+
* [SDK] Do not frequently create and destroy http client threads
19+
[#3198](https://github.com/open-telemetry/opentelemetry-cpp/pull/3198)
20+
1821
## [1.18 2024-11-25]
1922

2023
* [EXPORTER] Fix crash in ElasticsearchLogRecordExporter

ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -322,25 +322,16 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
322322

323323
inline CURLM *GetMultiHandle() noexcept { return multi_handle_; }
324324

325-
void MaybeSpawnBackgroundThread();
325+
// return true if create background thread, false is already exist background thread
326+
bool MaybeSpawnBackgroundThread();
326327

327328
void ScheduleAddSession(uint64_t session_id);
328329
void ScheduleAbortSession(uint64_t session_id);
329330
void ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource &&resource);
330331

331-
void WaitBackgroundThreadExit()
332-
{
333-
std::unique_ptr<std::thread> background_thread;
334-
{
335-
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
336-
background_thread.swap(background_thread_);
337-
}
332+
void SetBackgroundWaitFor(std::chrono::milliseconds ms);
338333

339-
if (background_thread && background_thread->joinable())
340-
{
341-
background_thread->join();
342-
}
343-
}
334+
void WaitBackgroundThreadExit();
344335

345336
private:
346337
void wakeupBackgroundThread();
@@ -366,6 +357,9 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
366357
std::unique_ptr<std::thread> background_thread_;
367358
std::chrono::milliseconds scheduled_delay_milliseconds_;
368359

360+
std::chrono::milliseconds background_thread_wait_for_;
361+
std::atomic<bool> is_shutdown_{false};
362+
369363
nostd::shared_ptr<HttpCurlGlobalInitializer> curl_global_initializer_;
370364
};
371365

ext/src/http/client/curl/http_client_curl.cc

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,11 +269,13 @@ HttpClient::HttpClient()
269269
next_session_id_{0},
270270
max_sessions_per_connection_{8},
271271
scheduled_delay_milliseconds_{std::chrono::milliseconds(256)},
272+
background_thread_wait_for_{std::chrono::minutes{1}},
272273
curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance())
273274
{}
274275

275276
HttpClient::~HttpClient()
276277
{
278+
is_shutdown_.store(true, std::memory_order_release);
277279
while (true)
278280
{
279281
std::unique_ptr<std::thread> background_thread;
@@ -291,6 +293,7 @@ HttpClient::~HttpClient()
291293
}
292294
if (background_thread->joinable())
293295
{
296+
wakeupBackgroundThread(); // if delay quit, wake up first
294297
background_thread->join();
295298
}
296299
}
@@ -415,29 +418,33 @@ void HttpClient::CleanupSession(uint64_t session_id)
415418
}
416419
}
417420

418-
void HttpClient::MaybeSpawnBackgroundThread()
421+
bool HttpClient::MaybeSpawnBackgroundThread()
419422
{
420423
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
421424
if (background_thread_)
422425
{
423-
return;
426+
return false;
424427
}
425428

426429
background_thread_.reset(new std::thread(
427430
[](HttpClient *self) {
428431
int still_running = 1;
432+
std::chrono::system_clock::time_point last_free_job_timepoint =
433+
std::chrono::system_clock::now();
434+
bool need_wait_more = false;
429435
while (true)
430436
{
431437
CURLMsg *msg;
432438
int queued;
433439
CURLMcode mc = curl_multi_perform(self->multi_handle_, &still_running);
440+
434441
// According to https://curl.se/libcurl/c/curl_multi_perform.html, when mc is not OK, we
435442
// can not curl_multi_perform it again
436443
if (mc != CURLM_OK)
437444
{
438445
self->resetMultiHandle();
439446
}
440-
else if (still_running)
447+
else if (still_running || need_wait_more)
441448
{
442449
// curl_multi_poll is added from libcurl 7.66.0, before 7.68.0, we can only wait util
443450
// timeout to do the rest jobs
@@ -496,6 +503,32 @@ void HttpClient::MaybeSpawnBackgroundThread()
496503
still_running = 1;
497504
}
498505

506+
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
507+
if (still_running > 0)
508+
{
509+
last_free_job_timepoint = now;
510+
need_wait_more = false;
511+
continue;
512+
}
513+
514+
std::chrono::milliseconds wait_for = std::chrono::milliseconds::zero();
515+
516+
#if LIBCURL_VERSION_NUM >= 0x074400
517+
// only available with curl_multi_poll+curl_multi_wakeup, because curl_multi_wait would
518+
// cause CPU busy, curl_multi_wait+sleep could not wakeup quickly
519+
wait_for = self->background_thread_wait_for_;
520+
#endif
521+
if (self->is_shutdown_.load(std::memory_order_acquire))
522+
{
523+
wait_for = std::chrono::milliseconds::zero();
524+
}
525+
526+
if (now - last_free_job_timepoint < wait_for)
527+
{
528+
need_wait_more = true;
529+
continue;
530+
}
531+
499532
if (still_running == 0)
500533
{
501534
std::lock_guard<std::mutex> lock_guard{self->background_thread_m_};
@@ -534,6 +567,7 @@ void HttpClient::MaybeSpawnBackgroundThread()
534567
}
535568
},
536569
this));
570+
return true;
537571
}
538572

539573
void HttpClient::ScheduleAddSession(uint64_t session_id)
@@ -582,6 +616,28 @@ void HttpClient::ScheduleRemoveSession(uint64_t session_id, HttpCurlEasyResource
582616
wakeupBackgroundThread();
583617
}
584618

619+
void HttpClient::SetBackgroundWaitFor(std::chrono::milliseconds ms)
620+
{
621+
background_thread_wait_for_ = ms;
622+
}
623+
624+
void HttpClient::WaitBackgroundThreadExit()
625+
{
626+
is_shutdown_.store(true, std::memory_order_release);
627+
std::unique_ptr<std::thread> background_thread;
628+
{
629+
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
630+
background_thread.swap(background_thread_);
631+
}
632+
633+
if (background_thread && background_thread->joinable())
634+
{
635+
wakeupBackgroundThread();
636+
background_thread->join();
637+
}
638+
is_shutdown_.store(false, std::memory_order_release);
639+
}
640+
585641
void HttpClient::wakeupBackgroundThread()
586642
{
587643
// Before libcurl 7.68.0, we can only wait for timeout and do the rest jobs

ext/test/http/curl_http_test.cc

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4+
#include <curl/curlver.h>
45
#include <gtest/gtest.h>
56
#include <string.h>
67
#include <atomic>
@@ -12,6 +13,7 @@
1213
#include <mutex>
1314
#include <numeric>
1415
#include <string>
16+
#include <thread>
1517
#include <utility>
1618
#include <vector>
1719

@@ -533,6 +535,53 @@ TEST_F(BasicCurlHttpTests, FinishInAsyncCallback)
533535
}
534536
}
535537

538+
TEST_F(BasicCurlHttpTests, ElegantQuitQuick)
539+
{
540+
auto http_client = http_client::HttpClientFactory::Create();
541+
std::static_pointer_cast<curl::HttpClient>(http_client)->MaybeSpawnBackgroundThread();
542+
// start background first, then test it could wakeup
543+
auto session = http_client->CreateSession("http://127.0.0.1:19000/get/");
544+
auto request = session->CreateRequest();
545+
request->SetUri("get/");
546+
auto handler = std::make_shared<GetEventHandler>();
547+
session->SendRequest(handler);
548+
std::this_thread::sleep_for(std::chrono::milliseconds{10}); // let it enter poll state
549+
auto beg = std::chrono::system_clock::now();
550+
http_client->FinishAllSessions();
551+
http_client.reset();
552+
// when background_thread_wait_for_ is used, it should have no side effect on elegant quit
553+
// wait should be less than scheduled_delay_milliseconds_
554+
// Due to load on CI hosts (some take 10ms), we assert it is less than 20ms
555+
auto cost = std::chrono::system_clock::now() - beg;
556+
ASSERT_TRUE(cost < std::chrono::milliseconds{20})
557+
<< "cost ms: " << std::chrono::duration_cast<std::chrono::milliseconds>(cost).count()
558+
<< " libcurl version: 0x" << std::hex << LIBCURL_VERSION_NUM;
559+
ASSERT_TRUE(handler->is_called_);
560+
ASSERT_TRUE(handler->got_response_);
561+
}
562+
TEST_F(BasicCurlHttpTests, BackgroundThreadWaitMore)
563+
{
564+
{
565+
curl::HttpClient http_client;
566+
http_client.MaybeSpawnBackgroundThread();
567+
std::this_thread::sleep_for(std::chrono::milliseconds{10});
568+
#if LIBCURL_VERSION_NUM >= 0x074200
569+
ASSERT_FALSE(http_client.MaybeSpawnBackgroundThread());
570+
#else
571+
// low version curl do not support delay quit, so old background would quit
572+
ASSERT_TRUE(http_client.MaybeSpawnBackgroundThread());
573+
#endif
574+
}
575+
{
576+
curl::HttpClient http_client;
577+
http_client.SetBackgroundWaitFor(std::chrono::milliseconds::zero());
578+
http_client.MaybeSpawnBackgroundThread();
579+
std::this_thread::sleep_for(std::chrono::milliseconds{10});
580+
// we can disable delay quit by set wait for 0
581+
ASSERT_TRUE(http_client.MaybeSpawnBackgroundThread());
582+
}
583+
}
584+
536585
#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW
537586
struct GzipEventHandler : public CustomEventHandler
538587
{

0 commit comments

Comments
 (0)