@@ -321,6 +321,7 @@ struct TLogData : NonCopyable {
 	int64_t instanceID;
 	int64_t bytesInput;
 	int64_t bytesDurable;
+	int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling.
 	int64_t overheadBytesInput;
 	int64_t overheadBytesDurable;
 
@@ -348,7 +349,7 @@ struct TLogData : NonCopyable {
 		: dbgid(dbgid), instanceID(deterministicRandom()->randomUniqueID().first()),
 		  persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
 		  dbInfo(dbInfo), degraded(degraded), queueCommitBegin(0), queueCommitEnd(0),
-		  diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), overheadBytesInput(0), overheadBytesDurable(0),
+		  diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false), bytesInput(0), bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
 		  peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES),
 		  concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS),
 		  ignorePopRequest(false), ignorePopDeadline(), ignorePopUid(), dataFolder(folder), toBePopped()
@@ -978,7 +979,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
 	state FlowLock::Releaser commitLockReleaser;
 
 	if(logData->stopped) {
-		if (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD) {
+		if (self->bytesInput - self->bytesDurable >= self->targetVolatileBytes) {
 			while(logData->persistentDataDurableVersion != logData->version.get()) {
 				totalSize = 0;
 				Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
@@ -1026,10 +1027,12 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
 		if (logData->version_sizes.empty()) {
 			nextVersion = logData->version.get();
 		} else {
+			// Double check that a running TLog wasn't wrongly affected by spilling locked SharedTLogs.
+			ASSERT_WE_THINK(self->targetVolatileBytes == SERVER_KNOBS->TLOG_SPILL_THRESHOLD);
 			Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
 			while( totalSize < SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT &&
 					sizeItr != logData->version_sizes.end()
-					&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD || sizeItr->value.first == 0) )
+					&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= self->targetVolatileBytes || sizeItr->value.first == 0) )
 			{
 				totalSize += sizeItr->value.first + sizeItr->value.second;
 				++sizeItr;
@@ -2600,21 +2603,10 @@ ACTOR Future<Void> updateLogSystem(TLogData* self, Reference<LogData> logData, L
 	}
 }
 
-// Start the tLog role for a worker
-ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
-	state TLogInterface recruited(self->dbgid, locality);
-	recruited.initEndpoints();
-
-	DUMPTOKEN( recruited.peekMessages );
-	DUMPTOKEN( recruited.popMessages );
-	DUMPTOKEN( recruited.commit );
-	DUMPTOKEN( recruited.lock );
-	DUMPTOKEN( recruited.getQueuingMetrics );
-	DUMPTOKEN( recruited.confirmRunning );
-
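+// Stop every TLog generation tracked by this SharedTLog; newLogId is recorded in trace events as the recruitment that stopped them.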
+void stopAllTLogs( TLogData* self, UID newLogId ) {
 	for( auto it : self->id_data ) {
 		if( !it.second->stopped ) {
-			TraceEvent("TLogStoppedByNewRecruitment", self->dbgid).detail("LogId", it.second->logId).detail("StoppedId", it.first.toString()).detail("RecruitedId", recruited.id()).detail("EndEpoch", it.second->logSystem->get().getPtr() != 0);
+			TraceEvent("TLogStoppedByNewRecruitment", self->dbgid).detail("LogId", it.second->logId).detail("StoppedId", it.first.toString()).detail("RecruitedId", newLogId).detail("EndEpoch", it.second->logSystem->get().getPtr() != 0);
 			if(!it.second->isPrimary && it.second->logSystem->get()) {
 				it.second->removed = it.second->removed && it.second->logSystem->get()->endEpoch();
 			}
@@ -2628,6 +2620,21 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
 		}
 		it.second->stopCommit.trigger();
 	}
+}
+
+// Start the tLog role for a worker
+ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
+	state TLogInterface recruited(self->dbgid, locality);
+	recruited.initEndpoints();
+
+	DUMPTOKEN( recruited.peekMessages );
+	DUMPTOKEN( recruited.popMessages );
+	DUMPTOKEN( recruited.commit );
+	DUMPTOKEN( recruited.lock );
+	DUMPTOKEN( recruited.getQueuingMetrics );
+	DUMPTOKEN( recruited.confirmRunning );
+
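+	// A newly recruited TLog supersedes all prior generations held by this SharedTLog.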
+	stopAllTLogs(self, recruited.id());
 
 	state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited, req.remoteTag, req.isPrimary, req.logRouterTags, req.txsTags, req.recruitmentID, currentProtocolVersion, req.spillType, req.allTags) );
 	self->id_data[recruited.id()] = logData;
@@ -2744,8 +2751,21 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
 	return Void();
 }
 
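+// If this SharedTLog is still not the active instance after a ten second grace period, lower its
+// target volatile bytes so that updateStorage spills the mutations it is holding in memory.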
+ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Reference<AsyncVar<UID>> activeSharedTLog) {
+	wait(delay(10));
+	if (activeSharedTLog->get() != tlogId) {
+		// TODO: This should fully spill, but currently doing so will cause us to no longer update poppedVersion
+		// and QuietDatabase will hang thinking our TLog is behind.
+		TraceEvent("SharedTLogBeginSpilling", self->dbgid).detail("NowActive", activeSharedTLog->get());
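+		// Hold back only a small residue (two update-storage passes worth) rather than spilling to zero; see the TODO above.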
+		self->targetVolatileBytes = SERVER_KNOBS->REFERENCE_SPILL_UPDATE_STORAGE_BYTE_LIMIT * 2;
+	} else {
+		TraceEvent("SharedTLogSkipSpilling", self->dbgid).detail("NowActive", activeSharedTLog->get());
+	}
+	return Void();
+}
+
 // New tLog (if !recoverFrom.size()) or restore from network
-ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded ) {
+ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQueue, Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk, Promise<Void> oldLog, Promise<Void> recovered, std::string folder, Reference<AsyncVar<bool>> degraded, Reference<AsyncVar<UID>> activeSharedTLog ) {
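+	// activeSharedTLog holds the UID of the SharedTLog instance on this worker that is currently active.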
 	state TLogData self( tlogId, persistentData, persistentQueue, db, degraded, folder );
 	state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
 
@@ -2778,6 +2798,16 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
 					}
 				}
 				when ( wait( error ) ) { throw internal_error(); }
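+				// React when the active SharedTLog on this worker changes: the active instance keeps the full
+				// volatile byte budget, while an inactive one stops its generations and begins spilling.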
+				when ( wait( activeSharedTLog->onChange() ) ) {
+					if (activeSharedTLog->get() == tlogId) {
+						TraceEvent("SharedTLogNowActive", self.dbgid).detail("NowActive", activeSharedTLog->get());
+						self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;
+					} else {
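+						// Another instance has taken over; stop the generations held here and spill after a delay.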
+						stopAllTLogs(&self, tlogId);
+						TraceEvent("SharedTLogQueueSpilling", self.dbgid).detail("NowActive", activeSharedTLog->get());
+						self.sharedActors.send( startSpillingInTenSeconds(&self, tlogId, activeSharedTLog) );
+					}
+				}
 			}
 		}
 	} catch (Error& e) {