@@ -111,10 +111,13 @@ class RocksDBErrorListener : public rocksdb::EventListener {
111111public:
112112 RocksDBErrorListener (){};
113113 void OnBackgroundError (rocksdb::BackgroundErrorReason reason, rocksdb::Status* bg_error) override {
114+ if (!bg_error)
115+ return ;
114116 TraceEvent (SevError, " ShardedRocksDBBGError" )
115117 .detail (" Reason" , getErrorReason (reason))
116118 .detail (" ShardedRocksDBSeverity" , bg_error->severity ())
117119 .detail (" Status" , bg_error->ToString ());
120+
118121 std::unique_lock<std::mutex> lock (mutex);
119122 if (!errorPromise.isValid ())
120123 return ;
@@ -249,6 +252,22 @@ rocksdb::CompactionPri getCompactionPriority() {
249252 }
250253}
251254
255+ rocksdb::WALRecoveryMode getWalRecoveryMode () {
256+ switch (SERVER_KNOBS->ROCKSDB_WAL_RECOVERY_MODE ) {
257+ case 0 :
258+ return rocksdb::WALRecoveryMode::kTolerateCorruptedTailRecords ;
259+ case 1 :
260+ return rocksdb::WALRecoveryMode::kAbsoluteConsistency ;
261+ case 2 :
262+ return rocksdb::WALRecoveryMode::kPointInTimeRecovery ;
263+ case 3 :
264+ return rocksdb::WALRecoveryMode::kSkipAnyCorruptedRecords ;
265+ default :
266+ TraceEvent (SevWarn, " InvalidWalRecoveryMode" ).detail (" KnobValue" , SERVER_KNOBS->ROCKSDB_WAL_RECOVERY_MODE );
267+ return rocksdb::WALRecoveryMode::kPointInTimeRecovery ;
268+ }
269+ }
270+
252271rocksdb::ColumnFamilyOptions getCFOptions () {
253272 rocksdb::ColumnFamilyOptions options;
254273 options.level_compaction_dynamic_level_bytes = SERVER_KNOBS->ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES ;
@@ -324,13 +343,16 @@ rocksdb::ColumnFamilyOptions getCFOptions() {
324343}
325344
326345rocksdb::Options getOptions () {
327- rocksdb::Options options ({}, getCFOptions ()) ;
346+ rocksdb::Options options;
328347 options.avoid_unnecessary_blocking_io = true ;
329348 options.create_if_missing = true ;
330349 if (SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM > 0 ) {
331350 options.IncreaseParallelism (SERVER_KNOBS->ROCKSDB_BACKGROUND_PARALLELISM );
332351 }
333352
353+ options.wal_recovery_mode = getWalRecoveryMode ();
354+ options.target_file_size_base = SERVER_KNOBS->ROCKSDB_TARGET_FILE_SIZE_BASE ;
355+ options.max_open_files = SERVER_KNOBS->ROCKSDB_MAX_OPEN_FILES ;
334356 options.delete_obsolete_files_period_micros = SERVER_KNOBS->ROCKSDB_DELETE_OBSOLETE_FILE_PERIOD * 1000000 ;
335357 options.max_total_wal_size = SERVER_KNOBS->ROCKSDB_MAX_TOTAL_WAL_SIZE ;
336358 options.max_subcompactions = SERVER_KNOBS->ROCKSDB_MAX_SUBCOMPACTIONS ;
@@ -632,10 +654,14 @@ int readRangeInDb(PhysicalShard* shard, const KeyRangeRef range, int rowLimit, i
632654// Manages physical shards and maintains logical shard mapping.
633655class ShardManager {
634656public:
635- ShardManager (std::string path, UID logId, const rocksdb::Options& options, std::shared_ptr<RocksDBErrorListener> errorListener)
636- : path(path), logId(logId), errorListener(errorListener) ,dbOptions(options), dataShardMap(nullptr , specialKeys.end) {
657+ ShardManager (std::string path,
658+ UID logId,
659+ const rocksdb::Options& options,
660+ std::shared_ptr<RocksDBErrorListener> errorListener)
661+ : path(path), logId(logId), dbOptions(options), cfOptions(getCFOptions()),
662+ dataShardMap (nullptr , specialKeys.end) {
637663 dbOptions.listeners .push_back (errorListener);
638- }
664+ }
639665
640666 ACTOR static Future<Void> shardMetricsLogger (std::shared_ptr<ShardedRocksDBState> rState,
641667 Future<Void> openFuture,
@@ -708,15 +734,14 @@ class ShardManager {
708734 if (name == METADATA_SHARD_ID) {
709735 foundMetadata = true ;
710736 }
711- descriptors.push_back (rocksdb::ColumnFamilyDescriptor{ name, rocksdb::ColumnFamilyOptions (dbOptions) } );
737+ descriptors.push_back (rocksdb::ColumnFamilyDescriptor ( name, cfOptions) );
712738 }
713739
714740 ASSERT (foundMetadata || descriptors.size () == 0 );
715741
716742 // Add default column family if it's a newly opened database.
717743 if (descriptors.size () == 0 ) {
718- descriptors.push_back (
719- rocksdb::ColumnFamilyDescriptor{ " default" , rocksdb::ColumnFamilyOptions (dbOptions) });
744+ descriptors.push_back (rocksdb::ColumnFamilyDescriptor (" default" , cfOptions));
720745 }
721746
722747 std::vector<rocksdb::ColumnFamilyHandle*> handles;
@@ -825,8 +850,7 @@ class ShardManager {
825850 physicalShards[defaultShard->id ] = defaultShard;
826851
827852 // Create metadata shard.
828- auto metadataShard =
829- std::make_shared<PhysicalShard>(db, METADATA_SHARD_ID, rocksdb::ColumnFamilyOptions (dbOptions));
853+ auto metadataShard = std::make_shared<PhysicalShard>(db, METADATA_SHARD_ID, cfOptions);
830854 metadataShard->init ();
831855 columnFamilyMap[metadataShard->cf ->GetID ()] = metadataShard->cf ;
832856 physicalShards[METADATA_SHARD_ID] = metadataShard;
@@ -897,8 +921,7 @@ class ShardManager {
897921 }
898922 }
899923
900- auto [it, inserted] = physicalShards.emplace (
901- id, std::make_shared<PhysicalShard>(db, id, rocksdb::ColumnFamilyOptions (dbOptions)));
924+ auto [it, inserted] = physicalShards.emplace (id, std::make_shared<PhysicalShard>(db, id, cfOptions));
902925 std::shared_ptr<PhysicalShard>& shard = it->second ;
903926
904927 activePhysicalShardIds.emplace (id);
@@ -1221,7 +1244,7 @@ class ShardManager {
12211244 const std::string path;
12221245 const UID logId;
12231246 rocksdb::Options dbOptions;
1224- std::shared_ptr<RocksDBErrorListener> errorListener ;
1247+ rocksdb::ColumnFamilyOptions cfOptions ;
12251248 rocksdb::DB* db = nullptr ;
12261249 std::unordered_map<std::string, std::shared_ptr<PhysicalShard>> physicalShards;
12271250 std::unordered_set<std::string> activePhysicalShardIds;
@@ -1826,15 +1849,12 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
18261849 Optional<Future<Void>>& metrics;
18271850 const FlowLock* readLock;
18281851 const FlowLock* fetchLock;
1829- std::shared_ptr<RocksDBErrorListener> errorListener;
18301852
18311853 OpenAction (ShardManager* shardManager,
18321854 Optional<Future<Void>>& metrics,
18331855 const FlowLock* readLock,
1834- const FlowLock* fetchLock,
1835- std::shared_ptr<RocksDBErrorListener> errorListener)
1836- : shardManager(shardManager), metrics(metrics), readLock(readLock), fetchLock(fetchLock),
1837- errorListener (errorListener) {}
1856+ const FlowLock* fetchLock)
1857+ : shardManager(shardManager), metrics(metrics), readLock(readLock), fetchLock(fetchLock) {}
18381858
18391859 double getTimeEstimate () const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE ; }
18401860 };
@@ -2432,8 +2452,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
24322452 // of opening and closing multiple rocksdb instances, we reconcile the shard map using persist shard
24332453 // mapping data.
24342454 } else {
2435- auto a = std::make_unique<Writer::OpenAction>(
2436- &shardManager, metrics, &readSemaphore, &fetchSemaphore, errorListener);
2455+ auto a = std::make_unique<Writer::OpenAction>(&shardManager, metrics, &readSemaphore, &fetchSemaphore);
24372456 openFuture = a->done .getFuture ();
24382457 this ->metrics = ShardManager::shardMetricsLogger (this ->rState , openFuture, &shardManager) &&
24392458 rocksDBAggregatedMetricsLogger (this ->rState , openFuture, rocksDBMetrics, &shardManager);
@@ -2685,13 +2704,13 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
26852704
26862705 std::shared_ptr<ShardedRocksDBState> rState;
26872706 rocksdb::Options dbOptions;
2707+ std::shared_ptr<RocksDBErrorListener> errorListener;
26882708 ShardManager shardManager;
26892709 std::shared_ptr<RocksDBMetrics> rocksDBMetrics;
26902710 std::string path;
26912711 UID id;
26922712 Reference<IThreadPool> writeThread;
26932713 Reference<IThreadPool> readThreads;
2694- std::shared_ptr<RocksDBErrorListener> errorListener;
26952714 Future<Void> errorFuture;
26962715 Promise<Void> closePromise;
26972716 Future<Void> openFuture;
0 commit comments