Skip to content

Commit 6399f17

Browse files
Yuhtafacebook-github-bot
authored andcommitted
Add TraceContext to various system calls that can block (apache#8583)
Summary: Pull Request resolved: facebookincubator/velox#8583 Also optimize `TraceContext` for writing by using thread local memory. Reviewed By: oerling Differential Revision: D53146503 fbshipit-source-id: b108e7a890afc97d006c907e3ac75c025ebfcd77
1 parent 12ed56b commit 6399f17

File tree

9 files changed

+122
-49
lines changed

9 files changed

+122
-49
lines changed

velox/common/caching/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ target_link_libraries(
2828
velox_exception
2929
velox_file
3030
velox_memory
31+
velox_process
3132
Folly::folly
3233
fmt::fmt
3334
gflags::gflags

velox/common/caching/SsdFile.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
*/
1616

1717
#include "velox/common/caching/SsdFile.h"
18+
1819
#include <folly/Executor.h>
1920
#include <folly/portability/SysUio.h>
2021
#include "velox/common/base/AsyncSource.h"
2122
#include "velox/common/base/SuccinctPrinter.h"
2223
#include "velox/common/caching/FileIds.h"
2324
#include "velox/common/caching/SsdCache.h"
25+
#include "velox/common/process/TraceContext.h"
2426

2527
#include <fcntl.h>
2628
#ifdef linux
@@ -128,6 +130,7 @@ SsdFile::SsdFile(
128130
shardId_(shardId),
129131
checkpointIntervalBytes_(checkpointIntervalBytes),
130132
executor_(executor) {
133+
process::TraceContext trace("SsdFile::SsdFile");
131134
int32_t oDirect = 0;
132135
#ifdef linux
133136
oDirect = FLAGS_ssd_odirect ? O_DIRECT : 0;
@@ -266,6 +269,7 @@ CoalesceIoStats SsdFile::load(
266269
void SsdFile::read(
267270
uint64_t offset,
268271
const std::vector<folly::Range<char*>>& buffers) {
272+
process::TraceContext trace("SsdFile::read");
269273
readFile_->preadv(offset, buffers);
270274
}
271275

@@ -307,6 +311,7 @@ std::optional<std::pair<uint64_t, int32_t>> SsdFile::getSpace(
307311
}
308312

309313
bool SsdFile::growOrEvictLocked() {
314+
process::TraceContext trace("SsdFile::growOrEvictLocked");
310315
if (numRegions_ < maxRegions_) {
311316
const auto newSize = (numRegions_ + 1) * kRegionSize;
312317
const auto rc = ::ftruncate(fd_, newSize);
@@ -360,6 +365,7 @@ void SsdFile::clearRegionEntriesLocked(const std::vector<int32_t>& regions) {
360365
}
361366

362367
void SsdFile::write(std::vector<CachePin>& pins) {
368+
process::TraceContext trace("SsdFile::write");
363369
// Sorts the pins by their file/offset. In this way what is adjacent in
364370
// storage is likely adjacent on SSD.
365371
std::sort(pins.begin(), pins.end());
@@ -444,6 +450,7 @@ int32_t indexOfFirstMismatch(char* x, char* y, int n) {
444450
} // namespace
445451

446452
void SsdFile::verifyWrite(AsyncDataCacheEntry& entry, SsdRun ssdRun) {
453+
process::TraceContext trace("SsdFile::verifyWrite");
447454
auto testData = std::make_unique<char[]>(entry.size());
448455
const auto rc = ::pread(fd_, testData.get(), entry.size(), ssdRun.offset());
449456
VELOX_CHECK_EQ(rc, entry.size());
@@ -512,6 +519,7 @@ void SsdFile::clear() {
512519
}
513520

514521
void SsdFile::deleteFile() {
522+
process::TraceContext trace("SsdFile::deleteFile");
515523
if (fd_) {
516524
close(fd_);
517525
fd_ = 0;
@@ -651,6 +659,7 @@ inline const char* asChar(const T* ptr) {
651659
} // namespace
652660

653661
void SsdFile::checkpoint(bool force) {
662+
process::TraceContext trace("SsdFile::checkpoint");
654663
std::lock_guard<std::shared_mutex> l(mutex_);
655664
if (!force && (bytesAfterCheckpoint_ < checkpointIntervalBytes_)) {
656665
return;

velox/common/process/TraceContext.cpp

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,27 @@
1616

1717
#include "velox/common/process/TraceContext.h"
1818

19+
#include "velox/common/process/ThreadLocalRegistry.h"
20+
1921
#include <sstream>
2022

2123
namespace facebook::velox::process {
2224

2325
namespace {
24-
folly::Synchronized<std::unordered_map<std::string, TraceData>>& traceMap() {
25-
static folly::Synchronized<std::unordered_map<std::string, TraceData>>
26-
staticTraceMap;
27-
return staticTraceMap;
28-
}
26+
27+
// We use thread local instead lock here since the critical path is on write
28+
// side.
29+
using Registry = ThreadLocalRegistry<folly::F14FastMap<std::string, TraceData>>;
30+
auto registry = std::make_shared<Registry>();
31+
thread_local Registry::Reference threadLocalTraceData(registry);
32+
2933
} // namespace
3034

3135
TraceContext::TraceContext(std::string label, bool isTemporary)
3236
: label_(std::move(label)),
3337
enterTime_(std::chrono::steady_clock::now()),
3438
isTemporary_(isTemporary) {
35-
traceMap().withWLock([&](auto& counts) {
39+
threadLocalTraceData.withValue([&](auto& counts) {
3640
auto& data = counts[label_];
3741
++data.numThreads;
3842
if (data.numThreads == 1) {
@@ -43,45 +47,59 @@ TraceContext::TraceContext(std::string label, bool isTemporary)
4347
}
4448

4549
TraceContext::~TraceContext() {
46-
traceMap().withWLock([&](auto& counts) {
47-
auto& data = counts[label_];
48-
--data.numThreads;
50+
threadLocalTraceData.withValue([&](auto& counts) {
51+
auto it = counts.find(label_);
52+
auto& data = it->second;
53+
if (--data.numThreads == 0 && isTemporary_) {
54+
counts.erase(it);
55+
return;
56+
}
4957
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
5058
std::chrono::steady_clock::now() - enterTime_)
5159
.count();
5260
data.totalMs += ms;
5361
data.maxMs = std::max<uint64_t>(data.maxMs, ms);
54-
if (!data.numThreads && isTemporary_) {
55-
counts.erase(label_);
56-
}
5762
});
5863
}
5964

6065
// static
6166
std::string TraceContext::statusLine() {
6267
std::stringstream out;
6368
auto now = std::chrono::steady_clock::now();
64-
traceMap().withRLock([&](auto& counts) {
65-
for (auto& pair : counts) {
66-
if (pair.second.numThreads) {
67-
auto continued = std::chrono::duration_cast<std::chrono::milliseconds>(
68-
now - pair.second.startTime)
69-
.count();
69+
auto counts = status();
70+
for (auto& pair : counts) {
71+
if (pair.second.numThreads) {
72+
auto continued = std::chrono::duration_cast<std::chrono::milliseconds>(
73+
now - pair.second.startTime)
74+
.count();
7075

71-
out << pair.first << "=" << pair.second.numThreads << " entered "
72-
<< pair.second.numEnters << " avg ms "
73-
<< (pair.second.totalMs / pair.second.numEnters) << " max ms "
74-
<< pair.second.maxMs << " continuous for " << continued
75-
<< std::endl;
76-
}
76+
out << pair.first << "=" << pair.second.numThreads << " entered "
77+
<< pair.second.numEnters << " avg ms "
78+
<< (pair.second.totalMs / pair.second.numEnters) << " max ms "
79+
<< pair.second.maxMs << " continuous for " << continued << std::endl;
7780
}
78-
});
81+
}
7982
return out.str();
8083
}
8184

8285
// static
83-
std::unordered_map<std::string, TraceData> TraceContext::status() {
84-
return traceMap().withRLock([&](auto& map) { return map; });
86+
folly::F14FastMap<std::string, TraceData> TraceContext::status() {
87+
folly::F14FastMap<std::string, TraceData> total;
88+
registry->forAllValues([&](auto& counts) {
89+
for (auto& [k, v] : counts) {
90+
auto& sofar = total[k];
91+
if (sofar.numEnters == 0) {
92+
sofar.startTime = v.startTime;
93+
} else if (v.numEnters > 0) {
94+
sofar.startTime = std::min(sofar.startTime, v.startTime);
95+
}
96+
sofar.numThreads += v.numThreads;
97+
sofar.numEnters += v.numEnters;
98+
sofar.totalMs += v.totalMs;
99+
sofar.maxMs = std::max(sofar.maxMs, v.maxMs);
100+
}
101+
});
102+
return total;
85103
}
86104

87105
} // namespace facebook::velox::process

velox/common/process/TraceContext.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
#include <string>
2121
#include <unordered_map>
2222

23-
#include <folly/Synchronized.h>
23+
#include <folly/container/F14Map.h>
2424

2525
namespace facebook::velox::process {
2626

@@ -63,7 +63,7 @@ class TraceContext {
6363
static std::string statusLine();
6464

6565
// Returns a copy of the trace status.
66-
static std::unordered_map<std::string, TraceData> status();
66+
static folly::F14FastMap<std::string, TraceData> status();
6767

6868
private:
6969
const std::string label_;

velox/common/process/tests/TraceContextTest.cpp

Lines changed: 56 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,69 @@
1515
*/
1616

1717
#include "velox/common/process/TraceContext.h"
18+
1819
#include <fmt/format.h>
20+
#include <folly/synchronization/Baton.h>
21+
#include <folly/synchronization/Latch.h>
1922
#include <gtest/gtest.h>
23+
2024
#include <thread>
2125

22-
using namespace facebook::velox::process;
26+
namespace facebook::velox::process {
27+
namespace {
2328

24-
TEST(TraceContextTest, basic) {
25-
constexpr int32_t kNumThreads = 10;
29+
class TraceContextTest : public testing::Test {
30+
public:
31+
void SetUp() override {
32+
ASSERT_TRUE(TraceContext::status().empty());
33+
}
34+
35+
void TearDown() override {
36+
ASSERT_TRUE(TraceContext::status().empty());
37+
}
38+
};
39+
40+
TEST_F(TraceContextTest, basic) {
41+
constexpr int kNumThreads = 3;
2642
std::vector<std::thread> threads;
43+
folly::Baton<> batons[2][kNumThreads];
44+
folly::Latch latches[2] = {
45+
folly::Latch(kNumThreads),
46+
folly::Latch(kNumThreads),
47+
};
2748
threads.reserve(kNumThreads);
28-
for (int32_t i = 0; i < kNumThreads; ++i) {
29-
threads.push_back(std::thread([i]() {
30-
TraceContext trace1("process data");
31-
TraceContext trace2(fmt::format("Process chunk {}", i), true);
32-
std::this_thread::sleep_for(std::chrono::milliseconds(3));
33-
}));
49+
for (int i = 0; i < kNumThreads; ++i) {
50+
threads.emplace_back([&, i]() {
51+
{
52+
TraceContext trace1("process data");
53+
TraceContext trace2(fmt::format("Process chunk {}", i), true);
54+
latches[0].count_down();
55+
batons[0][i].wait();
56+
}
57+
latches[1].count_down();
58+
batons[1][i].wait();
59+
});
60+
}
61+
latches[0].wait();
62+
auto status = TraceContext::status();
63+
ASSERT_EQ(1 + kNumThreads, status.size());
64+
ASSERT_EQ(kNumThreads, status.at("process data").numThreads);
65+
for (int i = 0; i < kNumThreads; ++i) {
66+
ASSERT_EQ(1, status.at(fmt::format("Process chunk {}", i)).numThreads);
3467
}
35-
std::this_thread::sleep_for(std::chrono::milliseconds(1));
36-
LOG(INFO) << TraceContext::statusLine();
37-
for (auto& thread : threads) {
38-
thread.join();
68+
for (int i = 0; i < kNumThreads; ++i) {
69+
batons[0][i].post();
70+
}
71+
latches[1].wait();
72+
status = TraceContext::status();
73+
ASSERT_EQ(1, status.size());
74+
ASSERT_EQ(0, status.at("process data").numThreads);
75+
ASSERT_EQ(kNumThreads, status.at("process data").numEnters);
76+
for (int i = 0; i < kNumThreads; ++i) {
77+
batons[1][i].post();
78+
threads[i].join();
3979
}
40-
LOG(INFO) << TraceContext::statusLine();
41-
// We expect one entry for "process data". The temporary entries
42-
// are deleted after the treads complete.
43-
auto after = TraceContext::status();
44-
EXPECT_EQ(1, after.size());
45-
EXPECT_EQ(kNumThreads, after["process data"].numEnters);
46-
EXPECT_EQ(0, after["process data"].numThreads);
4780
}
81+
82+
} // namespace
83+
} // namespace facebook::velox::process

velox/dwio/common/ColumnLoader.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
#include "velox/dwio/common/ColumnLoader.h"
1818

19+
#include "velox/common/process/TraceContext.h"
20+
1921
namespace facebook::velox::dwio::common {
2022

2123
// Wraps '*result' in a dictionary to make the contiguous values
@@ -45,6 +47,7 @@ void ColumnLoader::loadInternal(
4547
ValueHook* hook,
4648
vector_size_t resultSize,
4749
VectorPtr* result) {
50+
process::TraceContext trace("ColumnLoader::loadInternal");
4851
VELOX_CHECK_EQ(
4952
version_,
5053
structReader_->numReads(),

velox/dwio/common/SelectiveStructColumnReader.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include "velox/dwio/common/SelectiveStructColumnReader.h"
1818

19+
#include "velox/common/process/TraceContext.h"
1920
#include "velox/dwio/common/ColumnLoader.h"
2021

2122
namespace facebook::velox::dwio::common {
@@ -56,6 +57,7 @@ void SelectiveStructColumnReaderBase::next(
5657
uint64_t numValues,
5758
VectorPtr& result,
5859
const Mutation* mutation) {
60+
process::TraceContext trace("SelectiveStructColumnReaderBase::next");
5961
if (children_.empty()) {
6062
if (mutation && mutation->deletedRows) {
6163
numValues -= bits::countBits(mutation->deletedRows, 0, numValues);

velox/dwio/dwrf/reader/ReaderBase.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include <fmt/format.h>
2020

21+
#include "velox/common/process/TraceContext.h"
2122
#include "velox/dwio/common/exception/Exception.h"
2223

2324
namespace facebook::velox::dwrf {
@@ -100,6 +101,7 @@ ReaderBase::ReaderBase(
100101
footerEstimatedSize_(footerEstimatedSize),
101102
filePreloadThreshold_(filePreloadThreshold),
102103
input_(std::move(input)) {
104+
process::TraceContext trace("ReaderBase::ReaderBase");
103105
// read last bytes into buffer to get PostScript
104106
// If file is small, load the entire file.
105107
// TODO: make a config

velox/exec/HashTable.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "velox/common/base/Portability.h"
2121
#include "velox/common/base/SimdUtil.h"
2222
#include "velox/common/process/ProcessBase.h"
23+
#include "velox/common/process/TraceContext.h"
2324
#include "velox/common/testutil/TestValue.h"
2425
#include "velox/exec/OperatorUtils.h"
2526
#include "velox/vector/VectorTypeUtils.h"
@@ -857,6 +858,7 @@ bool HashTable<ignoreNullKeys>::canApplyParallelJoinBuild() const {
857858

858859
template <bool ignoreNullKeys>
859860
void HashTable<ignoreNullKeys>::parallelJoinBuild() {
861+
process::TraceContext trace("HashTable::parallelJoinBuild");
860862
TestValue::adjust(
861863
"facebook::velox::exec::HashTable::parallelJoinBuild", rows_->pool());
862864
VELOX_CHECK_LE(1 + otherTables_.size(), std::numeric_limits<uint8_t>::max());

0 commit comments

Comments
 (0)