Skip to content

Commit 6f53da3

Browse files
authored
Async callback Exporter to Processor changes (open-telemetry#1275)
1 parent c3eaa9d commit 6f53da3

File tree

41 files changed

+785
-64
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+785
-64
lines changed

exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_exporter.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,17 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor
8989
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
9090
&records) noexcept override;
9191

92+
/**
93+
* Exports a vector of log records to the Elasticsearch instance asynchronously.
94+
* @param records A list of log records to send to Elasticsearch.
95+
* @param result_callback callback function accepting ExportResult as argument
96+
*/
97+
void Export(
98+
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
99+
&records,
100+
nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)> result_callback) noexcept
101+
override;
102+
92103
/**
93104
* Shutdown this exporter.
94105
* @param timeout The maximum time to wait for the shutdown method to return

exporters/elasticsearch/src/es_log_exporter.cc

Lines changed: 130 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,89 @@ class ResponseHandler : public http_client::EventHandler
110110
bool console_debug_ = false;
111111
};
112112

113+
/**
114+
* This class handles the async response message from the Elasticsearch request
115+
*/
116+
class AsyncResponseHandler : public http_client::EventHandler
117+
{
118+
public:
119+
/**
120+
* Creates a response handler, that by default doesn't display to console
121+
*/
122+
AsyncResponseHandler(
123+
std::shared_ptr<ext::http::client::Session> session,
124+
nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)> result_callback,
125+
bool console_debug = false)
126+
: console_debug_{console_debug},
127+
session_{std::move(session)},
128+
result_callback_{result_callback}
129+
{}
130+
131+
/**
132+
* Cleans up the session in the destructor.
133+
*/
134+
~AsyncResponseHandler() { session_->FinishSession(); }
135+
136+
/**
137+
* Automatically called when the response is received
138+
*/
139+
void OnResponse(http_client::Response &response) noexcept override
140+
{
141+
142+
// Store the body of the request
143+
body_ = std::string(response.GetBody().begin(), response.GetBody().end());
144+
if (body_.find("\"failed\" : 0") == std::string::npos)
145+
{
146+
OTEL_INTERNAL_LOG_ERROR(
147+
"[ES Trace Exporter] Logs were not written to Elasticsearch correctly, response body: "
148+
<< body_);
149+
result_callback_(sdk::common::ExportResult::kFailure);
150+
}
151+
else
152+
{
153+
result_callback_(sdk::common::ExportResult::kSuccess);
154+
}
155+
}
156+
157+
// Callback method when an http event occurs
158+
void OnEvent(http_client::SessionState state, nostd::string_view reason) noexcept override
159+
{
160+
// If any failure event occurs, release the condition variable to unblock main thread
161+
switch (state)
162+
{
163+
case http_client::SessionState::ConnectFailed:
164+
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Connection to elasticsearch failed");
165+
break;
166+
case http_client::SessionState::SendFailed:
167+
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Request failed to be sent to elasticsearch");
168+
169+
break;
170+
case http_client::SessionState::TimedOut:
171+
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Request to elasticsearch timed out");
172+
173+
break;
174+
case http_client::SessionState::NetworkError:
175+
OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Network error to elasticsearch");
176+
break;
177+
default:
178+
break;
179+
}
180+
result_callback_(sdk::common::ExportResult::kFailure);
181+
}
182+
183+
private:
184+
// Stores the session object for the request
185+
std::shared_ptr<ext::http::client::Session> session_;
186+
// Callback to call to on receiving events
187+
nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)> result_callback_;
188+
189+
// A string to store the response body
190+
std::string body_ = "";
191+
192+
// Whether to print the results from the callback
193+
bool console_debug_ = false;
194+
};
195+
113196
ElasticsearchLogExporter::ElasticsearchLogExporter()
114197
: options_{ElasticsearchExporterOptions()},
115198
http_client_{new ext::http::client::curl::HttpClient()}
@@ -162,8 +245,8 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(
162245
request->SetBody(body_vec);
163246

164247
// Send the request
165-
std::unique_ptr<ResponseHandler> handler(new ResponseHandler(options_.console_debug_));
166-
session->SendRequest(*handler);
248+
auto handler = std::make_shared<ResponseHandler>(options_.console_debug_);
249+
session->SendRequest(handler);
167250

168251
// Wait for the response to be received
169252
if (options_.console_debug_)
@@ -198,6 +281,51 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(
198281
return sdk::common::ExportResult::kSuccess;
199282
}
200283

284+
void ElasticsearchLogExporter::Export(
285+
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
286+
&records,
287+
nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)> result_callback) noexcept
288+
{
289+
// Return failure if this exporter has been shutdown
290+
if (isShutdown())
291+
{
292+
OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] Exporting "
293+
<< records.size() << " log(s) failed, exporter is shutdown");
294+
return;
295+
}
296+
297+
// Create a connection to the ElasticSearch instance
298+
auto session = http_client_->CreateSession(options_.host_ + std::to_string(options_.port_));
299+
auto request = session->CreateRequest();
300+
301+
// Populate the request with headers and methods
302+
request->SetUri(options_.index_ + "/_bulk?pretty");
303+
request->SetMethod(http_client::Method::Post);
304+
request->AddHeader("Content-Type", "application/json");
305+
request->SetTimeoutMs(std::chrono::milliseconds(1000 * options_.response_timeout_));
306+
307+
// Create the request body
308+
std::string body = "";
309+
for (auto &record : records)
310+
{
311+
// Append {"index":{}} before JSON body, which tells Elasticsearch to write to index specified
312+
// in URI
313+
body += "{\"index\" : {}}\n";
314+
315+
// Add the context of the Recordable
316+
auto json_record = std::unique_ptr<ElasticSearchRecordable>(
317+
static_cast<ElasticSearchRecordable *>(record.release()));
318+
body += json_record->GetJSON().dump() + "\n";
319+
}
320+
std::vector<uint8_t> body_vec(body.begin(), body.end());
321+
request->SetBody(body_vec);
322+
323+
// Send the request
324+
auto handler =
325+
std::make_shared<AsyncResponseHandler>(session, result_callback, options_.console_debug_);
326+
session->SendRequest(handler);
327+
}
328+
201329
bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
202330
{
203331
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);

exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,15 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter
6161
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans) noexcept
6262
override;
6363

64+
/**
65+
* Exports a batch of span recordables asynchronously.
66+
* @param spans a span of unique pointers to span recordables
67+
* @param result_callback callback function accepting ExportResult as argument
68+
*/
69+
void Export(const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
70+
nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)>
71+
result_callback) noexcept override;
72+
6473
/**
6574
* Shutdown the exporter.
6675
* @param timeout an option timeout, default to max.

exporters/jaeger/src/jaeger_exporter.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,15 @@ sdk_common::ExportResult JaegerExporter::Export(
7070
return sdk_common::ExportResult::kSuccess;
7171
}
7272

73+
void JaegerExporter::Export(
74+
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
75+
nostd::function_ref<bool(sdk::common::ExportResult)> result_callback) noexcept
76+
{
77+
OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call");
78+
auto status = Export(spans);
79+
result_callback(status);
80+
}
81+
7382
void JaegerExporter::InitializeEndpoint()
7483
{
7584
if (options_.transport_format == TransportFormat::kThriftUdpCompact)

exporters/memory/include/opentelemetry/exporters/memory/in_memory_span_exporter.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,20 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
6464
return sdk::common::ExportResult::kSuccess;
6565
}
6666

67+
/**
68+
* Exports a batch of span recordables asynchronously.
69+
* @param spans a span of unique pointers to span recordables
70+
* @param result_callback callback function accepting ExportResult as argument
71+
*/
72+
void Export(
73+
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
74+
nostd::function_ref<bool(sdk::common::ExportResult)> result_callback) noexcept override
75+
{
76+
OTEL_INTERNAL_LOG_WARN(" async not supported. Making sync interface call");
77+
auto status = Export(spans);
78+
result_callback(status);
79+
}
80+
6781
/**
6882
* @param timeout an optional value containing the timeout of the exporter
6983
* note: passing custom timeout values is not currently supported for this exporter

exporters/ostream/include/opentelemetry/exporters/ostream/log_exporter.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ class OStreamLogExporter final : public opentelemetry::sdk::logs::LogExporter
3939
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records) noexcept
4040
override;
4141

42+
/**
43+
* Exports a span of logs sent from the processor asynchronously.
44+
*/
45+
void Export(const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records,
46+
opentelemetry::nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)>
47+
result_callback) noexcept;
48+
4249
/**
4350
* Marks the OStream Log Exporter as shut down.
4451
*/

exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter
3838
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>>
3939
&spans) noexcept override;
4040

41+
void Export(
42+
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>>
43+
&spans,
44+
opentelemetry::nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)>
45+
result_callback) noexcept override;
46+
4147
bool Shutdown(
4248
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
4349

exporters/ostream/src/log_exporter.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,16 @@ sdk::common::ExportResult OStreamLogExporter::Export(
180180
return sdk::common::ExportResult::kSuccess;
181181
}
182182

183+
void OStreamLogExporter::Export(
184+
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records,
185+
opentelemetry::nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)>
186+
result_callback) noexcept
187+
{
188+
// Do not have async support
189+
auto result = Export(records);
190+
result_callback(result);
191+
}
192+
183193
bool OStreamLogExporter::Shutdown(std::chrono::microseconds) noexcept
184194
{
185195
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);

exporters/ostream/src/span_exporter.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,15 @@ sdk::common::ExportResult OStreamSpanExporter::Export(
9696
return sdk::common::ExportResult::kSuccess;
9797
}
9898

99+
void OStreamSpanExporter::Export(
100+
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
101+
opentelemetry::nostd::function_ref<bool(opentelemetry::sdk::common::ExportResult)>
102+
result_callback) noexcept
103+
{
104+
auto result = Export(spans);
105+
result_callback(result);
106+
}
107+
99108
bool OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept
100109
{
101110
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);

exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,15 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
5252
sdk::common::ExportResult Export(
5353
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans) noexcept override;
5454

55+
/**
56+
* Exports a batch of span recordables asynchronously.
57+
* @param spans a span of unique pointers to span recordables
58+
* @param result_callback callback function accepting ExportResult as argument
59+
*/
60+
virtual void Export(
61+
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
62+
nostd::function_ref<bool(sdk::common::ExportResult)> result_callback) noexcept override;
63+
5564
/**
5665
* Shut down the exporter.
5766
* @param timeout an optional timeout, the default timeout of 0 means that no

0 commit comments

Comments
 (0)