Skip to content

Commit 8609b45

Browse files
authored
Add histograms to CommitProxyServer. (apple#5299)
1 parent f6beda6 commit 8609b45

File tree

4 files changed

+72
-6
lines changed

4 files changed

+72
-6
lines changed

fdbserver/CommitProxyServer.actor.cpp

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,14 @@ ACTOR Future<Void> releaseResolvingAfter(ProxyCommitData* self, Future<Void> rel
399399
return Void();
400400
}
401401

402+
ACTOR static Future<ResolveTransactionBatchReply> trackResolutionMetrics(Reference<Histogram> dist,
403+
Future<ResolveTransactionBatchReply> in) {
404+
state double startTime = now();
405+
ResolveTransactionBatchReply reply = wait(in);
406+
dist->sampleSeconds(now() - startTime);
407+
return reply;
408+
}
409+
402410
namespace CommitBatch {
403411

404412
struct CommitBatchContext {
@@ -579,6 +587,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
579587
TEST(pProxyCommitData->latestLocalCommitBatchResolving.get() < localBatchNumber - 1); // Wait for local batch
580588
wait(pProxyCommitData->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber - 1));
581589
double queuingDelay = g_network->now() - timeStart;
590+
pProxyCommitData->stats.commitBatchQueuingDist->sampleSeconds(queuingDelay);
582591
if ((queuingDelay > (double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS / SERVER_KNOBS->VERSIONS_PER_SECOND ||
583592
(g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01))) &&
584593
SERVER_KNOBS->PROXY_REJECT_BATCH_QUEUED_TOO_LONG && canReject(trs)) {
@@ -619,13 +628,15 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
619628
pProxyCommitData->commitVersionRequestNumber++,
620629
pProxyCommitData->mostRecentProcessedRequestNumber,
621630
pProxyCommitData->dbgid);
631+
state double beforeGettingCommitVersion = now();
622632
GetCommitVersionReply versionReply = wait(brokenPromiseToNever(
623633
pProxyCommitData->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply)));
624634

625635
pProxyCommitData->mostRecentProcessedRequestNumber = versionReply.requestNum;
626636

627637
pProxyCommitData->stats.txnCommitVersionAssigned += trs.size();
628638
pProxyCommitData->stats.lastCommitVersionAssigned = versionReply.version;
639+
pProxyCommitData->stats.getCommitVersionDist->sampleSeconds(now() - beforeGettingCommitVersion);
629640

630641
self->commitVersion = versionReply.version;
631642
self->prevVersion = versionReply.prevVersion;
@@ -646,6 +657,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
646657
}
647658

648659
ACTOR Future<Void> getResolution(CommitBatchContext* self) {
660+
state double resolutionStart = now();
649661
// Sending these requests is the fuzzy border between phase 1 and phase 2; it could conceivably overlap with
650662
// resolution processing but is still using CPU
651663
ProxyCommitData* pProxyCommitData = self->pProxyCommitData;
@@ -674,8 +686,9 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
674686
std::vector<Future<ResolveTransactionBatchReply>> replies;
675687
for (int r = 0; r < pProxyCommitData->resolvers.size(); r++) {
676688
requests.requests[r].debugID = self->debugID;
677-
replies.push_back(brokenPromiseToNever(
678-
pProxyCommitData->resolvers[r].resolve.getReply(requests.requests[r], TaskPriority::ProxyResolverReply)));
689+
replies.push_back(trackResolutionMetrics(pProxyCommitData->stats.resolverDist[r],
690+
brokenPromiseToNever(pProxyCommitData->resolvers[r].resolve.getReply(
691+
requests.requests[r], TaskPriority::ProxyResolverReply))));
679692
}
680693

681694
self->transactionResolverMap.swap(requests.transactionResolverMap);
@@ -700,6 +713,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
700713
std::vector<ResolveTransactionBatchReply> resolutionResp = wait(getAll(replies));
701714
self->resolution.swap(*const_cast<std::vector<ResolveTransactionBatchReply>*>(&resolutionResp));
702715

716+
self->pProxyCommitData->stats.resolutionDist->sampleSeconds(now() - resolutionStart);
703717
if (self->debugID.present()) {
704718
g_traceBatch.addEvent(
705719
"CommitDebug", self->debugID.get().first(), "CommitProxyServer.commitBatch.AfterResolution");
@@ -1055,6 +1069,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
10551069
}
10561070

10571071
ACTOR Future<Void> postResolution(CommitBatchContext* self) {
1072+
state double postResolutionStart = now();
10581073
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
10591074
state std::vector<CommitTransactionRequest>& trs = self->trs;
10601075
state const int64_t localBatchNumber = self->localBatchNumber;
@@ -1064,6 +1079,8 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
10641079
bool queuedCommits = pProxyCommitData->latestLocalCommitBatchLogging.get() < localBatchNumber - 1;
10651080
TEST(queuedCommits); // Queuing post-resolution commit processing
10661081
wait(pProxyCommitData->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber - 1));
1082+
state double postResolutionQueuing = now();
1083+
pProxyCommitData->stats.postResolutionDist->sampleSeconds(postResolutionQueuing - postResolutionStart);
10671084
wait(yield(TaskPriority::ProxyCommitYield1));
10681085

10691086
self->computeStart = g_network->timer();
@@ -1212,10 +1229,12 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
12121229
1e9 * pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
12131230
}
12141231

