Skip to content

Commit 7d87f02

Browse files
author
Nathan Bronson
committed
support for concurrent adds to memtable
Summary: This diff adds support for concurrent adds to the skiplist memtable implementations. Memory allocation is made thread-safe by the addition of a spinlock, with small per-core buffers to avoid contention. Concurrent memtable writes are made via an additional method and don't impose a performance overhead on the non-concurrent case, so parallelism can be selected on a per-batch basis. Write thread synchronization is an increasing bottleneck for higher levels of concurrency, so this diff adds --enable_write_thread_adaptive_yield (default off). This feature causes threads joining a write batch group to spin for a short time (default 100 usec) using sched_yield, rather than going to sleep on a mutex. If the timing of the yield calls indicates that another thread has actually run during the yield then spinning is avoided. This option improves performance for concurrent situations even without parallel adds, although it has the potential to increase CPU usage (and the heuristic adaptation is not yet mature). Parallel writes are not currently compatible with inplace updates, update callbacks, or delete filtering. Enable it with --allow_concurrent_memtable_write (and --enable_write_thread_adaptive_yield). Parallel memtable writes are performance neutral when there is no actual parallelism, and in my experiments (SSD server-class Linux and varying contention and key sizes for fillrandom) they are always a performance win when there is more than one thread. Statistics are updated earlier in the write path, dropping the number of DB mutex acquisitions from 2 to 1 for almost all cases. This diff was motivated and inspired by Yahoo's cLSM work. It is more conservative than cLSM: RocksDB's write batch group leader role is preserved (along with all of the existing flush and write throttling logic) and concurrent writers are blocked until all memtable insertions have completed and the sequence number has been advanced, to preserve linearizability. My test config is "db_bench -benchmarks=fillrandom -threads=$T -batch_size=1 -memtablerep=skip_list -value_size=100 --num=1000000/$T -level0_slowdown_writes_trigger=9999 -level0_stop_writes_trigger=9999 -disable_auto_compactions --max_write_buffer_number=8 -max_background_flushes=8 --disable_wal --write_buffer_size=160000000 --block_size=16384 --allow_concurrent_memtable_write" on a two-socket Xeon E5-2660 @ 2.2Ghz with lots of memory and an SSD hard drive. With 1 thread I get ~440Kops/sec. Peak performance for 1 socket (numactl -N1) is slightly more than 1Mops/sec, at 16 threads. Peak performance across both sockets happens at 30 threads, and is ~900Kops/sec, although with fewer threads there is less performance loss when the system has background work. Test Plan: 1. concurrent stress tests for InlineSkipList and DynamicBloom 2. make clean; make check 3. make clean; DISABLE_JEMALLOC=1 make valgrind_check; valgrind db_bench 4. make clean; COMPILE_WITH_TSAN=1 make all check; db_bench 5. make clean; COMPILE_WITH_ASAN=1 make all check; db_bench 6. make clean; OPT=-DROCKSDB_LITE make check 7. verify no perf regressions when disabled Reviewers: igor, sdong Reviewed By: sdong Subscribers: MarkCallaghan, IslamAbdelRahman, anthony, yhchiang, rven, sdong, guyg8, kradhakrishnan, dhruba Differential Revision: https://reviews.facebook.net/D50589
1 parent 5b2587b commit 7d87f02

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1812
-495
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ set(SOURCES
191191
util/coding.cc
192192
util/compaction_job_stats_impl.cc
193193
util/comparator.cc
194+
util/concurrent_arena.cc
194195
util/crc32c.cc
195196
util/db_info_dumper.cc
196197
util/delete_scheduler_impl.cc

db/column_family.cc

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,20 @@ Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
110110
return Status::OK();
111111
}
112112

113+
Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
114+
if (cf_options.inplace_update_support) {
115+
return Status::InvalidArgument(
116+
"In-place memtable updates (inplace_update_support) is not compatible "
117+
"with concurrent writes (allow_concurrent_memtable_write)");
118+
}
119+
if (cf_options.filter_deletes) {
120+
return Status::InvalidArgument(
121+
"Delete filtering (filter_deletes) is not compatible with concurrent "
122+
"memtable writes (allow_concurrent_memtable_writes)");
123+
}
124+
return Status::OK();
125+
}
126+
113127
ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
114128
const InternalKeyComparator* icmp,
115129
const ColumnFamilyOptions& src) {
@@ -916,13 +930,6 @@ ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
916930
return &handle_;
917931
}
918932

