Skip to content

Commit 69e74ab

Browse files
author
Yao Xiao
committed
recovery mode
1 parent 8422203 commit 69e74ab

File tree

3 files changed

+45
-20
lines changed

3 files changed

+45
-20
lines changed

fdbclient/ServerKnobs.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
464464
init( ROCKSDB_PHYSICAL_SHARD_CLEAN_UP_DELAY, isSimulated ? 10.0 : 300.0 ); // Delays shard clean up, must be larger than ROCKSDB_READ_VALUE_TIMEOUT to prevent reading deleted shard.
465465
init( ROCKSDB_RETURN_OVERLOADED_ON_TIMEOUT, false ); if ( randomize && BUGGIFY ) ROCKSDB_RETURN_OVERLOADED_ON_TIMEOUT = true;
466466
init( ROCKSDB_COMPACTION_PRI, 3 ); // kMinOverlappingRatio, RocksDB default.
467+
init( ROCKSDB_WAL_RECOVERY_MODE, 2 ); // kPointInTimeRecovery, RocksDB default.
468+
init( ROCKSDB_TARGET_FILE_SIZE_BASE, 16777216 ); // 16MB, RocksDB default.
469+
init( ROCKSDB_MAX_OPEN_FILES, 50000 ); // Should be smaller than OS's fd limit.
467470

468471
// Leader election
469472
bool longLeaderElection = randomize && BUGGIFY;