1232+
pProxyCommitData->stats.processingMutationDist->sampleSeconds(now() - postResolutionQueuing);
12151233
return Void();
12161234
}
12171235

12181236
ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
1237+
state double tLoggingStart = now();
12191238
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
12201239
state Span span("MP:transactionLogging"_loc, self->span.context);
12211240

@@ -1249,11 +1268,12 @@ ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
12491268
pProxyCommitData->txsPopVersions.emplace_back(self->commitVersion, self->msg.popTo);
12501269
}
12511270
pProxyCommitData->logSystem->popTxs(self->msg.popTo);
1252-
1271+
pProxyCommitData->stats.tlogLoggingDist->sampleSeconds(now() - tLoggingStart);
12531272
return Void();
12541273
}
12551274

12561275
ACTOR Future<Void> reply(CommitBatchContext* self) {
1276+
state double replyStart = now();
12571277
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
12581278
state Span span("MP:reply"_loc, self->span.context);
12591279

@@ -1385,7 +1405,7 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
13851405
pProxyCommitData->commitBatchesMemBytesCount -= self->currentBatchMemBytesCount;
13861406
ASSERT_ABORT(pProxyCommitData->commitBatchesMemBytesCount >= 0);
13871407
wait(self->releaseFuture);
1388-
1408+
pProxyCommitData->stats.replyCommitDist->sampleSeconds(now() - replyStart);
13891409
return Void();
13901410
}
13911411

@@ -1856,7 +1876,10 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
18561876

18571877
commitData.resolvers = commitData.db->get().resolvers;
18581878
ASSERT(commitData.resolvers.size() != 0);
1859-
1879+
for (int i = 0; i < commitData.resolvers.size(); ++i) {
1880+
commitData.stats.resolverDist.push_back(Histogram::getHistogram(
1881+
LiteralStringRef("CommitProxy"), "ToResolver_" + commitData.resolvers[i].id().toString(), Histogram::Unit::microseconds));
1882+
}
18601883
auto rs = commitData.keyResolvers.modify(allKeys);
18611884
for (auto r = rs.begin(); r != rs.end(); ++r)
18621885
r->value().emplace_back(0, 0);

