Skip to content

Commit 12ed56b

Browse files
Yuhtafacebook-github-bot
authored andcommitted
Add ThreadLocalRegistry (apache#8623)
Summary: Pull Request resolved: facebookincubator/velox#8623 Add this utility class to keep static thread local objects while being able to iterate over all of them from one single thread. Reviewed By: oerling Differential Revision: D53277766 fbshipit-source-id: f36d38934358080ed7210bb36c4619b859df7758
1 parent e9301cb commit 12ed56b

File tree

3 files changed

+202
-1
lines changed

3 files changed

+202
-1
lines changed
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <list>
20+
#include <memory>
21+
#include <mutex>
22+
23+
namespace facebook::velox::process {
24+
25+
/// A registry for keeping static thread local objects of type T. Similar to
26+
/// folly::ThreadLocal but a little bit more efficient in terms of performance
27+
/// and memory usage, because we do not support thread local with lexical scope.
28+
///
29+
/// NOTE: only one instance of ThreadLocalRegistry<T> can be created with each
30+
/// T.
31+
template <typename T>
32+
class ThreadLocalRegistry {
33+
public:
34+
class Reference;
35+
36+
/// Access values from all threads. Takes a global lock and should be used
37+
/// with caution.
38+
template <typename F>
39+
void forAllValues(F f) {
40+
std::lock_guard<std::mutex> entriesLock(entriesMutex_);
41+
for (auto& entry : entries_) {
42+
std::lock_guard<std::mutex> lk(entry->mutex);
43+
f(entry->value);
44+
}
45+
}
46+
47+
private:
48+
struct Entry {
49+
std::mutex mutex;
50+
T value;
51+
};
52+
53+
std::list<std::unique_ptr<Entry>> entries_;
54+
std::mutex entriesMutex_;
55+
};
56+
57+
/// Reference to one thread local value. Should be stored in thread local
58+
/// memory.
59+
template <typename T>
60+
class ThreadLocalRegistry<T>::Reference {
61+
public:
62+
explicit Reference(const std::shared_ptr<ThreadLocalRegistry>& registry)
63+
: registry_(registry) {
64+
auto entry = std::make_unique<Entry>();
65+
std::lock_guard<std::mutex> lk(registry_->entriesMutex_);
66+
iterator_ =
67+
registry_->entries_.insert(registry_->entries_.end(), std::move(entry));
68+
}
69+
70+
~Reference() {
71+
std::lock_guard<std::mutex> lk(registry_->entriesMutex_);
72+
registry_->entries_.erase(iterator_);
73+
}
74+
75+
/// Obtain the thread local value and process it with the functor `f'.
76+
template <typename F>
77+
auto withValue(F f) {
78+
auto* entry = iterator_->get();
79+
std::lock_guard<std::mutex> lk(entry->mutex);
80+
return f(entry->value);
81+
}
82+
83+
private:
84+
std::shared_ptr<ThreadLocalRegistry> const registry_;
85+
typename std::list<std::unique_ptr<Entry>>::iterator iterator_;
86+
};
87+
88+
} // namespace facebook::velox::process

velox/common/process/tests/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
add_executable(velox_process_test TraceContextTest.cpp)
15+
add_executable(velox_process_test TraceContextTest.cpp
16+
ThreadLocalRegistryTest.cpp)
1617

1718
add_test(velox_process_test velox_process_test)
1819

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/common/process/ThreadLocalRegistry.h"
18+
19+
#include <folly/synchronization/Baton.h>
20+
#include <folly/synchronization/Latch.h>
21+
#include <gtest/gtest.h>
22+
23+
#include <atomic>
24+
25+
namespace facebook::velox::process {
26+
namespace {
27+
28+
template <typename Tag>
29+
class TestObject {
30+
public:
31+
static std::atomic_int& count() {
32+
static std::atomic_int value;
33+
return value;
34+
}
35+
36+
TestObject() : threadId_(std::this_thread::get_id()) {
37+
++count();
38+
}
39+
40+
~TestObject() {
41+
--count();
42+
}
43+
44+
std::thread::id threadId() const {
45+
return threadId_;
46+
}
47+
48+
private:
49+
const std::thread::id threadId_;
50+
};
51+
52+
TEST(ThreadLocalRegistryTest, basic) {
53+
struct Tag {};
54+
using T = TestObject<Tag>;
55+
ASSERT_EQ(T::count(), 0);
56+
auto registry = std::make_shared<ThreadLocalRegistry<T>>();
57+
registry->forAllValues([](const T&) { FAIL(); });
58+
thread_local ThreadLocalRegistry<T>::Reference ref(registry);
59+
const T* object = ref.withValue([](const T& x) {
60+
EXPECT_EQ(T::count(), 1);
61+
return &x;
62+
});
63+
ASSERT_EQ(object->threadId(), std::this_thread::get_id());
64+
ref.withValue([&](const T& x) { ASSERT_EQ(&x, object); });
65+
int count = 0;
66+
registry->forAllValues([&](const T& x) {
67+
++count;
68+
ASSERT_EQ(x.threadId(), std::this_thread::get_id());
69+
});
70+
ASSERT_EQ(count, 1);
71+
ASSERT_EQ(T::count(), 1);
72+
}
73+
74+
TEST(ThreadLocalRegistryTest, multiThread) {
75+
struct Tag {};
76+
using T = TestObject<Tag>;
77+
ASSERT_EQ(T::count(), 0);
78+
auto registry = std::make_shared<ThreadLocalRegistry<T>>();
79+
constexpr int kNumThreads = 7;
80+
std::vector<std::thread> threads;
81+
folly::Latch latch(kNumThreads);
82+
folly::Baton<> batons[kNumThreads];
83+
const T* objects[kNumThreads];
84+
for (int i = 0; i < kNumThreads; ++i) {
85+
threads.emplace_back([&, i] {
86+
thread_local ThreadLocalRegistry<T>::Reference ref(registry);
87+
objects[i] = ref.withValue([](const T& x) { return &x; });
88+
latch.count_down();
89+
batons[i].wait();
90+
});
91+
}
92+
latch.wait();
93+
std::vector<int> indices;
94+
registry->forAllValues([&](const T& x) {
95+
auto it = std::find(std::begin(objects), std::end(objects), &x);
96+
indices.push_back(it - std::begin(objects));
97+
});
98+
ASSERT_EQ(indices.size(), kNumThreads);
99+
std::sort(indices.begin(), indices.end());
100+
for (int i = 0; i < kNumThreads; ++i) {
101+
ASSERT_EQ(indices[i], i);
102+
ASSERT_EQ(objects[i]->threadId(), threads[i].get_id());
103+
ASSERT_EQ(T::count(), kNumThreads - i);
104+
batons[i].post();
105+
threads[i].join();
106+
}
107+
ASSERT_EQ(T::count(), 0);
108+
registry->forAllValues([](const T&) { FAIL(); });
109+
}
110+
111+
} // namespace
112+
} // namespace facebook::velox::process

0 commit comments

Comments
 (0)