Skip to content

Commit daab565

Browse files
author
Karen Xu
authored
Add simple log processor and log exporter interface (#403)
* Add simple log processor and log exporter interface * review comments * update default duration and description * Use span instead of vector for Export() * Minor: timeout changed (for consistency) * Remove TODOs, run format CI, and rebase master
1 parent 956270e commit daab565

File tree

10 files changed

+398
-12
lines changed

10 files changed

+398
-12
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <memory>
20+
#include <vector>
21+
#include "opentelemetry/logs/log_record.h"
22+
#include "opentelemetry/nostd/span.h"
23+
#include "opentelemetry/sdk/logs/processor.h"
24+
25+
OPENTELEMETRY_BEGIN_NAMESPACE
26+
namespace sdk
27+
{
28+
namespace logs
29+
{
30+
/**
31+
* ExportResult is returned as result of exporting a batch of Log Records.
32+
*/
33+
enum class ExportResult
34+
{
35+
// The batch was exported successfully
36+
kSuccess = 0,
37+
// The batch was exported unsuccessfully and was dropped, but can not be retried
38+
kFailure
39+
};
40+
41+
/**
42+
* LogExporter defines the interface that log exporters must implement.
43+
*/
44+
class LogExporter
45+
{
46+
public:
47+
virtual ~LogExporter() = default;
48+
49+
/**
50+
* Exports the batch of log records to their export destination.
51+
* This method must not be called concurrently for the same exporter instance.
52+
* The exporter may attempt to retry sending the batch, but should drop
53+
* and return kFailure after a certain timeout.
54+
* @param records a span of unique pointers to log records
55+
* @returns an ExportResult code (whether export was success or failure)
56+
*/
57+
virtual ExportResult Export(
58+
const nostd::span<std::unique_ptr<opentelemetry::logs::LogRecord>> &records) noexcept = 0;
59+
60+
/**
61+
* Marks the exporter as ShutDown and cleans up any resources as required.
62+
* Shutdown should be called only once for each Exporter instance.
63+
* @param timeout minimum amount of microseconds to wait for shutdown before giving up and
64+
* returning failure.
65+
* @return true if the exporter shutdown succeeded, false otherwise
66+
*/
67+
virtual bool Shutdown(
68+
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept = 0;
69+
};
70+
} // namespace logs
71+
} // namespace sdk
72+
OPENTELEMETRY_END_NAMESPACE

sdk/include/opentelemetry/sdk/logs/processor.h

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,37 @@ namespace sdk
2626
namespace logs
2727
{
2828
/**
29-
* This Log Processor is responsible for conversion of logs to exportable
30-
* representation and passing them to exporters.
29+
* The Log Processor is responsible for passing log records
30+
* to the configured exporter.
3131
*/
3232
class LogProcessor
3333
{
3434
public:
3535
virtual ~LogProcessor() = default;
3636

37+
/**
38+
* OnReceive is called by the SDK once a log record has been successfully created.
39+
* @param record the log record
40+
*/
3741
virtual void OnReceive(std::unique_ptr<opentelemetry::logs::LogRecord> &&record) noexcept = 0;
3842

39-
virtual void ForceFlush(
40-
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0;
43+
/**
44+
* Exports all log records that have not yet been exported to the configured Exporter.
45+
* @param timeout that the forceflush is required to finish within.
46+
* @return a result code indicating whether it succeeded, failed or timed out
47+
*/
48+
virtual bool ForceFlush(
49+
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept = 0;
4150

42-
virtual void Shutdown(
43-
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0;
51+
/**
52+
* Shuts down the processor and does any cleanup required.
53+
* ShutDown should only be called once for each processor.
54+
* @param timeout minimum amount of microseconds to wait for
55+
* shutdown before giving up and returning failure.
56+
* @return true if the shutdown succeeded, false otherwise
57+
*/
58+
virtual bool Shutdown(
59+
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept = 0;
4460
};
4561
} // namespace logs
4662
} // namespace sdk
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <atomic>
20+
#include <mutex>
21+
22+
#include "opentelemetry/common/spin_lock_mutex.h"
23+
#include "opentelemetry/sdk/logs/exporter.h"
24+
#include "opentelemetry/sdk/logs/processor.h"
25+
26+
OPENTELEMETRY_BEGIN_NAMESPACE
27+
namespace sdk
28+
{
29+
namespace logs
30+
{
31+
/**
32+
* The simple log processor passes all log records
33+
* in a batch of 1 to the configured
34+
* LogExporter.
35+
*
36+
* All calls to the configured LogExporter are synchronized using a
37+
* spin-lock on an atomic_flag.
38+
*/
39+
class SimpleLogProcessor : public LogProcessor
40+
{
41+
42+
public:
43+
explicit SimpleLogProcessor(std::unique_ptr<LogExporter> &&exporter);
44+
virtual ~SimpleLogProcessor() = default;
45+
46+
void OnReceive(std::unique_ptr<opentelemetry::logs::LogRecord> &&record) noexcept override;
47+
48+
bool ForceFlush(
49+
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
50+
51+
bool Shutdown(
52+
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
53+
54+
private:
55+
// The configured exporter
56+
std::unique_ptr<LogExporter> exporter_;
57+
// The lock used to ensure the exporter is not called concurrently
58+
opentelemetry::common::SpinLockMutex lock_;
59+
// The atomic boolean flag to ensure the ShutDown() function is only called once
60+
std::atomic_flag shutdown_latch_{ATOMIC_FLAG_INIT};
61+
};
62+
} // namespace logs
63+
} // namespace sdk
64+
OPENTELEMETRY_END_NAMESPACE

sdk/src/logs/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1-
add_library(opentelemetry_logs logger_provider.cc logger.cc)
1+
add_library(opentelemetry_logs logger_provider.cc logger.cc
2+
simple_log_processor.cc)
23

34
target_link_libraries(opentelemetry_logs opentelemetry_common)
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "opentelemetry/sdk/logs/simple_log_processor.h"
18+
19+
#include <chrono>
20+
#include <vector>
21+
22+
OPENTELEMETRY_BEGIN_NAMESPACE
23+
namespace sdk
24+
{
25+
namespace logs
26+
{
27+
/**
28+
* Initialize a simple log processor.
29+
* @param exporter the configured exporter where log records are sent
30+
*/
31+
SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr<LogExporter> &&exporter)
32+
: exporter_(std::move(exporter))
33+
{}
34+
35+
/**
36+
* Batches the log record it receives in a batch of 1 and immediately sends it
37+
* to the configured exporter
38+
*/
39+
void SimpleLogProcessor::OnReceive(
40+
std::unique_ptr<opentelemetry::logs::LogRecord> &&record) noexcept
41+
{
42+
nostd::span<std::unique_ptr<opentelemetry::logs::LogRecord>> batch(&record, 1);
43+
// Get lock to ensure Export() is never called concurrently
44+
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
45+
46+
if (exporter_->Export(batch) != ExportResult::kSuccess)
47+
{
48+
/* Alert user of the failed export */
49+
}
50+
}
51+
/**
52+
* The simple processor does not have any log records to flush so this method is not used
53+
*/
54+
bool SimpleLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
55+
{
56+
return true;
57+
}
58+
59+
bool SimpleLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
60+
{
61+
// Should only shutdown exporter ONCE.
62+
if (!shutdown_latch_.test_and_set(std::memory_order_acquire))
63+
{
64+
return exporter_->Shutdown(timeout);
65+
}
66+
67+
return false;
68+
}
69+
} // namespace logs
70+
} // namespace sdk
71+
OPENTELEMETRY_END_NAMESPACE

sdk/test/logs/BUILD

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,14 @@ cc_test(
2020
"@com_google_googletest//:gtest_main",
2121
],
2222
)
23+
24+
cc_test(
25+
name = "simple_log_processor_test",
26+
srcs = [
27+
"simple_log_processor_test.cc",
28+
],
29+
deps = [
30+
"//sdk/src/logs",
31+
"@com_google_googletest//:gtest_main",
32+
],
33+
)

sdk/test/logs/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
foreach(testname logger_provider_sdk_test logger_sdk_test)
1+
foreach(testname logger_provider_sdk_test logger_sdk_test
2+
simple_log_processor_test)
23
add_executable(${testname} "${testname}.cc")
34
target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES}
45
${CMAKE_THREAD_LIBS_INIT} opentelemetry_logs)

sdk/test/logs/logger_provider_sdk_test.cc

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,14 @@ TEST(LoggerProviderSDK, LoggerProviderLoggerArguments)
7070
class DummyProcessor : public LogProcessor
7171
{
7272
void OnReceive(std::unique_ptr<opentelemetry::logs::LogRecord> &&record) noexcept {}
73-
void ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {}
74-
void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {}
73+
bool ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept
74+
{
75+
return true;
76+
}
77+
bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept
78+
{
79+
return true;
80+
}
7581
};
7682

7783
TEST(LoggerProviderSDK, GetAndSetProcessor)

sdk/test/logs/logger_sdk_test.cc

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,14 @@ TEST(LoggerSDK, LogToNullProcessor)
3838
class DummyProcessor : public LogProcessor
3939
{
4040
void OnReceive(std::unique_ptr<opentelemetry::logs::LogRecord> &&record) noexcept {}
41-
void ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {}
42-
void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {}
41+
bool ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept
42+
{
43+
return true;
44+
}
45+
bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept
46+
{
47+
return true;
48+
}
4349
};
4450

4551
TEST(LoggerSDK, LogToAProcessor)

0 commit comments

Comments
 (0)