fdbserver/LogSystem.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "fdbserver/MutationTracking.h"
3232
#include "flow/Arena.h"
3333
#include "flow/Error.h"
34+
#include "flow/Histogram.h"
3435
#include "flow/IndexedSet.h"
3536
#include "flow/Knobs.h"
3637
#include "fdbrpc/ReplicationPolicy.h"
@@ -57,6 +58,7 @@ class LogSet : NonCopyable, public ReferenceCounted<LogSet> {
5758
std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> logRouters;
5859
std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>> backupWorkers;
5960
std::vector<Reference<ConnectionResetInfo>> connectionResetTrackers;
61+
std::vector<Reference<Histogram>> tlogPushDistTrackers;
6062
int32_t tLogWriteAntiQuorum;
6163
int32_t tLogReplicationFactor;
6264
std::vector<LocalityData> tLogLocalities; // Stores the localities of the log servers

fdbserver/ProxyCommitData.actor.h

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,15 @@ struct ProxyStats {
7474
int64_t maxComputeNS;
7575
int64_t minComputeNS;
7676

77+
Reference<Histogram> commitBatchQueuingDist;
78+
Reference<Histogram> getCommitVersionDist;
79+
std::vector<Reference<Histogram>> resolverDist;
80+
Reference<Histogram> resolutionDist;
81+
Reference<Histogram> postResolutionDist;
82+
Reference<Histogram> processingMutationDist;
83+
Reference<Histogram> tlogLoggingDist;
84+
Reference<Histogram> replyCommitDist;
85+
7786
int64_t getAndResetMaxCompute() {
7887
int64_t r = maxComputeNS;
7988
maxComputeNS = 0;
@@ -113,7 +122,28 @@ struct ProxyStats {
113122
id,
114123
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
115124
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
116-
maxComputeNS(0), minComputeNS(1e12) {
125+
maxComputeNS(0), minComputeNS(1e12),
126+
commitBatchQueuingDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
127+
LiteralStringRef("CommitBatchQueuing"),
128+
Histogram::Unit::microseconds)),
129+
getCommitVersionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
130+
LiteralStringRef("GetCommitVersion"),
131+
Histogram::Unit::microseconds)),
132+
resolutionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
133+
LiteralStringRef("Resolution"),
134+
Histogram::Unit::microseconds)),
135+
postResolutionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
136+
LiteralStringRef("PostResolutionQueuing"),
137+
Histogram::Unit::microseconds)),
138+
processingMutationDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
139+
LiteralStringRef("ProcessingMutation"),
140+
Histogram::Unit::microseconds)),
141+
tlogLoggingDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
142+
LiteralStringRef("TlogLogging"),
143+
Histogram::Unit::microseconds)),
144+
replyCommitDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
145+
LiteralStringRef("ReplyCommit"),
146+
Histogram::Unit::microseconds)) {
117147
specialCounter(cc, "LastAssignedCommitVersion", [this]() { return this->lastCommitVersionAssigned; });
118148
specialCounter(cc, "Version", [pVersion]() { return *pVersion; });
119149
specialCounter(cc, "CommittedVersion", [pCommittedVersion]() { return pCommittedVersion->get(); });

fdbserver/TagPartitionedLogSystem.actor.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
527527
}
528528

529529
ACTOR static Future<TLogCommitReply> recordPushMetrics(Reference<ConnectionResetInfo> self,
530+
Reference<Histogram> dist,
530531
NetworkAddress addr,
531532
Future<TLogCommitReply> in) {
532533
state double startTime = now();
@@ -541,6 +542,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
541542
self->fastReplies++;
542543
}
543544
}
545+
dist->sampleSeconds(now() - startTime);
544546
return t;
545547
}
546548

@@ -563,12 +565,21 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
563565
it->connectionResetTrackers.push_back(makeReference<ConnectionResetInfo>());
564566
}
565567
}
568+
if (it->tlogPushDistTrackers.empty()) {
569+
for (int i = 0; i < it->logServers.size(); i++) {
570+
it->tlogPushDistTrackers.push_back(
571+
Histogram::getHistogram("ToTlog_" + it->logServers[i]->get().interf().uniqueID.toString(),
572+
it->logServers[i]->get().interf().address().toString(),
573+
Histogram::Unit::microseconds));
574+
}
575+
}
566576
vector<Future<Void>> tLogCommitResults;
567577
for (int loc = 0; loc < it->logServers.size(); loc++) {
568578
Standalone<StringRef> msg = data.getMessages(location);
569579
data.recordEmptyMessage(location, msg);
570580
allReplies.push_back(recordPushMetrics(
571581
it->connectionResetTrackers[loc],
582+
it->tlogPushDistTrackers[loc],
572583
it->logServers[loc]->get().interf().address(),
573584
it->logServers[loc]->get().interf().commit.getReply(TLogCommitRequest(spanContext,
574585
msg.arena(),

0 commit comments

Comments
 (0)