Skip to content

Commit e8cbd88

Browse files
committed
The sharded rocks implementation skipped on TSS handling. Backfill. (#9759)
1 parent 6066839 commit e8cbd88

File tree

1 file changed

+68
-4
lines changed

1 file changed

+68
-4
lines changed

fdbserver/MoveKeys.actor.cpp

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1249,13 +1249,15 @@ ACTOR static Future<Void> startMoveShards(Database occ,
12491249
MoveKeysLock lock,
12501250
FlowLock* startMoveKeysLock,
12511251
UID relocationIntervalId,
1252+
std::map<UID, StorageServerInterface>* tssMapping,
12521253
const DDEnabledState* ddEnabledState,
12531254
CancelConflictingDataMoves cancelConflictingDataMoves) {
12541255
ASSERT(SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
12551256
state Future<Void> warningLogger = logWarningAfter("StartMoveShardsTooLong", 600, servers);
12561257

12571258
wait(startMoveKeysLock->take(TaskPriority::DataDistributionLaunch));
12581259
state FlowLock::Releaser releaser(*startMoveKeysLock);
1260+
state bool loadedTssMapping = false;
12591261
state DataMoveMetaData dataMove;
12601262

12611263
TraceEvent(SevDebug, "StartMoveShardsBegin", relocationIntervalId)
@@ -1265,7 +1267,6 @@ ACTOR static Future<Void> startMoveShards(Database occ,
12651267
try {
12661268
state Key begin = keys.begin;
12671269
state KeyRange currentKeys = keys;
1268-
state int maxRetries = 0;
12691270
state bool complete = false;
12701271

12711272
loop {
@@ -1312,13 +1313,19 @@ ACTOR static Future<Void> startMoveShards(Database occ,
13121313
.detail("DataMoveID", dataMoveId);
13131314
}
13141315

1316+
if (!loadedTssMapping) {
1317+
// share transaction for loading tss mapping with the rest of start move keys
1318+
wait(readTSSMapping(&tr, tssMapping));
1319+
loadedTssMapping = true;
1320+
}
1321+
13151322
std::vector<Future<Optional<Value>>> serverListEntries;
13161323
serverListEntries.reserve(servers.size());
13171324
for (int s = 0; s < servers.size(); s++) {
13181325
serverListEntries.push_back(tr.get(serverListKeyFor(servers[s])));
13191326
}
1320-
13211327
std::vector<Optional<Value>> serverListValues = wait(getAll(serverListEntries));
1328+
13221329
for (int s = 0; s < serverListValues.size(); s++) {
13231330
if (!serverListValues[s].present()) {
13241331
// Attempt to move onto a server that isn't in serverList (removed or never added to the
@@ -1596,6 +1603,9 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
15961603

15971604
wait(finishMoveKeysParallelismLock->take(TaskPriority::DataDistributionLaunch));
15981605
state FlowLock::Releaser releaser = FlowLock::Releaser(*finishMoveKeysParallelismLock);
1606+
state std::unordered_set<UID> tssToIgnore;
1607+
// try waiting for tss for 2 loops, give up if they're behind to not affect the rest of the cluster
1608+
state int waitForTSSCounter = 2;
15991609

16001610
ASSERT(!destinationTeam.empty());
16011611

@@ -1701,6 +1711,8 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
17011711
// They must also have at least the transaction read version so they can't "forget" the shard
17021712
// between now and when this transaction commits.
17031713
state std::vector<Future<Void>> serverReady; // only for count below
1714+
state std::vector<Future<Void>> tssReady; // for waiting in parallel with tss
1715+
state std::vector<StorageServerInterface> tssReadyInterfs;
17041716
state std::vector<UID> newDestinations;
17051717
std::set<UID> completeSrcSet(completeSrc.begin(), completeSrc.end());
17061718
for (const UID& id : destServers) {
@@ -1727,31 +1739,82 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
17271739
storageServerInterfaces.push_back(si);
17281740
}
17291741

1742+
// update client info in case tss mapping changed or server got updated
1743+
17301744
// Wait for new destination servers to fetch the data range.
17311745
serverReady.reserve(storageServerInterfaces.size());
1746+
tssReady.reserve(storageServerInterfaces.size());
1747+
tssReadyInterfs.reserve(storageServerInterfaces.size());
17321748
for (int s = 0; s < storageServerInterfaces.size(); s++) {
17331749
serverReady.push_back(waitForShardReady(
17341750
storageServerInterfaces[s], range, tr.getReadVersion().get(), GetShardStateRequest::READABLE));
1751+
1752+
auto tssPair = tssMapping.find(storageServerInterfaces[s].id());
1753+
1754+
if (tssPair != tssMapping.end() && waitForTSSCounter > 0 &&
1755+
!tssToIgnore.count(tssPair->second.id())) {
1756+
tssReadyInterfs.push_back(tssPair->second);
1757+
tssReady.push_back(waitForShardReady(
1758+
tssPair->second, range, tr.getReadVersion().get(), GetShardStateRequest::READABLE));
1759+
}
17351760
}
17361761

17371762
// Wait for all storage server moves, and explicitly swallow errors for tss ones with
17381763
// waitForAllReady. If this takes too long the transaction will time out and retry, which is ok
1739-
wait(timeout(waitForAll(serverReady),
1764+
wait(timeout(waitForAll(serverReady) && waitForAllReady(tssReady),
17401765
SERVER_KNOBS->SERVER_READY_QUORUM_TIMEOUT,
17411766
Void(),
17421767
TaskPriority::MoveKeys));
17431768

1769+
// Check to see if we're waiting only on tss. If so, decrement the waiting counter.
1770+
// If the waiting counter is zero, ignore the slow/non-responsive tss processes before finalizing
1771+
// the data move.
1772+
if (tssReady.size()) {
1773+
bool allSSDone = true;
1774+
for (auto& f : serverReady) {
1775+
allSSDone &= f.isReady() && !f.isError();
1776+
if (!allSSDone) {
1777+
break;
1778+
}
1779+
}
1780+
1781+
if (allSSDone) {
1782+
bool anyTssNotDone = false;
1783+
1784+
for (auto& f : tssReady) {
1785+
if (!f.isReady() || f.isError()) {
1786+
anyTssNotDone = true;
1787+
waitForTSSCounter--;
1788+
break;
1789+
}
1790+
}
1791+
1792+
if (anyTssNotDone && waitForTSSCounter == 0) {
1793+
for (int i = 0; i < tssReady.size(); i++) {
1794+
if (!tssReady[i].isReady() || tssReady[i].isError()) {
1795+
tssToIgnore.insert(tssReadyInterfs[i].id());
1796+
}
1797+
}
1798+
}
1799+
}
1800+
}
1801+
17441802
std::vector<UID> readyServers;
17451803
for (int s = 0; s < serverReady.size(); ++s) {
17461804
if (serverReady[s].isReady() && !serverReady[s].isError()) {
17471805
readyServers.push_back(storageServerInterfaces[s].uniqueID);
17481806
}
17491807
}
1808+
int tssCount = 0;
1809+
for (int s = 0; s < tssReady.size(); s++) {
1810+
tssCount += tssReady[s].isReady() && !tssReady[s].isError();
1811+
}
17501812

17511813
TraceEvent(SevVerbose, "FinishMoveShardsWaitedServers", relocationIntervalId)
17521814
.detail("DataMoveID", dataMoveId)
17531815
.detail("ReadyServers", describe(readyServers))
1754-
.detail("NewDestinations", describe(newDestinations));
1816+
.detail("NewDestinations", describe(newDestinations))
1817+
.detail("ReadyTSS", tssCount);
17551818

17561819
if (readyServers.size() == newDestinations.size()) {
17571820

@@ -2493,6 +2556,7 @@ Future<Void> rawStartMovement(Database occ, MoveKeysParams& params, std::map<UID
24932556
params.lock,
24942557
params.startMoveKeysParallelismLock,
24952558
params.relocationIntervalId,
2559+
&tssMapping,
24962560
params.ddEnabledState,
24972561
params.cancelConflictingDataMoves);
24982562
}

0 commit comments

Comments
 (0)