2020
2121#include < algorithm>
2222#include " fdbclient/MutationLogReader.actor.h"
23+ #include " fdbclient/StatusClient.h"
2324#include " fdbclient/Tuple.h"
2425#include " fdbserver/workloads/ApiWorkload.h"
2526#include " fdbserver/workloads/workloads.actor.h"
@@ -47,9 +48,13 @@ struct GetMappedRangeWorkload : ApiWorkload {
4748 // const bool SPLIT_RECORDS = deterministicRandom()->random01() < 0.5;
4849 const bool SPLIT_RECORDS = true ;
4950 const static int SPLIT_SIZE = 3 ;
51+ int checkStorageQueueSeconds;
52+ int queueMaxLength;
5053
5154 GetMappedRangeWorkload (WorkloadContext const & wcx) : ApiWorkload(wcx) {
5255 enabled = !clientId; // only do this on the "first" client
56+ checkStorageQueueSeconds = getOption (options, " checkStorageQueueSeconds" _sr, 60.0 );
57+ queueMaxLength = getOption (options, " queueMaxLength" _sr, 100 );
5358 }
5459
5560 // TODO: Currently this workload doesn't play well with MachineAttrition, but it probably should
@@ -267,15 +272,44 @@ struct GetMappedRangeWorkload : ApiWorkload {
267272 e.code () == error_code_quick_get_key_values_miss)) {
268273 TraceEvent (" GetMappedRangeWorkloadExpectedErrorDetected" ).error (e);
269274 return MappedRangeResult ();
275+ } else if (e.code () == error_code_commit_proxy_memory_limit_exceeded ||
276+ e.code () == error_code_operation_cancelled) {
277+ // requests have overwhelmed commit proxy, rest a bit
278+ wait (delay (FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY ));
279+ continue ;
270280 } else {
271- std::cout << " error " << e.what () << std::endl;
281+ std::cout << " scan error " << e.what () << " code is " << e. code () << std::endl;
272282 wait (tr->onError (e));
273283 }
274284 std::cout << " failed scanMappedRangeWithLimits" << std::endl;
275285 }
276286 }
277287 }
278288
289+ // if sendFirstRequestIndefinitely is true, then this method would send the first request indefinitly
290+ // it is in order to test the metric
291+ ACTOR Future<Void> submitSmallRequestIndefinitely (Database cx,
292+ int beginId,
293+ int endId,
294+ Key mapper,
295+ GetMappedRangeWorkload* self) {
296+ Key beginTuple = Tuple ().append (prefix).append (INDEX).append (indexKey (beginId)).getDataAsStandalone ();
297+ state KeySelector beginSelector = KeySelector (firstGreaterOrEqual (beginTuple));
298+ Key endTuple = Tuple ().append (prefix).append (INDEX).append (indexKey (endId)).getDataAsStandalone ();
299+ state KeySelector endSelector = KeySelector (firstGreaterOrEqual (endTuple));
300+ state int limit = 1 ;
301+ state int byteLimit = 10000 ;
302+ while (true ) {
303+ MappedRangeResult result = wait (self->scanMappedRangeWithLimits (
304+ cx, beginSelector, endSelector, mapper, limit, byteLimit, beginId, self, MATCH_INDEX_ALL, false ));
305+ if (result.empty ()) {
306+ TraceEvent (" EmptyResult" );
307+ }
308+ // to avoid requests make proxy memory overwhelmed
309+ wait (delay (FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY ));
310+ }
311+ }
312+
279313 ACTOR Future<Void> scanMappedRange (Database cx,
280314 int beginId,
281315 int endId,
@@ -315,7 +349,6 @@ struct GetMappedRangeWorkload : ApiWorkload {
315349 int indexByteLimit = byteLimit * SERVER_KNOBS->FRACTION_INDEX_BYTELIMIT_PREFETCH ;
316350 int indexCountByteLimit = indexByteLimit / indexSize + (indexByteLimit % indexSize != 0 );
317351 int indexCount = std::min (limit, indexCountByteLimit);
318- std::cout << " indexCount: " << indexCount << std::endl;
319352 // result set cannot be larger than the number of index fetched
320353 ASSERT (result.size () <= indexCount);
321354
@@ -331,7 +364,6 @@ struct GetMappedRangeWorkload : ApiWorkload {
331364 boundByRecord = round * SERVER_KNOBS->MAX_PARALLEL_QUICK_GET_VALUE ;
332365 }
333366 expectedCnt = std::min (expectedCnt, boundByRecord);
334- std::cout << " boundByRecord: " << boundByRecord << std::endl;
335367 ASSERT_LE (result.size (), expectedCnt);
336368 beginSelector = KeySelector (firstGreaterThan (result.back ().key ));
337369 }
@@ -423,6 +455,46 @@ struct GetMappedRangeWorkload : ApiWorkload {
423455 }
424456 }
425457
458+ // checking the max storage queue length is bounded
459+ ACTOR static Future<Void> reportMetric (GetMappedRangeWorkload* self, Database cx) {
460+ loop {
461+ StatusObject result = wait (StatusClient::statusFetcher (cx));
462+ StatusObjectReader statusObj (result);
463+ state StatusObjectReader statusObjCluster;
464+ state StatusObjectReader processesMap;
465+ state long queryQueueMax = 0 ;
466+ state int waitInterval = 2 ;
467+ if (!statusObj.get (" cluster" , statusObjCluster)) {
468+ TraceEvent (" NoCluster" );
469+ wait (delay (waitInterval));
470+ continue ;
471+ }
472+
473+ if (!statusObjCluster.get (" processes" , processesMap)) {
474+ TraceEvent (" NoProcesses" );
475+ wait (delay (waitInterval));
476+ continue ;
477+ }
478+ for (auto proc : processesMap.obj ()) {
479+ StatusObjectReader process (proc.second );
480+ if (process.has (" roles" )) {
481+ StatusArray rolesArray = proc.second .get_obj ()[" roles" ].get_array ();
482+ for (StatusObjectReader role : rolesArray) {
483+ if (role[" role" ].get_str () == " storage" ) {
484+ role.get (" query_queue_max" , queryQueueMax);
485+ CODE_PROBE (queryQueueMax > 0 , " SS query queue is non-empty" );
486+ TraceEvent (SevDebug, " QueryQueueMax" ).detail (" Value" , queryQueueMax);
487+ ASSERT (queryQueueMax < self->queueMaxLength );
488+ }
489+ }
490+ } else {
491+ TraceEvent (" NoRoles" );
492+ }
493+ }
494+ wait (delay (waitInterval));
495+ }
496+ }
497+
426498 // If the same transaction writes to the read set (the scanned ranges) before reading, it should throw read your
427499 // write exception.
428500 ACTOR Future<Void> testRYW (GetMappedRangeWorkload* self) {
@@ -447,6 +519,27 @@ struct GetMappedRangeWorkload : ApiWorkload {
447519 }
448520 }
449521
522+ ACTOR static Future<Void> testMetric (Database cx,
523+ GetMappedRangeWorkload* self,
524+ int beginId,
525+ int endId,
526+ Key mapper,
527+ int seconds) {
528+ loop choose {
529+ when (wait (reportMetric (self, cx))) {
530+ TraceEvent (SevError, " Error: ReportMetric has ended" );
531+ return Void ();
532+ }
533+ when (wait (self->submitSmallRequestIndefinitely (cx, 10 , 490 , mapper, self))) {
534+ TraceEvent (SevError, " Error: submitSmallRequestIndefinitely has ended" );
535+ return Void ();
536+ }
537+ when (wait (delay (seconds))) {
538+ return Void ();
539+ }
540+ }
541+ }
542+
450543 ACTOR Future<Void> _start (Database cx, GetMappedRangeWorkload* self) {
451544 TraceEvent (" GetMappedRangeWorkloadConfig" ).detail (" BadMapper" , self->BAD_MAPPER );
452545
@@ -474,7 +567,7 @@ struct GetMappedRangeWorkload : ApiWorkload {
474567 std::cout << " Test configuration: transactionType:" << self->transactionType << " snapshot:" << self->snapshot
475568 << " bad_mapper:" << self->BAD_MAPPER << std::endl;
476569
477- Key mapper = getMapper (self, false );
570+ state Key mapper = getMapper (self, false );
478571 // The scanned range cannot be too large to hit get_mapped_key_values_has_more. We have a unit validating the
479572 // error is thrown when the range is large.
480573 const double r = deterministicRandom ()->random01 ();
@@ -491,9 +584,10 @@ struct GetMappedRangeWorkload : ApiWorkload {
491584 wait (self->scanMappedRange (cx, 10 , 490 , mapper, self, matchIndex));
492585
493586 {
494- Key mapper = getMapper (self, true );
495- wait (self->scanMappedRange (cx, 10 , 490 , mapper , self, MATCH_INDEX_UNMATCHED_ONLY, true ));
587+ Key mapperMissing = getMapper (self, true );
588+ wait (self->scanMappedRange (cx, 10 , 490 , mapperMissing , self, MATCH_INDEX_UNMATCHED_ONLY, true ));
496589 }
590+ wait (testMetric (cx, self, 10 , 490 , mapper, self->checkStorageQueueSeconds ));
497591
498592 // reset it to default
499593 (const_cast <ServerKnobs*> SERVER_KNOBS)->STRICTLY_ENFORCE_BYTE_LIMIT = originalStrictlyEnforeByteLimit;
0 commit comments