@@ -399,6 +399,14 @@ ACTOR Future<Void> releaseResolvingAfter(ProxyCommitData* self, Future<Void> rel
399399 return Void ();
400400}
401401
402+ ACTOR static Future<ResolveTransactionBatchReply> trackResolutionMetrics (Reference<Histogram> dist,
403+ Future<ResolveTransactionBatchReply> in) {
404+ state double startTime = now ();
405+ ResolveTransactionBatchReply reply = wait (in);
406+ dist->sampleSeconds (now () - startTime);
407+ return reply;
408+ }
409+
402410namespace CommitBatch {
403411
404412struct CommitBatchContext {
@@ -579,6 +587,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
579587 TEST (pProxyCommitData->latestLocalCommitBatchResolving .get () < localBatchNumber - 1 ); // Wait for local batch
580588 wait (pProxyCommitData->latestLocalCommitBatchResolving .whenAtLeast (localBatchNumber - 1 ));
581589 double queuingDelay = g_network->now () - timeStart;
590+ pProxyCommitData->stats .commitBatchQueuingDist ->sampleSeconds (queuingDelay);
582591 if ((queuingDelay > (double )SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS / SERVER_KNOBS->VERSIONS_PER_SECOND ||
583592 (g_network->isSimulated () && BUGGIFY_WITH_PROB (0.01 ))) &&
584593 SERVER_KNOBS->PROXY_REJECT_BATCH_QUEUED_TOO_LONG && canReject (trs)) {
@@ -619,13 +628,15 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
619628 pProxyCommitData->commitVersionRequestNumber ++,
620629 pProxyCommitData->mostRecentProcessedRequestNumber ,
621630 pProxyCommitData->dbgid );
631+ state double beforeGettingCommitVersion = now ();
622632 GetCommitVersionReply versionReply = wait (brokenPromiseToNever (
623633 pProxyCommitData->master .getCommitVersion .getReply (req, TaskPriority::ProxyMasterVersionReply)));
624634
625635 pProxyCommitData->mostRecentProcessedRequestNumber = versionReply.requestNum ;
626636
627637 pProxyCommitData->stats .txnCommitVersionAssigned += trs.size ();
628638 pProxyCommitData->stats .lastCommitVersionAssigned = versionReply.version ;
639+ pProxyCommitData->stats .getCommitVersionDist ->sampleSeconds (now () - beforeGettingCommitVersion);
629640
630641 self->commitVersion = versionReply.version ;
631642 self->prevVersion = versionReply.prevVersion ;
@@ -646,6 +657,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
646657}
647658
648659ACTOR Future<Void> getResolution (CommitBatchContext* self) {
660+ state double resolutionStart = now ();
649661 // Sending these requests is the fuzzy border between phase 1 and phase 2; it could conceivably overlap with
650662 // resolution processing but is still using CPU
651663 ProxyCommitData* pProxyCommitData = self->pProxyCommitData ;
@@ -674,8 +686,9 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
674686 std::vector<Future<ResolveTransactionBatchReply>> replies;
675687 for (int r = 0 ; r < pProxyCommitData->resolvers .size (); r++) {
676688 requests.requests [r].debugID = self->debugID ;
677- replies.push_back (brokenPromiseToNever (
678- pProxyCommitData->resolvers [r].resolve .getReply (requests.requests [r], TaskPriority::ProxyResolverReply)));
689+ replies.push_back (trackResolutionMetrics (pProxyCommitData->stats .resolverDist [r],
690+ brokenPromiseToNever (pProxyCommitData->resolvers [r].resolve .getReply (
691+ requests.requests [r], TaskPriority::ProxyResolverReply))));
679692 }
680693
681694 self->transactionResolverMap .swap (requests.transactionResolverMap );
@@ -700,6 +713,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
700713 std::vector<ResolveTransactionBatchReply> resolutionResp = wait (getAll (replies));
701714 self->resolution .swap (*const_cast <std::vector<ResolveTransactionBatchReply>*>(&resolutionResp));
702715
716+ self->pProxyCommitData ->stats .resolutionDist ->sampleSeconds (now () - resolutionStart);
703717 if (self->debugID .present ()) {
704718 g_traceBatch.addEvent (
705719 " CommitDebug" , self->debugID .get ().first (), " CommitProxyServer.commitBatch.AfterResolution" );
@@ -1055,6 +1069,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
10551069}
10561070
10571071ACTOR Future<Void> postResolution (CommitBatchContext* self) {
1072+ state double postResolutionStart = now ();
10581073 state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData ;
10591074 state std::vector<CommitTransactionRequest>& trs = self->trs ;
10601075 state const int64_t localBatchNumber = self->localBatchNumber ;
@@ -1064,6 +1079,8 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
10641079 bool queuedCommits = pProxyCommitData->latestLocalCommitBatchLogging .get () < localBatchNumber - 1 ;
10651080 TEST (queuedCommits); // Queuing post-resolution commit processing
10661081 wait (pProxyCommitData->latestLocalCommitBatchLogging .whenAtLeast (localBatchNumber - 1 ));
1082+ state double postResolutionQueuing = now ();
1083+ pProxyCommitData->stats .postResolutionDist ->sampleSeconds (postResolutionQueuing - postResolutionStart);
10671084 wait (yield (TaskPriority::ProxyCommitYield1));
10681085
10691086 self->computeStart = g_network->timer ();
@@ -1212,10 +1229,12 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
12121229 1e9 * pProxyCommitData->commitComputePerOperation [self->latencyBucket ]);
12131230 }
12141231
1232+ pProxyCommitData->stats .processingMutationDist ->sampleSeconds (now () - postResolutionQueuing);
12151233 return Void ();
12161234}
12171235
12181236ACTOR Future<Void> transactionLogging (CommitBatchContext* self) {
1237+ state double tLoggingStart = now ();
12191238 state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData ;
12201239 state Span span (" MP:transactionLogging" _loc, self->span .context );
12211240
@@ -1249,11 +1268,12 @@ ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
12491268 pProxyCommitData->txsPopVersions .emplace_back (self->commitVersion , self->msg .popTo );
12501269 }
12511270 pProxyCommitData->logSystem ->popTxs (self->msg .popTo );
1252-
1271+ pProxyCommitData-> stats . tlogLoggingDist -> sampleSeconds ( now () - tLoggingStart);
12531272 return Void ();
12541273}
12551274
12561275ACTOR Future<Void> reply (CommitBatchContext* self) {
1276+ state double replyStart = now ();
12571277 state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData ;
12581278 state Span span (" MP:reply" _loc, self->span .context );
12591279
@@ -1385,7 +1405,7 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
13851405 pProxyCommitData->commitBatchesMemBytesCount -= self->currentBatchMemBytesCount ;
13861406 ASSERT_ABORT (pProxyCommitData->commitBatchesMemBytesCount >= 0 );
13871407 wait (self->releaseFuture );
1388-
1408+ pProxyCommitData-> stats . replyCommitDist -> sampleSeconds ( now () - replyStart);
13891409 return Void ();
13901410}
13911411
@@ -1856,7 +1876,10 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
18561876
18571877 commitData.resolvers = commitData.db ->get ().resolvers ;
18581878 ASSERT (commitData.resolvers .size () != 0 );
1859-
1879+ for (int i = 0 ; i < commitData.resolvers .size (); ++i) {
1880+ commitData.stats .resolverDist .push_back (Histogram::getHistogram (
1881+ LiteralStringRef (" CommitProxy" ), " ToResolver_" + commitData.resolvers [i].id ().toString (), Histogram::Unit::microseconds));
1882+ }
18601883 auto rs = commitData.keyResolvers .modify (allKeys);
18611884 for (auto r = rs.begin (); r != rs.end (); ++r)
18621885 r->value ().emplace_back (0 , 0 );
0 commit comments