Commit fef89aa

Merge pull request #2213 from alexmiller-apple/new-log-spill-default
Stop and spill TLogs when a new TLog is recruited in a different SharedTLog
2 parents 6ff368a + 77c72de commit fef89aa
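This change lets each SharedTLog know whether it is the one most recently recruited in its process. Every tLog() actor now takes a Reference<AsyncVar<UID>> activeSharedTLog, and the hard-coded SERVER_KNOBS->TLOG_SPILL_THRESHOLD comparisons in updateStorage() are replaced by a per-instance targetVolatileBytes. When the active UID changes away from a SharedTLog, it stops its TLogs (in TLogServer.actor.cpp) and, after a ten-second grace period, shrinks targetVolatileBytes so its in-memory mutations spill to disk; if it becomes active again, the normal threshold is restored. (A TODO in the diff notes that a full spill is intended eventually.) Below is a minimal, standalone sketch of that threshold policy in plain C++, not Flow actor code; the types, names, and byte values are invented stand-ins for illustration, not FoundationDB's real TLogData or knob defaults.

// Simplified model of the spill-threshold switching added in this commit.
// Only the decision logic mirrors the diff; everything else is a stand-in.
#include <cstdint>
#include <iostream>

struct Knobs {
    int64_t TLOG_SPILL_THRESHOLD = 1500000000;                    // assumed value for illustration
    int64_t REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT = 20000000; // assumed value for illustration
};

struct SharedTLogModel {
    int64_t bytesInput = 0;      // bytes of mutations received
    int64_t bytesDurable = 0;    // bytes already made durable
    int64_t targetVolatileBytes; // mirrors TLogData::targetVolatileBytes

    explicit SharedTLogModel(const Knobs& k) : targetVolatileBytes(k.TLOG_SPILL_THRESHOLD) {}

    // Invoked when the process-wide activeSharedTLog UID changes. In the real
    // code this happens in tLog()'s main loop and, for the "no longer active"
    // case, only after a ten-second delay.
    void onActiveSharedTLogChanged(bool weAreActive, const Knobs& k) {
        if (weAreActive) {
            targetVolatileBytes = k.TLOG_SPILL_THRESHOLD;
        } else {
            targetVolatileBytes = k.REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT * 2;
        }
    }

    // Mirrors the updateStorage() check: spill once in-memory bytes exceed the target.
    bool shouldSpill() const { return bytesInput - bytesDurable >= targetVolatileBytes; }
};

int main() {
    Knobs k;
    SharedTLogModel tlog(k);
    tlog.bytesInput = 100000000; // 100 MB of unspilled mutations

    std::cout << "active:   shouldSpill=" << tlog.shouldSpill() << "\n"; // 0: under the large threshold
    tlog.onActiveSharedTLogChanged(/*weAreActive=*/false, k);
    std::cout << "inactive: shouldSpill=" << tlog.shouldSpill() << "\n"; // 1: target shrank, so we spill
    return 0;
}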

5 files changed: +128 −43 lines changed

fdbserver/OldTLogServer_6_0.actor.cpp

Lines changed: 22 additions & 4 deletions
@@ -262,6 +262,7 @@ struct TLogData : NonCopyable {
 	int64_t instanceID;
 	int64_t bytesInput;
 	int64_t bytesDurable;
+	int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling.
 	int64_t overheadBytesInput;
 	int64_t overheadBytesDurable;

@@ -288,7 +289,7 @@ struct TLogData : NonCopyable {
 		: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
 		  persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
 		  dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0),
-		  diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), overheadBytesInput(0), overheadBytesDurable(0),
+		  diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
 		  concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS),
 		  ignorePopRequest(false), ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped()
 	{
@@ -697,7 +698,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
 	state FlowLock::Releaser commitLockReleaser;

 	if(logData->stopped) {
-		if (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD) {
+		if (self->bytesInput - self->bytesDurable >= self->targetVolatileBytes) {
 			while(logData->persistentDataDurableVersion != logData->version.get()) {
 				totalSize = 0;
 				Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
@@ -742,7 +743,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
 	} else {
 		Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
 		while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT && sizeItr != logData->version_sizes.end()
-				&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD || sizeItr->value.first == 0) )
+				&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= self->targetVolatileBytes || sizeItr->value.first == 0) )
 		{
 			totalSize += sizeItr->value.first + sizeItr->value.second;
 			++sizeItr;
@@ -2301,8 +2302,18 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
 	return Void();
 }

+ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Reference<AsyncVar<UID>> activeSharedTLog) {
+	wait(delay(10));
+	if (activeSharedTLog->get() != tlogId) {
+		// TODO: This should fully spill, but currently doing so will cause us to no longer update poppedVersion
+		// and QuietDatabase will hang thinking our TLog is behind.
+		self->targetVolatileBytes = SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT * 2;
+	}
+	return Void();
+}
+
 // New tLog (if !recoverFrom.size()) or restore from network
-ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded) {
+ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog) {
 	state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder );
 	state Future<Void> error = actorCollection( self.sharedActors.getFuture() );

@@ -2335,6 +2346,13 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
 				}
 			}
 			when ( wait( error ) ) { throw internal_error(); }
+			when ( wait( activeSharedTLog->onChange() ) ) {
+				if (activeSharedTLog->get() == tlogId) {
+					self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;
+				} else {
+					self.sharedActors.send( startSpillingInTenSeconds(&self, tlogId, activeSharedTLog) );
+				}
+			}
 		}
 	}
 } catch (Error& e) {

fdbserver/OldTLogServer_6_2.actor.cpp

Lines changed: 22 additions & 4 deletions
@@ -323,6 +323,7 @@ struct TLogData : NonCopyable {
 	int64_t instanceID;
 	int64_t bytesInput;
 	int64_t bytesDurable;
+	int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling.
 	int64_t overheadBytesInput;
 	int64_t overheadBytesDurable;

@@ -350,7 +351,7 @@ struct TLogData : NonCopyable {
 		: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
 		  persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
 		  dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0),
-		  diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), overheadBytesInput(0), overheadBytesDurable(0),
+		  diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
 		  peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES),
 		  concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS),
 		  ignorePopRequest(false), ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped()
@@ -963,7 +964,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
 	state FlowLock::Releaser commitLockReleaser;

 	if(logData->stopped) {
-		if (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD) {
+		if (self->bytesInput - self->bytesDurable >= self->targetVolatileBytes) {
 			while(logData->persistentDataDurableVersion != logData->version.get()) {
 				totalSize = 0;
 				Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
@@ -1014,7 +1015,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
 		Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
 		while( totalSize < SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT &&
 		       sizeItr != logData->version_sizes.end()
-				&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD || sizeItr->value.first == 0) )
+				&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= self->targetVolatileBytes || sizeItr->value.first == 0) )
 		{
 			totalSize += sizeItr->value.first + sizeItr->value.second;
 			++sizeItr;
@@ -2726,8 +2727,18 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
 	return Void();
 }

+ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Reference<AsyncVar<UID>> activeSharedTLog) {
+	wait(delay(10));
+	if (activeSharedTLog->get() != tlogId) {
+		// TODO: This should fully spill, but currently doing so will cause us to no longer update poppedVersion
+		// and QuietDatabase will hang thinking our TLog is behind.
+		self->targetVolatileBytes = SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT * 2;
+	}
+	return Void();
+}
+
 // New tLog (if !recoverFrom.size()) or restore from network
-ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded ) {
+ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog ) {
 	state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder );
 	state Future<Void> error = actorCollection( self.sharedActors.getFuture() );

@@ -2760,6 +2771,13 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
 				}
 			}
 			when ( wait( error ) ) { throw internal_error(); }
+			when ( wait( activeSharedTLog->onChange() ) ) {
+				if (activeSharedTLog->get() == tlogId) {
+					self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;
+				} else {
+					self.sharedActors.send( startSpillingInTenSeconds(&self, tlogId, activeSharedTLog) );
+				}
+			}
 		}
 	}
 } catch (Error& e) {

fdbserver/TLogServer.actor.cpp

Lines changed: 47 additions & 17 deletions
@@ -321,6 +321,7 @@ struct TLogData : NonCopyable {
 	int64_t instanceID;
 	int64_t bytesInput;
 	int64_t bytesDurable;
+	int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling.
 	int64_t overheadBytesInput;
 	int64_t overheadBytesDurable;

@@ -348,7 +349,7 @@ struct TLogData : NonCopyable {
 		: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
 		  persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
 		  dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0),
-		  diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), overheadBytesInput(0), overheadBytesDurable(0),
+		  diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
 		  peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES),
 		  concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS),
 		  ignorePopRequest(false), ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped()
@@ -978,7 +979,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
 	state FlowLock::Releaser commitLockReleaser;

 	if(logData->stopped) {
-		if (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD) {
+		if (self->bytesInput - self->bytesDurable >= self->targetVolatileBytes) {
 			while(logData->persistentDataDurableVersion != logData->version.get()) {
 				totalSize = 0;
 				Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
@@ -1026,10 +1027,12 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
 		if(logData->version_sizes.empty()) {
 			nextVersion = logData->version.get();
 		} else {
+			// Double check that a running TLog wasn't wrongly affected by spilling locked SharedTLogs.
+			ASSERT_WE_THINK(self->targetVolatileBytes == SERVER_KNOBS->TLOG_SPILL_THRESHOLD);
 			Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
 			while( totalSize < SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT &&
 			       sizeItr != logData->version_sizes.end()
-					&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD || sizeItr->value.first == 0) )
+					&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= self->targetVolatileBytes || sizeItr->value.first == 0) )
 			{
 				totalSize += sizeItr->value.first + sizeItr->value.second;
 				++sizeItr;
@@ -2600,21 +2603,10 @@ ACTOR Future<Void> updateLogSystem(TLogData* self, Reference<LogData> logData, L
 	}
 }

-// Start the tLog role for a worker
-ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
-	state TLogInterface recruited(self->dbgid, locality);
-	recruited.initEndpoints();
-
-	DUMPTOKEN( recruited.peekMessages );
-	DUMPTOKEN( recruited.popMessages );
-	DUMPTOKEN( recruited.commit );
-	DUMPTOKEN( recruited.lock );
-	DUMPTOKEN( recruited.getQueuingMetrics );
-	DUMPTOKEN( recruited.confirmRunning );
-
+void stopAllTLogs( TLogData* self, UID newLogId ) {
 	for(auto it : self->id_data) {
 		if( !it.second->stopped ) {
-			TraceEvent("TLogStoppedByNewRecruitment", self->dbgid).detail("LogId", it.second->logId).detail("StoppedId", it.first.toString()).detail("RecruitedId", recruited.id()).detail("EndEpoch", it.second->logSystem->get().getPtr() != 0);
+			TraceEvent("TLogStoppedByNewRecruitment", self->dbgid).detail("LogId", it.second->logId).detail("StoppedId", it.first.toString()).detail("RecruitedId", newLogId).detail("EndEpoch", it.second->logSystem->get().getPtr() != 0);
 			if(!it.second->isPrimary && it.second->logSystem->get()) {
 				it.second->removed = it.second->removed && it.second->logSystem->get()->endEpoch();
 			}
@@ -2628,6 +2620,21 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
 		}
 		it.second->stopCommit.trigger();
 	}
+}
+
+// Start the tLog role for a worker
+ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
+	state TLogInterface recruited(self->dbgid, locality);
+	recruited.initEndpoints();
+
+	DUMPTOKEN( recruited.peekMessages );
+	DUMPTOKEN( recruited.popMessages );
+	DUMPTOKEN( recruited.commit );
+	DUMPTOKEN( recruited.lock );
+	DUMPTOKEN( recruited.getQueuingMetrics );
+	DUMPTOKEN( recruited.confirmRunning );
+
+	stopAllTLogs(self, recruited.id());

 	state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, currentProtocolVersion, req.spillType, req.allTags) );
 	self->id_data[recruited.id()] = logData;
@@ -2744,8 +2751,21 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
 	return Void();
 }

+ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Reference<AsyncVar<UID>> activeSharedTLog) {
+	wait(delay(10));
+	if (activeSharedTLog->get() != tlogId) {
+		// TODO: This should fully spill, but currently doing so will cause us to no longer update poppedVersion
+		// and QuietDatabase will hang thinking our TLog is behind.
+		TraceEvent("SharedTLogBeginSpilling", self->dbgid).detail("NowActive", activeSharedTLog->get());
+		self->targetVolatileBytes = SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT * 2;
+	} else {
+		TraceEvent("SharedTLogSkipSpilling", self->dbgid).detail("NowActive", activeSharedTLog->get());
+	}
+	return Void();
+}
+
 // New tLog (if !recoverFrom.size()) or restore from network
-ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded ) {
+ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog ) {
 	state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder );
 	state Future<Void> error = actorCollection( self.sharedActors.getFuture() );

@@ -2778,6 +2798,16 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
 				}
 			}
 			when ( wait( error ) ) { throw internal_error(); }
+			when ( wait( activeSharedTLog->onChange() ) ) {
+				if (activeSharedTLog->get() == tlogId) {
+					TraceEvent("SharedTLogNowActive", self.dbgid).detail("NowActive", activeSharedTLog->get());
+					self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;
+				} else {
+					stopAllTLogs(&self, tlogId);
+					TraceEvent("SharedTLogQueueSpilling", self.dbgid).detail("NowActive", activeSharedTLog->get());
+					self.sharedActors.send( startSpillingInTenSeconds(&self, tlogId, activeSharedTLog) );
+				}
+			}
 		}
 	}
 } catch (Error& e) {

fdbserver/WorkerInterface.actor.h

Lines changed: 7 additions & 3 deletions
@@ -443,7 +443,9 @@ ACTOR Future<Void> masterProxyServer(MasterProxyInterface proxy, InitializeMaste
 ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
                         Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality,
                         PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk,
-                        Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded); // changes tli->id() to be the recovered ID
+                        Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
+                        Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog);
+
 ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
                                        Reference<ClusterConnectionFile> ccf, LocalityData locality,
                                        Reference<AsyncVar<ServerDBInfo>> dbInfo);
@@ -465,13 +467,15 @@ namespace oldTLog_6_0 {
 ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
                         Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality,
                         PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk,
-                        Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded);
+                        Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
+                        Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog);
 }
 namespace oldTLog_6_2 {
 ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
                         Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality,
                         PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk,
-                        Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded);
+                        Promise<Void> oldLog, Promise<Void> recovered, std::string folder,
+                        Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog);
 }

 typedef decltype(&tLog) TLogFn;
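All three TLog implementations now share this extended signature, so anything that launches a shared TLog through the TLogFn typedef has to supply the activeSharedTLog variable as well (presumably done by the remaining changed file in this commit, which this page does not render). The sketch below is a rough, standalone model of the publish/onChange handshake that the new parameter relies on, written with standard C++ threads rather than Flow; SimpleAsyncVar and its methods are invented for illustration and are not FoundationDB's AsyncVar, which is future-based.

// Rough standalone model of the publish/onChange pattern behind
// Reference<AsyncVar<UID>> activeSharedTLog. All names here are stand-ins.
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>

using UID = uint64_t; // stand-in for FoundationDB's UID

class SimpleAsyncVar {
public:
    explicit SimpleAsyncVar(UID initial) : value_(initial) {}

    UID get() const {
        std::lock_guard<std::mutex> lock(m_);
        return value_;
    }

    // Publisher side: the worker announces the most recently recruited SharedTLog.
    void set(UID v) {
        {
            std::lock_guard<std::mutex> lock(m_);
            if (v == value_) return;
            value_ = v;
            ++version_;
        }
        cv_.notify_all();
    }

    // Consumer side: block until the value changes, like wait(var->onChange()).
    void waitForChange(uint64_t& lastSeenVersion) const {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [&] { return version_ != lastSeenVersion; });
        lastSeenVersion = version_;
    }

private:
    mutable std::mutex m_;
    mutable std::condition_variable cv_;
    UID value_;
    uint64_t version_ = 0;
};

int main() {
    SimpleAsyncVar activeSharedTLog(/*initial*/ 1);
    const UID myTLogId = 1;

    std::thread tlogLoop([&] {
        uint64_t seen = 0;
        activeSharedTLog.waitForChange(seen); // analogous to the new `when` clause in tLog()
        if (activeSharedTLog.get() == myTLogId) {
            std::cout << "still active: keep the normal spill threshold\n";
        } else {
            std::cout << "a newer SharedTLog took over: schedule spilling\n";
        }
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    activeSharedTLog.set(2); // a new SharedTLog (UID 2) was recruited
    tlogLoop.join();
    return 0;
}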
