@@ -1249,13 +1249,15 @@ ACTOR static Future<Void> startMoveShards(Database occ,
12491249 MoveKeysLock lock,
12501250 FlowLock* startMoveKeysLock,
12511251 UID relocationIntervalId,
1252+ std::map<UID, StorageServerInterface>* tssMapping,
12521253 const DDEnabledState* ddEnabledState,
12531254 CancelConflictingDataMoves cancelConflictingDataMoves) {
12541255 ASSERT (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA );
12551256 state Future<Void> warningLogger = logWarningAfter (" StartMoveShardsTooLong" , 600 , servers);
12561257
12571258 wait (startMoveKeysLock->take (TaskPriority::DataDistributionLaunch));
12581259 state FlowLock::Releaser releaser (*startMoveKeysLock);
1260+ state bool loadedTssMapping = false ;
12591261 state DataMoveMetaData dataMove;
12601262
12611263 TraceEvent (SevDebug, " StartMoveShardsBegin" , relocationIntervalId)
@@ -1265,7 +1267,6 @@ ACTOR static Future<Void> startMoveShards(Database occ,
12651267 try {
12661268 state Key begin = keys.begin ;
12671269 state KeyRange currentKeys = keys;
1268- state int maxRetries = 0 ;
12691270 state bool complete = false ;
12701271
12711272 loop {
@@ -1312,13 +1313,19 @@ ACTOR static Future<Void> startMoveShards(Database occ,
13121313 .detail (" DataMoveID" , dataMoveId);
13131314 }
13141315
1316+ if (!loadedTssMapping) {
1317+ // share transaction for loading tss mapping with the rest of start move keys
1318+ wait (readTSSMapping (&tr, tssMapping));
1319+ loadedTssMapping = true ;
1320+ }
1321+
13151322 std::vector<Future<Optional<Value>>> serverListEntries;
13161323 serverListEntries.reserve (servers.size ());
13171324 for (int s = 0 ; s < servers.size (); s++) {
13181325 serverListEntries.push_back (tr.get (serverListKeyFor (servers[s])));
13191326 }
1320-
13211327 std::vector<Optional<Value>> serverListValues = wait (getAll (serverListEntries));
1328+
13221329 for (int s = 0 ; s < serverListValues.size (); s++) {
13231330 if (!serverListValues[s].present ()) {
13241331 // Attempt to move onto a server that isn't in serverList (removed or never added to the
@@ -1596,6 +1603,9 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
15961603
15971604 wait (finishMoveKeysParallelismLock->take (TaskPriority::DataDistributionLaunch));
15981605 state FlowLock::Releaser releaser = FlowLock::Releaser (*finishMoveKeysParallelismLock);
1606+ state std::unordered_set<UID> tssToIgnore;
1607+ // try waiting for tss for 2 loops, then give up if they're behind so as not to affect the rest of the cluster
1608+ state int waitForTSSCounter = 2 ;
15991609
16001610 ASSERT (!destinationTeam.empty ());
16011611
@@ -1701,6 +1711,8 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
17011711 // They must also have at least the transaction read version so they can't "forget" the shard
17021712 // between now and when this transaction commits.
17031713 state std::vector<Future<Void>> serverReady; // only for count below
1714+ state std::vector<Future<Void>> tssReady; // for waiting in parallel with tss
1715+ state std::vector<StorageServerInterface> tssReadyInterfs;
17041716 state std::vector<UID> newDestinations;
17051717 std::set<UID> completeSrcSet (completeSrc.begin (), completeSrc.end ());
17061718 for (const UID& id : destServers) {
@@ -1727,31 +1739,82 @@ ACTOR static Future<Void> finishMoveShards(Database occ,
17271739 storageServerInterfaces.push_back (si);
17281740 }
17291741
1742+ // update client info in case tss mapping changed or server got updated
1743+
17301744 // Wait for new destination servers to fetch the data range.
17311745 serverReady.reserve (storageServerInterfaces.size ());
1746+ tssReady.reserve (storageServerInterfaces.size ());
1747+ tssReadyInterfs.reserve (storageServerInterfaces.size ());
17321748 for (int s = 0 ; s < storageServerInterfaces.size (); s++) {
17331749 serverReady.push_back (waitForShardReady (
17341750 storageServerInterfaces[s], range, tr.getReadVersion ().get (), GetShardStateRequest::READABLE));
1751+
1752+ auto tssPair = tssMapping.find (storageServerInterfaces[s].id ());
1753+
1754+ if (tssPair != tssMapping.end () && waitForTSSCounter > 0 &&
1755+ !tssToIgnore.count (tssPair->second .id ())) {
1756+ tssReadyInterfs.push_back (tssPair->second );
1757+ tssReady.push_back (waitForShardReady (
1758+ tssPair->second , range, tr.getReadVersion ().get (), GetShardStateRequest::READABLE));
1759+ }
17351760 }
17361761
17371762 // Wait for all storage server moves, and explicitly swallow errors for tss ones with
17381763 // waitForAllReady. If this takes too long the transaction will time out and retry, which is ok
1739- wait (timeout (waitForAll (serverReady),
1764+ wait (timeout (waitForAll (serverReady) && waitForAllReady (tssReady) ,
17401765 SERVER_KNOBS->SERVER_READY_QUORUM_TIMEOUT ,
17411766 Void (),
17421767 TaskPriority::MoveKeys));
17431768
1769+ // Check to see if we're waiting only on tss. If so, decrement the waiting counter.
1770+ // If the waiting counter is zero, ignore the slow/non-responsive tss processes before finalizing
1771+ // the data move.
1772+ if (tssReady.size ()) {
1773+ bool allSSDone = true ;
1774+ for (auto & f : serverReady) {
1775+ allSSDone &= f.isReady () && !f.isError ();
1776+ if (!allSSDone) {
1777+ break ;
1778+ }
1779+ }
1780+
1781+ if (allSSDone) {
1782+ bool anyTssNotDone = false ;
1783+
1784+ for (auto & f : tssReady) {
1785+ if (!f.isReady () || f.isError ()) {
1786+ anyTssNotDone = true ;
1787+ waitForTSSCounter--;
1788+ break ;
1789+ }
1790+ }
1791+
1792+ if (anyTssNotDone && waitForTSSCounter == 0 ) {
1793+ for (int i = 0 ; i < tssReady.size (); i++) {
1794+ if (!tssReady[i].isReady () || tssReady[i].isError ()) {
1795+ tssToIgnore.insert (tssReadyInterfs[i].id ());
1796+ }
1797+ }
1798+ }
1799+ }
1800+ }
1801+
17441802 std::vector<UID> readyServers;
17451803 for (int s = 0 ; s < serverReady.size (); ++s) {
17461804 if (serverReady[s].isReady () && !serverReady[s].isError ()) {
17471805 readyServers.push_back (storageServerInterfaces[s].uniqueID );
17481806 }
17491807 }
1808+ int tssCount = 0 ;
1809+ for (int s = 0 ; s < tssReady.size (); s++) {
1810+ tssCount += tssReady[s].isReady () && !tssReady[s].isError ();
1811+ }
17501812
17511813 TraceEvent (SevVerbose, " FinishMoveShardsWaitedServers" , relocationIntervalId)
17521814 .detail (" DataMoveID" , dataMoveId)
17531815 .detail (" ReadyServers" , describe (readyServers))
1754- .detail (" NewDestinations" , describe (newDestinations));
1816+ .detail (" NewDestinations" , describe (newDestinations))
1817+ .detail (" ReadyTSS" , tssCount);
17551818
17561819 if (readyServers.size () == newDestinations.size ()) {
17571820
@@ -2493,6 +2556,7 @@ Future<Void> rawStartMovement(Database occ, MoveKeysParams& params, std::map<UID
24932556 params.lock ,
24942557 params.startMoveKeysParallelismLock ,
24952558 params.relocationIntervalId ,
2559+ &tssMapping,
24962560 params.ddEnabledState ,
24972561 params.cancelConflictingDataMoves );
24982562 }
0 commit comments