99import com .azure .cosmos .implementation .DocumentCollection ;
1010import com .azure .cosmos .implementation .RetryAnalyzer ;
1111import com .azure .cosmos .implementation .Utils ;
12+ import com .azure .cosmos .implementation .changefeed .common .ChangeFeedState ;
13+ import com .azure .cosmos .implementation .changefeed .common .ChangeFeedStateV1 ;
1214import com .azure .cosmos .implementation .feedranges .FeedRangeEpkImpl ;
1315import com .azure .cosmos .implementation .feedranges .FeedRangeInternal ;
1416import com .azure .cosmos .implementation .guava25 .collect .ArrayListMultimap ;
1517import com .azure .cosmos .implementation .guava25 .collect .Multimap ;
18+ import com .azure .cosmos .implementation .query .CompositeContinuationToken ;
1619import com .azure .cosmos .implementation .routing .Range ;
1720import com .azure .cosmos .models .ChangeFeedPolicy ;
1821import com .azure .cosmos .models .CosmosChangeFeedRequestOptions ;
2225import com .azure .cosmos .models .FeedRange ;
2326import com .azure .cosmos .models .ModelBridgeInternal ;
2427import com .azure .cosmos .models .PartitionKey ;
28+ import com .azure .cosmos .models .PartitionKeyDefinition ;
29+ import com .azure .cosmos .models .PartitionKeyDefinitionVersion ;
2530import com .azure .cosmos .rx .TestSuiteBase ;
2631import com .azure .cosmos .test .faultinjection .CosmosFaultInjectionHelper ;
2732import com .azure .cosmos .test .faultinjection .FaultInjectionConditionBuilder ;
5257import java .util .Arrays ;
5358import java .util .Base64 ;
5459import java .util .Collection ;
60+ import java .util .Comparator ;
5561import java .util .HashMap ;
5662import java .util .List ;
5763import java .util .Map ;
64+ import java .util .Queue ;
5865import java .util .UUID ;
5966import java .util .concurrent .atomic .AtomicReference ;
6067import java .util .function .Function ;
@@ -665,6 +672,163 @@ public void asyncChangeFeed_retryPolicy_tests(
665672 }
666673 }
667674
675+ @ Test (groups = { "emulator" }, timeOut = TIMEOUT )
676+ public void split_only_notModified () throws Exception {
677+ // This test is used to reproduce and regression test a bug identified in the split handling
678+ // Background
679+ // Container.queryChangeFeed can result in hang when a continuation token consists of multiple sub-ranges,
680+ // there are no more change for any of the sub-ranges, but a split is happening on the first sub-range after
681+ // the sub-range where we saw a 304 the first time.
682+ // This effectively results in an endless-loop because we identify whether all sub-ranges are drained by
683+ // capturing teh first subrange we see a 304 on and then loop through sub-ranges until we hit the same
684+ // subrange without seeing anything but 304. When there is an even number of sub-ranges and a to-be-split
685+ // sub-range is the last one - we never exit the loop after the split.
686+ // This test is reproducing this edge case - and used both to validate the hotfix and to protect against
687+ // regressing the scenario again.
688+ //
689+ // SAMPLE CONTINUATION TOKEN
690+ // {
691+ // "V": 1,
692+ // "Rid": "0xljAKk+nBE=",
693+ // "Mode": "INCREMENTAL",
694+ // "StartFrom": {
695+ // "Type": "NOW"
696+ // },
697+ // "Continuation": {
698+ // "V": 1,
699+ // "Rid": "0xljAKk+nBE=",
700+ // "Continuation": [
701+ // {
702+ // "token": "\"46037\"",
703+ // "range": {
704+ // "min": "",
705+ // "max": "15555555555555555555555555555555"
706+ // }
707+ // },
708+ // {
709+ // "token": "\"9223372036854775797\"",
710+ // "range": {
711+ // "min": "15555555555555555555555555555555",
712+ // "max": "FF"
713+ // }
714+ // }
715+ // ],
716+ // "Range": {
717+ // "min": "",
718+ // "max": "FF"
719+ // }
720+ // }
721+ // }
722+ //
723+ // SEQUENCE OF EVENTS
724+ // - CF request for first sub-range returns 304 - now we have a bug where we capture the next sub-ranges MIN
725+ // range as the first one (doesn't really matter - we eventually just want to ensure we visited any subrange
726+ // and all return 304)
727+ // - So, FeedRangeCompositeContinuationImpl.initialNoResultsRange is set to the MinRange of the second
728+ // sub-range, which is to-be-split
729+ // - Split is processed and instead of the parent range two new child ranges are added at the end
730+ // - Another bug currently is that we don't go to the next sub-range but the next-thereafter. So, the child
731+ // sub-range starting with the MinRange of the parent range (in position 1) is skipped
732+ // - Since we always move to the second-next (and peek to the third token), we now have three sub-ranges
733+ // and only ever send CF requests for one sub-range in an endless loop.
734+
735+ this .createContainer (
736+ (cp ) -> {
737+
738+ // Ensuring we always use Hash V2 here
739+ PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition ();
740+ ArrayList <String > paths = new ArrayList <>();
741+ paths .add ("/mypk" );
742+ partitionKeyDef .setPaths (paths );
743+ partitionKeyDef .setVersion (PartitionKeyDefinitionVersion .V2 );
744+ cp .setPartitionKeyDefinition (partitionKeyDef );
745+
746+ // To reproduce easily we need at least 3 physical partitions
747+ return cp .setChangeFeedPolicy (ChangeFeedPolicy .createLatestVersionPolicy ());
748+ },
749+ 18_000
750+ );
751+ insertDocuments (20 , 7 );
752+
753+ CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
754+ .createForProcessingFromNow (FeedRange .forFullRange ());
755+
756+ String continuation = drainAndValidateChangeFeedResults (options , null , 0 );
757+ ChangeFeedState stateRaw = ChangeFeedState .fromString (continuation );
758+ assertThat (stateRaw ).isNotNull ();
759+ assertThat (stateRaw ).isInstanceOf (ChangeFeedStateV1 .class );
760+ ChangeFeedStateV1 state = (ChangeFeedStateV1 )stateRaw ;
761+ assertThat (state .getContinuation ()).isNotNull ();
762+ assertThat (state .getContinuation ().getCompositeContinuationTokens ()).isNotNull ();
763+
764+ logger .info ("Continuation token after first iteration {}" , state .toJson ());
765+
766+ Queue <CompositeContinuationToken > tokens = state .getContinuation ().getCompositeContinuationTokens ();
767+ assertThat (tokens ).isNotNull ();
768+
769+ // Validate that we don't clone the tokens in the property getter - otherwise the test code below wouldn't work.
770+ assertThat (tokens ).isSameAs (state .getContinuation ().getCompositeContinuationTokens ());
771+
772+ assertThat (tokens ).hasSize (3 );
773+
774+ List <CompositeContinuationToken > tokenList = new ArrayList <>();
775+ for (int i = 0 ; i < 3 ; i ++) {
776+ tokenList .add (tokens .poll ());
777+ }
778+
779+ tokenList .sort (Comparator .comparing (o -> o .getRange ().getMin ()));
780+
781+ CompositeContinuationToken firstToken = tokenList .get (0 );
782+ assertThat (firstToken ).isNotNull ();
783+
784+ CompositeContinuationToken secondToken = tokenList .get (1 );
785+ assertThat (secondToken ).isNotNull ();
786+
787+ CompositeContinuationToken thirdToken = tokenList .get (2 );
788+ assertThat (thirdToken ).isNotNull ();
789+
790+ assertThat (secondToken .getRange ().getMax ()).isEqualTo (thirdToken .getRange ().getMin ());
791+
792+ assertThat (tokens ).hasSize (0 );
793+
794+ // Add the first two tokens as is
795+ tokens .add (firstToken );
796+
797+ // generate a merged token for 3rd and 4th partition
798+ String newToken = "\" " + (Long .MAX_VALUE - 10L ) + "\" " ;
799+ CompositeContinuationToken newMergedToken = new CompositeContinuationToken (
800+ newToken ,
801+ new Range <>(
802+ secondToken .getRange ().getMin (),
803+ thirdToken .getRange ().getMax (),
804+ true ,
805+ false )
806+ );
807+
808+ tokens .add (newMergedToken );
809+
810+ // Add the second and third token after the merged one - this is necessary to make sure the
811+ // next token after hitting 304 on the merged token is not the first child sub-range
812+ // then the hang would not be reproducible
813+ //tokens.add(secondToken);
814+
815+ logger .info ("New modified continuation to provoke the hang {}" , state .toJson ());
816+
817+ assertThat (state .getContinuation ().getCompositeContinuationTokens ()).hasSize (2 );
818+ options = CosmosChangeFeedRequestOptions
819+ .createForProcessingFromContinuation (state .toString ());
820+
821+ String continuationAfterLastDrainAttempt =
822+ drainAndValidateChangeFeedResults (options , null , 0 );
823+ ChangeFeedState stateAfterLastDrainAttemptRaw = ChangeFeedState .fromString (continuationAfterLastDrainAttempt );
824+ assertThat (stateAfterLastDrainAttemptRaw ).isNotNull ();
825+ assertThat (stateAfterLastDrainAttemptRaw ).isInstanceOf (ChangeFeedStateV1 .class );
826+ ChangeFeedStateV1 stateAfterLastDrainAttempt = (ChangeFeedStateV1 )stateAfterLastDrainAttemptRaw ;
827+ assertThat (stateAfterLastDrainAttempt .getContinuation ()).isNotNull ();
828+ assertThat (stateAfterLastDrainAttempt .getContinuation ().getCompositeContinuationTokens ()).isNotNull ();
829+ assertThat (stateAfterLastDrainAttempt .getContinuation ().getCompositeContinuationTokens ()).hasSize (3 );
830+ }
831+
668832 void insertDocuments (
669833 int partitionCount ,
670834 int documentCount ) {
@@ -888,6 +1052,13 @@ private Range<String> convertToMaxExclusive(Range<String> maxInclusiveRange) {
8881052 private void createContainer (
8891053 Function <CosmosContainerProperties , CosmosContainerProperties > onInitialization ) {
8901054
1055+ createContainer (onInitialization , 10100 );
1056+ }
1057+
1058+ private void createContainer (
1059+ Function <CosmosContainerProperties , CosmosContainerProperties > onInitialization ,
1060+ int throughput ) {
1061+
8911062 String collectionName = UUID .randomUUID ().toString ();
8921063 CosmosContainerProperties containerProperties = getCollectionDefinition (collectionName );
8931064
@@ -896,7 +1067,7 @@ private void createContainer(
8961067 }
8971068
8981069 CosmosContainerResponse containerResponse =
899- createdDatabase .createContainer (containerProperties , 10100 , null );
1070+ createdDatabase .createContainer (containerProperties , throughput , null );
9001071 assertThat (containerResponse .getRequestCharge ()).isGreaterThan (0 );
9011072 validateContainerResponse (containerProperties , containerResponse );
9021073
0 commit comments