919-
void ColumnFamilyMemTablesImpl::CheckMemtableFull() {
920-
if (current_ != nullptr && current_->mem()->ShouldScheduleFlush()) {
921-
flush_scheduler_->ScheduleFlush(current_);
922-
current_->mem()->MarkFlushScheduled();
923-
}
924-
}
925-
926933
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
927934
uint32_t column_family_id = 0;
928935
if (column_family != nullptr) {

db/column_family.h

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@
1919
#include "db/write_controller.h"
2020
#include "db/table_cache.h"
2121
#include "db/table_properties_collector.h"
22-
#include "db/flush_scheduler.h"
2322
#include "rocksdb/compaction_job_stats.h"
2423
#include "rocksdb/db.h"
2524
#include "rocksdb/env.h"
2625
#include "rocksdb/options.h"
27-
#include "util/instrumented_mutex.h"
2826
#include "util/mutable_cf_options.h"
2927
#include "util/thread_local.h"
3028

@@ -134,6 +132,9 @@ struct SuperVersion {
134132

135133
extern Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options);
136134

135+
extern Status CheckConcurrentWritesSupported(
136+
const ColumnFamilyOptions& cf_options);
137+
137138
extern ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
138139
const InternalKeyComparator* icmp,
139140
const ColumnFamilyOptions& src);
@@ -158,14 +159,16 @@ class ColumnFamilyData {
158159
// thread-safe
159160
const std::string& GetName() const { return name_; }
160161

161-
// Ref() can only be called whily holding a DB mutex or during a
162-
// single-threaded write.
162+
// Ref() can only be called from a context where the caller can guarantee
163+
// that ColumnFamilyData is alive (while holding a non-zero ref already,
164+
// holding a DB mutex, or as the leader in a write batch group).
163165
void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
164-
// will just decrease reference count to 0, but will not delete it. returns
165-
// true if the ref count was decreased to zero. in that case, it can be
166-
// deleted by the caller immediately, or later, by calling
167-
// FreeDeadColumnFamilies()
168-
// Unref() can only be called while holding a DB mutex
166+
167+
// Unref decreases the reference count, but does not handle deletion
168+
// when the count goes to 0. If this method returns true then the
169+
// caller should delete the instance immediately, or later, by calling
170+
// FreeDeadColumnFamilies(). Unref() can only be called while holding
171+
// a DB mutex, or during single-threaded recovery.
169172
bool Unref() {
170173
int old_refs = refs_.fetch_sub(1, std::memory_order_relaxed);
171174
assert(old_refs > 0);
@@ -497,36 +500,42 @@ class ColumnFamilySet {
497500
// memtables of different column families (specified by ID in the write batch)
498501
class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
499502
public:
500-
explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set,
501-
FlushScheduler* flush_scheduler)
502-
: column_family_set_(column_family_set),
503-
current_(nullptr),
504-
flush_scheduler_(flush_scheduler) {}
503+
explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set)
504+
: column_family_set_(column_family_set), current_(nullptr) {}
505+
506+
// Constructs a ColumnFamilyMemTablesImpl equivalent to one constructed
507+
// with the arguments used to construct *orig.
508+
explicit ColumnFamilyMemTablesImpl(ColumnFamilyMemTablesImpl* orig)
509+
: column_family_set_(orig->column_family_set_), current_(nullptr) {}
505510

506511
// sets current_ to ColumnFamilyData with column_family_id
507512
// returns false if column family doesn't exist
508-
// REQUIRES: under a DB mutex OR from a write thread
513+
// REQUIRES: use this function of DBImpl::column_family_memtables_ should be
514+
// under a DB mutex OR from a write thread
509515
bool Seek(uint32_t column_family_id) override;
510516

511517
// Returns log number of the selected column family
512518
// REQUIRES: under a DB mutex OR from a write thread
513519
uint64_t GetLogNumber() const override;
514520

515521
// REQUIRES: Seek() called first
516-
// REQUIRES: under a DB mutex OR from a write thread
522+
// REQUIRES: use this function of DBImpl::column_family_memtables_ should be
523+
// under a DB mutex OR from a write thread
517524
virtual MemTable* GetMemTable() const override;
518525

519526
// Returns column family handle for the selected column family
520-
// REQUIRES: under a DB mutex OR from a write thread
527+
// REQUIRES: use this function of DBImpl::column_family_memtables_ should be
528+
// under a DB mutex OR from a write thread
521529
virtual ColumnFamilyHandle* GetColumnFamilyHandle() override;
522530

523-
// REQUIRES: under a DB mutex OR from a write thread
524-
virtual void CheckMemtableFull() override;
531+
// Cannot be called while another thread is calling Seek().
532+
// REQUIRES: use this function of DBImpl::column_family_memtables_ should be
533+
// under a DB mutex OR from a write thread
534+
virtual ColumnFamilyData* current() { return current_; }
525535

526536
private:
527537
ColumnFamilySet* column_family_set_;
528538
ColumnFamilyData* current_;
529-
FlushScheduler* flush_scheduler_;
530539
ColumnFamilyHandleInternal handle_;
531540
};
532541

db/db_bench.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,20 @@ DEFINE_uint64(delayed_write_rate, 8388608u,
646646
"Limited bytes allowed to DB when soft_rate_limit or "
647647
"level0_slowdown_writes_trigger triggers");
648648

649+
DEFINE_bool(allow_concurrent_memtable_write, false,
650+
"Allow multi-writers to update mem tables in parallel.");
651+
652+
DEFINE_bool(enable_write_thread_adaptive_yield, false,
653+
"Use a yielding spin loop for brief writer thread waits.");
654+
655+
DEFINE_uint64(
656+
write_thread_max_yield_usec, 100,
657+
"Maximum microseconds for enable_write_thread_adaptive_yield operation.");
658+
659+
DEFINE_uint64(write_thread_slow_yield_usec, 3,
660+
"The threshold at which a slow yield is considered a signal that "
661+
"other processes or threads want the core.");
662+
649663
DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
650664
"When hard_rate_limit is set then this is the max time a put will"
651665
" be stalled.");
@@ -2552,6 +2566,12 @@ class Benchmark {
25522566
options.hard_pending_compaction_bytes_limit =
25532567
FLAGS_hard_pending_compaction_bytes_limit;
25542568
options.delayed_write_rate = FLAGS_delayed_write_rate;
2569+
options.allow_concurrent_memtable_write =
2570+
FLAGS_allow_concurrent_memtable_write;
2571+
options.enable_write_thread_adaptive_yield =
2572+
FLAGS_enable_write_thread_adaptive_yield;
2573+
options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec;
2574+
options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec;
25552575
options.rate_limit_delay_max_milliseconds =
25562576
FLAGS_rate_limit_delay_max_milliseconds;
25572577
options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;

0 commit comments

Comments
 (0)