fdbclient/include/fdbclient/ServerKnobs.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,9 @@ class ServerKnobs : public KnobsImpl<ServerKnobs> {
373373
double ROCKSDB_PHYSICAL_SHARD_CLEAN_UP_DELAY;
374374
bool ROCKSDB_RETURN_OVERLOADED_ON_TIMEOUT;
375375
int ROCKSDB_COMPACTION_PRI;
376+
int ROCKSDB_WAL_RECOVERY_MODE;
377+
int ROCKSDB_TARGET_FILE_SIZE_BASE;
378+
int ROCKSDB_MAX_OPEN_FILES;
376379

377380
// Leader election
378381
int MAX_NOTIFICATIONS;

fdbserver/KeyValueStoreShardedRocksDB.actor.cpp

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,13 @@ class RocksDBErrorListener : public rocksdb::EventListener {
111111
public:
112112
RocksDBErrorListener(){};
113113
void OnBackgroundError(rocksdb::BackgroundErrorReason reason, rocksdb::Status* bg_error) override {
114+
if (!bg_error)
115+
return;
114116
TraceEvent(SevError, "ShardedRocksDBBGError")
115117
.detail("Reason", getErrorReason(reason))
116118
.detail("ShardedRocksDBSeverity", bg_error->severity())
117119
.detail("Status", bg_error->ToString());
120+
118121
std::unique_lock<std::mutex> lock(mutex);
119122
if (!errorPromise.isValid())
120123
return;
@@ -249,6 +252,22 @@ rocksdb::CompactionPri getCompactionPriority() {
249252
}
250253
}
251254

255+
rocksdb::WALRecoveryMode getWalRecoveryMode() {
256+
switch (SERVER_KNOBS->ROCKSDB_WAL_RECOVERY_MODE) {
257+
case 0:
258+
return rocksdb::WALRecoveryMode::kTolerateCorruptedTailRecords;
259+
case 1:
260+
return rocksdb::WALRecoveryMode::kAbsoluteConsistency;
261+
case 2:
262+
return rocksdb::WALRecoveryMode::kPointInTimeRecovery;
263+
case 3:
264+
return rocksdb::WALRecoveryMode::kSkipAnyCorruptedRecords;
265+
default:
266+
TraceEvent(SevWarn, "InvalidWalRecoveryMode").detail("KnobValue", SERVER_KNOBS->ROCKSDB_WAL_RECOVERY_MODE);
267+
return rocksdb::WALRecoveryMode::kPointInTimeRecovery;
268+
}
269+
}
270+
252271
rocksdb::ColumnFamilyOptions getCFOptions() {
253272
rocksdb::ColumnFamilyOptions options;
254273
options.level_compaction_dynamic_level_bytes = SERVER_KNOBS->ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES;
@@ -324,13 +343,16 @@ rocksdb::ColumnFamilyOptions getCFOptions() {
324343
}
325344

326345
rocksdb::Options getOptions() {
327-
rocksdb::Options options({}, getCFOptions());
346+
rocksdb::Options options;
328347
options.avoid_unnecessary_blocking_io = true;
329348
options.create_if_missing = true;
330349
if (SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM > 0) {
331350
options.IncreaseParallelism(SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM);
332351
}
333352

353+
options.wal_recovery_mode = getWalRecoveryMode();
354+
options.target_file_size_base = SERVER_KNOBS->ROCKSDB_TARGET_FILE_SIZE_BASE;
355+
options.max_open_files = SERVER_KNOBS->ROCKSDB_MAX_OPEN_FILES;
334356
options.delete_obsolete_files_period_micros = SERVER_KNOBS->ROCKSDB_DELETE_OBSOLETE_FILE_PERIOD * 1000000;
335357
options.max_total_wal_size = SERVER_KNOBS->ROCKSDB_MAX_TOTAL_WAL_SIZE;
336358
options.max_subcompactions = SERVER_KNOBS->ROCKSDB_MAX_SUBCOMPACTIONS;
@@ -632,10 +654,14 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef range, int rowLimit, i
632654
// Manages physical shards and maintains logical shard mapping.
633655
class ShardManager {
634656
public:
635-
ShardManager(std::string path, UID logId, const rocksdb::Options& options, std::shared_ptr<RocksDBErrorListener> errorListener)
636-
: path(path), logId(logId), errorListener(errorListener) ,dbOptions(options), dataShardMap(nullptr, specialKeys.end) {
657+
ShardManager(std::string path,
658+
UID logId,
659+
const rocksdb::Options& options,
660+
std::shared_ptr<RocksDBErrorListener> errorListener)
661+
: path(path), logId(logId), dbOptions(options), cfOptions(getCFOptions()),
662+
dataShardMap(nullptr, specialKeys.end) {
637663
dbOptions.listeners.push_back(errorListener);
638-
}
664+
}
639665

640666
ACTOR static Future<Void> shardMetricsLogger(std::shared_ptr<ShardedRocksDBState> rState,
641667
Future<Void> openFuture,
@@ -708,15 +734,14 @@ class ShardManager {
708734
if (name == METADATA_SHARD_ID) {
709735
foundMetadata = true;
710736
}
711-
descriptors.push_back(rocksdb::ColumnFamilyDescriptor{ name, rocksdb::ColumnFamilyOptions(dbOptions) });
737+
descriptors.push_back(rocksdb::ColumnFamilyDescriptor(name, cfOptions));
712738
}
713739

714740
ASSERT(foundMetadata || descriptors.size() == 0);
715741

716742
// Add default column family if it's a newly opened database.
717743
if (descriptors.size() == 0) {
718-
descriptors.push_back(
719-
rocksdb::ColumnFamilyDescriptor{ "default", rocksdb::ColumnFamilyOptions(dbOptions) });
744+
descriptors.push_back(rocksdb::ColumnFamilyDescriptor("default", cfOptions));
720745
}
721746

722747
std::vector<rocksdb::ColumnFamilyHandle*> handles;
@@ -825,8 +850,7 @@ class ShardManager {
825850
physicalShards[defaultShard->id] = defaultShard;
826851

827852
// Create metadata shard.
828-
auto metadataShard =
829-
std::make_shared<PhysicalShard>(db, METADATA_SHARD_ID, rocksdb::ColumnFamilyOptions(dbOptions));
853+
auto metadataShard = std::make_shared<PhysicalShard>(db, METADATA_SHARD_ID, cfOptions);
830854
metadataShard->init();
831855
columnFamilyMap[metadataShard->cf->GetID()] = metadataShard->cf;
832856
physicalShards[METADATA_SHARD_ID] = metadataShard;
@@ -897,8 +921,7 @@ class ShardManager {
897921
}
898922
}
899923

900-
auto [it, inserted] = physicalShards.emplace(
901-
id, std::make_shared<PhysicalShard>(db, id, rocksdb::ColumnFamilyOptions(dbOptions)));
924+
auto [it, inserted] = physicalShards.emplace(id, std::make_shared<PhysicalShard>(db, id, cfOptions));
902925
std::shared_ptr<PhysicalShard>& shard = it->second;
903926

904927
activePhysicalShardIds.emplace(id);
@@ -1221,7 +1244,7 @@ class ShardManager {
12211244
const std::string path;
12221245
const UID logId;
12231246
rocksdb::Options dbOptions;
1224-
std::shared_ptr<RocksDBErrorListener> errorListener;
1247+
rocksdb::ColumnFamilyOptions cfOptions;
12251248
rocksdb::DB* db = nullptr;
12261249
std::unordered_map<std::string, std::shared_ptr<PhysicalShard>> physicalShards;
12271250
std::unordered_set<std::string> activePhysicalShardIds;
@@ -1826,15 +1849,12 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
18261849
Optional<Future<Void>>& metrics;
18271850
const FlowLock* readLock;
18281851
const FlowLock* fetchLock;
1829-
std::shared_ptr<RocksDBErrorListener> errorListener;
18301852

18311853
OpenAction(ShardManager* shardManager,
18321854
Optional<Future<Void>>& metrics,
18331855
const FlowLock* readLock,
1834-
const FlowLock* fetchLock,
1835-
std::shared_ptr<RocksDBErrorListener> errorListener)
1836-
: shardManager(shardManager), metrics(metrics), readLock(readLock), fetchLock(fetchLock),
1837-
errorListener(errorListener) {}
1856+
const FlowLock* fetchLock)
1857+
: shardManager(shardManager), metrics(metrics), readLock(readLock), fetchLock(fetchLock) {}
18381858

18391859
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
18401860
};
@@ -2432,8 +2452,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
24322452
// of opening and closing multiple rocksdb instances, we reconcile the shard map using persist shard
24332453
// mapping data.
24342454
} else {
2435-
auto a = std::make_unique<Writer::OpenAction>(
2436-
&shardManager, metrics, &readSemaphore, &fetchSemaphore, errorListener);
2455+
auto a = std::make_unique<Writer::OpenAction>(&shardManager, metrics, &readSemaphore, &fetchSemaphore);
24372456
openFuture = a->done.getFuture();
24382457
this->metrics = ShardManager::shardMetricsLogger(this->rState, openFuture, &shardManager) &&
24392458
rocksDBAggregatedMetricsLogger(this->rState, openFuture, rocksDBMetrics, &shardManager);
@@ -2685,13 +2704,13 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
26852704

26862705
std::shared_ptr<ShardedRocksDBState> rState;
26872706
rocksdb::Options dbOptions;
2707+
std::shared_ptr<RocksDBErrorListener> errorListener;
26882708
ShardManager shardManager;
26892709
std::shared_ptr<RocksDBMetrics> rocksDBMetrics;
26902710
std::string path;
26912711
UID id;
26922712
Reference<IThreadPool> writeThread;
26932713
Reference<IThreadPool> readThreads;
2694-
std::shared_ptr<RocksDBErrorListener> errorListener;
26952714
Future<Void> errorFuture;
26962715
Promise<Void> closePromise;
26972716
Future<Void> openFuture;

0 commit comments

Comments
 (0)