5757import com .google .cloud .datastore .Value ;
5858import com .google .cloud .datastore .ValueType ;
5959import com .google .cloud .datastore .testing .RemoteDatastoreHelper ;
60- import com .google .common .collect .Iterators ;
61-
60+ import com .google .common .base .Preconditions ;
61+ import java .util .ArrayList ;
62+ import java .util .Collections ;
63+ import java .util .HashSet ;
64+ import java .util .Iterator ;
65+ import java .util .List ;
66+ import java .util .Set ;
6267import org .junit .After ;
6368import org .junit .AfterClass ;
6469import org .junit .Before ;
6570import org .junit .Rule ;
6671import org .junit .Test ;
6772import org .junit .rules .Timeout ;
6873
69- import java .util .Collections ;
70- import java .util .Iterator ;
71- import java .util .List ;
72-
7374public class ITDatastoreTest {
7475
7576 private static final RemoteDatastoreHelper HELPER = RemoteDatastoreHelper .create ();
@@ -84,8 +85,9 @@ public class ITDatastoreTest {
8485 private static final StringValue STR_VALUE = StringValue .of ("str" );
8586 private static final BooleanValue BOOL_VALUE = BooleanValue .newBuilder (false )
8687 .setExcludeFromIndexes (true ).build ();
88+ private static final Key ROOT_KEY = Key .newBuilder (PROJECT_ID , "rootkey" , "default" ).setNamespace (NAMESPACE ).build ();
8789 private static final IncompleteKey INCOMPLETE_KEY1 =
88- IncompleteKey .newBuilder (PROJECT_ID , KIND1 ).setNamespace (NAMESPACE ).build ();
90+ IncompleteKey .newBuilder (ROOT_KEY , KIND1 ).setNamespace (NAMESPACE ).build ();
8991 private static final IncompleteKey INCOMPLETE_KEY2 =
9092 IncompleteKey .newBuilder (PROJECT_ID , KIND2 ).setNamespace (NAMESPACE ).build ();
9193 private static final Key KEY1 = Key .newBuilder (INCOMPLETE_KEY1 , "name" ).build ();
@@ -137,6 +139,45 @@ public class ITDatastoreTest {
137139 private static final Entity ENTITY3 = Entity .newBuilder (ENTITY1 ).setKey (KEY3 ).remove ("str" )
138140 .set ("null" , NULL_VALUE ).set ("partial1" , PARTIAL_ENTITY2 ).set ("partial2" , ENTITY2 ).build ();
139141
142+ private <T > Iterator <T > getStronglyConsistentResults (Query scQuery , Query query ) {
143+ QueryResults <T > scResults = DATASTORE .run (scQuery );
144+ List <T > scResultsCopy = makeResultsCopy (scResults );
145+ Set <T > scResultsSet = new HashSet <>(scResultsCopy );
146+
147+ while (true ) {
148+ QueryResults <T > results = DATASTORE .run (query );
149+ List <T > resultsCopy = makeResultsCopy (results );
150+ if (!haveSameSize (scResultsCopy .iterator (), resultsCopy .iterator ())) {
151+ continue ;
152+ }
153+ for (T res : resultsCopy ) {
154+ if (! scResultsSet .contains (res )) {
155+ continue ;
156+ }
157+ }
158+ return resultsCopy .iterator ();
159+ }
160+ }
161+
162+ private <T > List <T > makeResultsCopy (QueryResults <T > scResults ) {
163+ Preconditions .checkNotNull (scResults );
164+ List <T > results = new ArrayList <>();
165+ while (scResults .hasNext ()) {
166+ results .add (scResults .next ());
167+ }
168+ return results ;
169+ }
170+
171+ private <T , S > boolean haveSameSize (Iterator <T > it1 , Iterator <S > it2 ) {
172+ Preconditions .checkNotNull (it1 );
173+ Preconditions .checkNotNull (it2 );
174+ while (it1 .hasNext () && it2 .hasNext ()) {
175+ it1 .next ();
176+ it2 .next ();
177+ }
178+ return !it1 .hasNext () && !it2 .hasNext ();
179+ }
180+
140181 @ Rule
141182 public Timeout globalTimeout = Timeout .seconds (100 );
142183
@@ -219,11 +260,6 @@ public void testTransactionWithQuery() throws InterruptedException {
219260 .build ();
220261 Transaction transaction = DATASTORE .newTransaction ();
221262 QueryResults <Entity > results = transaction .run (query );
222- while (Iterators .size (results ) < 1 ) {
223- Thread .sleep (500 );
224- results = DATASTORE .run (query );
225- }
226- results = DATASTORE .run (query );
227263 assertTrue (results .hasNext ());
228264 assertEquals (ENTITY2 , results .next ());
229265 assertFalse (results .hasNext ());
@@ -233,11 +269,6 @@ public void testTransactionWithQuery() throws InterruptedException {
233269
234270 transaction = DATASTORE .newTransaction ();
235271 results = transaction .run (query );
236- while (Iterators .size (results ) < 1 ) {
237- Thread .sleep (500 );
238- results = DATASTORE .run (query );
239- }
240- results = DATASTORE .run (query );
241272 assertTrue (results .hasNext ());
242273 assertEquals (ENTITY2 , results .next ());
243274 assertFalse (results .hasNext ());
@@ -341,12 +372,14 @@ public void testRunGqlQueryNoCasting() throws InterruptedException {
341372 Query <Entity > query1 = Query .newGqlQueryBuilder (ResultType .ENTITY , "select * from " + KIND1 )
342373 .setNamespace (NAMESPACE )
343374 .build ();
344- QueryResults <Entity > results1 = DATASTORE .run (query1 );
345- while (Iterators .size (results1 ) < 1 ) {
346- Thread .sleep (500 );
347- results1 = DATASTORE .run (query1 );
348- }
349- results1 = DATASTORE .run (query1 );
375+ Query <Entity > scQuery1 = Query .newEntityQueryBuilder ()
376+ .setNamespace (NAMESPACE )
377+ .setKind (KIND1 )
378+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
379+ .build ();
380+
381+ Iterator <Entity > results1 = getStronglyConsistentResults (scQuery1 , query1 );
382+
350383 assertTrue (results1 .hasNext ());
351384 assertEquals (ENTITY1 , results1 .next ());
352385 assertFalse (results1 .hasNext ());
@@ -356,34 +389,44 @@ public void testRunGqlQueryNoCasting() throws InterruptedException {
356389 Query .newGqlQueryBuilder (ResultType .ENTITY , "select * from " + KIND2 + " order by __key__" )
357390 .setNamespace (NAMESPACE )
358391 .build ();
359- QueryResults <? extends Entity > results2 = DATASTORE .run (query2 );
360- while (Iterators .size (results2 ) < 2 ) {
361- Thread .sleep (500 );
362- results2 = DATASTORE .run (query2 );
363- }
364- results2 = DATASTORE .run (query2 );
392+ Query <? extends Entity > scQuery2 = Query .newEntityQueryBuilder ()
393+ .setNamespace (NAMESPACE )
394+ .setKind (KIND2 )
395+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
396+ .setOrderBy (OrderBy .asc ("__key__" ))
397+ .build ();
398+
399+ Iterator <Entity > results2 = getStronglyConsistentResults (scQuery2 , query2 );
365400 assertTrue (results2 .hasNext ());
366401 assertEquals (ENTITY2 , results2 .next ());
367402 assertTrue (results2 .hasNext ());
368403 assertEquals (ENTITY3 , results2 .next ());
369404 assertFalse (results2 .hasNext ());
370405
371- query1 = Query .newGqlQueryBuilder (ResultType .ENTITY , "select * from bla" )
372- .setNamespace (NAMESPACE )
373- .build ();
374- results1 = DATASTORE .run (query1 );
406+ query1 =
407+ Query .newGqlQueryBuilder (ResultType .ENTITY , "select * from bla" )
408+ .setNamespace (NAMESPACE )
409+ .build ();
410+ scQuery1 =
411+ Query .newEntityQueryBuilder ()
412+ .setNamespace (NAMESPACE )
413+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
414+ .setKind ("bla" )
415+ .build ();
416+ results1 = getStronglyConsistentResults (scQuery1 , query1 );
375417 assertFalse (results1 .hasNext ());
376418
377419 Query <Key > keyOnlyQuery =
378420 Query .newGqlQueryBuilder (ResultType .KEY , "select __key__ from " + KIND1 )
379421 .setNamespace (NAMESPACE )
380422 .build ();
381- QueryResults <Key > keyOnlyResults = DATASTORE .run (keyOnlyQuery );
382- while (Iterators .size (keyOnlyResults ) < 1 ) {
383- Thread .sleep (500 );
384- keyOnlyResults = DATASTORE .run (keyOnlyQuery );
385- }
386- keyOnlyResults = DATASTORE .run (keyOnlyQuery );
423+ Query <Key > scKeyOnlyQuery =
424+ Query .newKeyQueryBuilder ()
425+ .setNamespace (NAMESPACE )
426+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
427+ .setKind (KIND1 )
428+ .build ();
429+ Iterator <Key > keyOnlyResults = getStronglyConsistentResults (scKeyOnlyQuery , keyOnlyQuery );
387430 assertTrue (keyOnlyResults .hasNext ());
388431 assertEquals (KEY1 , keyOnlyResults .next ());
389432 assertFalse (keyOnlyResults .hasNext ());
@@ -392,12 +435,15 @@ public void testRunGqlQueryNoCasting() throws InterruptedException {
392435 Query .newGqlQueryBuilder (ResultType .PROJECTION_ENTITY , "select __key__ from " + KIND1 )
393436 .setNamespace (NAMESPACE )
394437 .build ();
395- QueryResults <ProjectionEntity > keyProjectionResult = DATASTORE .run (keyProjectionQuery );
396- while (Iterators .size (keyProjectionResult ) < 1 ) {
397- Thread .sleep (500 );
398- keyProjectionResult = DATASTORE .run (keyProjectionQuery );
399- }
400- keyProjectionResult = DATASTORE .run (keyProjectionQuery );
438+ Query <ProjectionEntity > scKeyProjectionQuery =
439+ Query .newProjectionEntityQueryBuilder ()
440+ .setNamespace (NAMESPACE )
441+ .setKind (KIND1 )
442+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
443+ .build ();
444+
445+ Iterator <ProjectionEntity > keyProjectionResult =
446+ getStronglyConsistentResults (scKeyProjectionQuery , keyProjectionQuery );
401447 assertTrue (keyProjectionResult .hasNext ());
402448 ProjectionEntity projectionEntity = keyProjectionResult .next ();
403449 assertEquals (KEY1 , projectionEntity .getKey ());
@@ -411,28 +457,32 @@ public void testRunGqlQueryWithCasting() throws InterruptedException {
411457 Query <Entity > query1 = (Query <Entity >) Query .newGqlQueryBuilder ("select * from " + KIND1 )
412458 .setNamespace (NAMESPACE )
413459 .build ();
414- QueryResults <Entity > results1 = DATASTORE .run (query1 );
415- while (Iterators .size (results1 ) < 1 ) {
416- Thread .sleep (500 );
417- results1 = DATASTORE .run (query1 );
418- }
419- results1 = DATASTORE .run (query1 );
460+ Query <Entity > scQuery1 =
461+ Query .newEntityQueryBuilder ()
462+ .setNamespace (NAMESPACE )
463+ .setKind (KIND1 )
464+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
465+ .build ();
466+ Iterator <Entity > results1 = getStronglyConsistentResults (scQuery1 ,query1 );
420467 assertTrue (results1 .hasNext ());
421468 assertEquals (ENTITY1 , results1 .next ());
422469 assertFalse (results1 .hasNext ());
423470
424471 Query <?> query2 = Query .newGqlQueryBuilder ("select * from " + KIND1 )
425472 .setNamespace (NAMESPACE )
426473 .build ();
474+
427475 QueryResults <?> results2 = DATASTORE .run (query2 );
428- while (Iterators .size (results2 ) < 1 ) {
429- Thread .sleep (500 );
430- results2 = DATASTORE .run (query2 );
431- }
432- results2 = DATASTORE .run (query1 );
476+
433477 assertSame (Entity .class , results2 .getResultClass ());
434- @ SuppressWarnings ("unchecked" )
435- QueryResults <Entity > results3 = (QueryResults <Entity >) results2 ;
478+
479+ Query <?> scQuery2 = Query .newEntityQueryBuilder ()
480+ .setNamespace (NAMESPACE )
481+ .setKind (KIND1 )
482+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
483+ .build ();
484+
485+ Iterator <Entity > results3 = getStronglyConsistentResults (scQuery2 , query2 );
436486 assertTrue (results3 .hasNext ());
437487 assertEquals (ENTITY1 , results3 .next ());
438488 assertFalse (results3 .hasNext ());
@@ -442,41 +492,44 @@ public void testRunGqlQueryWithCasting() throws InterruptedException {
442492 public void testRunStructuredQuery () throws InterruptedException {
443493 Query <Entity > query =
444494 Query .newEntityQueryBuilder ().setKind (KIND1 ).setOrderBy (OrderBy .asc ("__key__" )).build ();
445- QueryResults <Entity > results1 = DATASTORE .run (query );
446- while (Iterators .size (results1 ) < 1 ) {
447- Thread .sleep (500 );
448- results1 = DATASTORE .run (query );
449- }
450- results1 = DATASTORE .run (query );
495+
496+ Query <Entity > scQuery = Query .newEntityQueryBuilder ()
497+ .setKind (KIND1 )
498+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
499+ .setOrderBy (OrderBy .asc ("__key__" ))
500+ .build ();
501+
502+ Iterator <Entity > results1 = getStronglyConsistentResults (scQuery , query );
503+
451504 assertTrue (results1 .hasNext ());
452505 assertEquals (ENTITY1 , results1 .next ());
453506 assertFalse (results1 .hasNext ());
454507
455508 Query <Key > keyOnlyQuery = Query .newKeyQueryBuilder ().setKind (KIND1 ).build ();
456- QueryResults <Key > results2 = DATASTORE .run (keyOnlyQuery );
457- while (Iterators .size (results2 ) < 1 ) {
458- Thread .sleep (500 );
459- results2 = DATASTORE .run (keyOnlyQuery );
460- }
461- results2 = DATASTORE .run (keyOnlyQuery );
509+ Query <Key > scKeyOnlyQuery = Query .newKeyQueryBuilder ().setKind (KIND1 )
510+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
511+ .build ();
512+
513+ Iterator <Key > results2 = getStronglyConsistentResults (scKeyOnlyQuery , keyOnlyQuery );
462514 assertTrue (results2 .hasNext ());
463515 assertEquals (ENTITY1 .getKey (), results2 .next ());
464516 assertFalse (results2 .hasNext ());
465517
466518 StructuredQuery <ProjectionEntity > keyOnlyProjectionQuery =
467519 Query .newProjectionEntityQueryBuilder ()
468520 .setKind (KIND1 ).setProjection ("__key__" ).build ();
469- QueryResults <ProjectionEntity > results3 = DATASTORE .run (keyOnlyProjectionQuery );
470- while (Iterators .size (results3 ) < 1 ) {
471- Thread .sleep (500 );
472- results3 = DATASTORE .run (keyOnlyProjectionQuery );
473- }
474- results3 = DATASTORE .run (keyOnlyProjectionQuery );
521+ StructuredQuery <ProjectionEntity > scKeyOnlyProjectionQuery = Query .newProjectionEntityQueryBuilder ()
522+ .setKind (KIND1 )
523+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
524+ .setProjection ("__key__" )
525+ .build ();
526+ Iterator <ProjectionEntity > results3 =
527+ getStronglyConsistentResults (scKeyOnlyProjectionQuery , keyOnlyProjectionQuery );
475528 assertTrue (results3 .hasNext ());
476529 ProjectionEntity projectionEntity = results3 .next ();
477530 assertEquals (ENTITY1 .getKey (), projectionEntity .getKey ());
478531 assertTrue (projectionEntity .getNames ().isEmpty ());
479- assertFalse (results2 .hasNext ());
532+ assertFalse (results3 .hasNext ());
480533
481534 StructuredQuery <ProjectionEntity > projectionQuery = Query .newProjectionEntityQueryBuilder ()
482535 .setKind (KIND2 )
@@ -487,12 +540,17 @@ public void testRunStructuredQuery() throws InterruptedException {
487540 .setLimit (10 )
488541 .build ();
489542
490- QueryResults <ProjectionEntity > results4 = DATASTORE .run (projectionQuery );
491- while (Iterators .size (results4 ) < 1 ) {
492- Thread .sleep (500 );
493- results4 = DATASTORE .run (projectionQuery );
494- }
495- results4 = DATASTORE .run (projectionQuery );
543+ StructuredQuery <ProjectionEntity > scProjectionQuery = Query .newProjectionEntityQueryBuilder ()
544+ .setKind (KIND2 )
545+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
546+ .setProjection ("age" )
547+ .setFilter (PropertyFilter .gt ("age" , 18 ))
548+ .setDistinctOn ("age" )
549+ .setOrderBy (OrderBy .asc ("age" ))
550+ .setLimit (10 )
551+ .build ();
552+
553+ Iterator <ProjectionEntity > results4 = getStronglyConsistentResults (scProjectionQuery , projectionQuery );
496554 assertTrue (results4 .hasNext ());
497555 ProjectionEntity entity = results4 .next ();
498556 assertEquals (ENTITY2 .getKey (), entity .getKey ());
0 commit comments