|
10 | 10 | #include <rocksdb/listener.h> |
11 | 11 | #include <rocksdb/metadata.h> |
12 | 12 | #include <rocksdb/options.h> |
| 13 | +#include <rocksdb/advanced_options.h> |
13 | 14 | #include <rocksdb/slice_transform.h> |
14 | 15 | #include <rocksdb/statistics.h> |
15 | 16 | #include <rocksdb/table.h> |
@@ -230,16 +231,63 @@ Error statusToError(const rocksdb::Status& s) { |
230 | 231 | } |
231 | 232 | } |
232 | 233 |
|
| 234 | +rocksdb::CompactionPri getCompactionPriority() { |
| 235 | + switch (SERVER_KNOBS->ROCKSDB_COMPACTION_PRI) { |
| 236 | + case 0: |
| 237 | + return rocksdb::CompactionPri::kByCompensatedSize; |
| 238 | + case 1: |
| 239 | + return rocksdb::CompactionPri::kOldestLargestSeqFirst; |
| 240 | + case 2: |
| 241 | + return rocksdb::CompactionPri::kOldestSmallestSeqFirst; |
| 242 | + case 3: |
| 243 | + return rocksdb::CompactionPri::kMinOverlappingRatio; |
| 244 | + case 4: |
| 245 | + return rocksdb::CompactionPri::kRoundRobin; |
| 246 | + default: |
| 247 | + TraceEvent(SevWarn, "InvalidCompactionPriority").detail("KnobValue", SERVER_KNOBS->ROCKSDB_COMPACTION_PRI); |
| 248 | + return rocksdb::CompactionPri::kMinOverlappingRatio; |
| 249 | + } |
| 250 | +} |
| 251 | + |
233 | 252 | rocksdb::ColumnFamilyOptions getCFOptions() { |
234 | 253 | rocksdb::ColumnFamilyOptions options; |
235 | 254 | options.level_compaction_dynamic_level_bytes = SERVER_KNOBS->ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES; |
236 | 255 | options.OptimizeLevelStyleCompaction(SERVER_KNOBS->ROCKSDB_MEMTABLE_BYTES); |
237 | 256 | if (SERVER_KNOBS->ROCKSDB_PERIODIC_COMPACTION_SECONDS > 0) { |
238 | 257 | options.periodic_compaction_seconds = SERVER_KNOBS->ROCKSDB_PERIODIC_COMPACTION_SECONDS; |
239 | 258 | } |
| 259 | + |
| 260 | + options.disable_auto_compactions = SERVER_KNOBS->ROCKSDB_DISABLE_AUTO_COMPACTIONS; |
| 261 | + if (SERVER_KNOBS->SHARD_SOFT_PENDING_COMPACT_BYTES_LIMIT > 0) { |
| 262 | + options.soft_pending_compaction_bytes_limit = SERVER_KNOBS->SHARD_SOFT_PENDING_COMPACT_BYTES_LIMIT; |
| 263 | + } |
| 264 | + if (SERVER_KNOBS->SHARD_HARD_PENDING_COMPACT_BYTES_LIMIT > 0) { |
| 265 | + options.hard_pending_compaction_bytes_limit = SERVER_KNOBS->SHARD_HARD_PENDING_COMPACT_BYTES_LIMIT; |
| 266 | + } |
| 267 | + |
240 | 268 | // Compact sstables when there's too much deleted stuff. |
241 | 269 | options.table_properties_collector_factories = { rocksdb::NewCompactOnDeletionCollectorFactory(128, 1) }; |
242 | 270 |
|
| 271 | + // Compact sstables when there's too much deleted stuff. |
| 272 | + if (SERVER_KNOBS->ROCKSDB_ENABLE_COMPACT_ON_DELETION) { |
| 273 | + // Creates a factory of a table property collector that marks an SST |
| 274 | + // file as need-compaction when it observes at least "D" deletion |
| 275 | + // entries in any "N" consecutive entries, or the ratio of tombstone |
| 276 | + // entries >= deletion_ratio. |
| 277 | + |
| 278 | + // @param sliding_window_size "N". Note that this number will be |
| 279 | + // rounded up to the smallest multiple of 128 that is no less |
| 280 | + // than the specified size. |
| 281 | + // @param deletion_trigger "D". Note that even when "N" is changed, |
| 282 | + // the specified number for "D" will not be changed. |
| 283 | + // @param deletion_ratio, if <= 0 or > 1, disable triggering compaction |
| 284 | + // based on deletion ratio. Disabled by default. |
| 285 | + options.table_properties_collector_factories = { rocksdb::NewCompactOnDeletionCollectorFactory( |
| 286 | + SERVER_KNOBS->ROCKSDB_CDCF_SLIDING_WINDOW_SIZE, |
| 287 | + SERVER_KNOBS->ROCKSDB_CDCF_DELETION_TRIGGER, |
| 288 | + SERVER_KNOBS->ROCKSDB_CDCF_DELETION_RATIO) }; |
| 289 | + } |
| 290 | + |
243 | 291 | rocksdb::BlockBasedTableOptions bbOpts; |
244 | 292 | // TODO: Add a knob for the block cache size. (Default is 8 MB) |
245 | 293 | if (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0) { |
@@ -273,6 +321,8 @@ rocksdb::ColumnFamilyOptions getCFOptions() { |
273 | 321 |
|
274 | 322 | options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbOpts)); |
275 | 323 |
|
| 324 | + options.compaction_pri = getCompactionPriority(); |
| 325 | + |
276 | 326 | return options; |
277 | 327 | } |
278 | 328 |
|
@@ -638,6 +688,7 @@ class ShardManager { |
638 | 688 | } |
639 | 689 |
|
640 | 690 | rocksdb::Status init() { |
| 691 | + double start = now(); |
641 | 692 | // Open instance. |
642 | 693 | TraceEvent(SevInfo, "ShardedRocksShardManagerInitBegin", this->logId).detail("DataPath", path); |
643 | 694 | if (SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0) { |
@@ -801,7 +852,9 @@ class ShardManager { |
801 | 852 | if (SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0) { |
802 | 853 | dbOptions.rate_limiter->SetBytesPerSecond(SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC); |
803 | 854 | } |
804 | | - TraceEvent(SevInfo, "ShardedRocksShardManagerInitEnd", this->logId).detail("DataPath", path); |
| 855 | + TraceEvent(SevInfo, "ShardedRocksShardManagerInitEnd", this->logId) |
| 856 | + .detail("DataPath", path) |
| 857 | + .detail("Duration", now() - start); |
805 | 858 | return status; |
806 | 859 | } |
807 | 860 |
|
@@ -1003,6 +1056,8 @@ class ShardManager { |
1003 | 1056 | TraceEvent(SevDebug, "ShardedRocksDB").detail("ClearNonExistentRange", it.range()); |
1004 | 1057 | continue; |
1005 | 1058 | } |
| 1059 | + |
| 1060 | + // Do not compact clear range. |
1006 | 1061 | writeBatch->DeleteRange(it.value()->physicalShard->cf, toSlice(range.begin), toSlice(range.end)); |
1007 | 1062 | dirtyShards->insert(it.value()->physicalShard); |
1008 | 1063 | } |
|
0 commit comments