5757import com .google .cloud .datastore .Value ;
5858import com .google .cloud .datastore .ValueType ;
5959import com .google .cloud .datastore .testing .RemoteDatastoreHelper ;
60- import com .google .common .collect .Iterators ;
61-
60+ import com .google .common .base .Preconditions ;
61+ import java .util .ArrayList ;
62+ import java .util .Collections ;
63+ import java .util .HashSet ;
64+ import java .util .Iterator ;
65+ import java .util .List ;
66+ import java .util .Set ;
6267import org .junit .After ;
6368import org .junit .AfterClass ;
6469import org .junit .Before ;
6570import org .junit .Rule ;
6671import org .junit .Test ;
6772import org .junit .rules .Timeout ;
6873
69- import java .util .Collections ;
70- import java .util .Iterator ;
71- import java .util .List ;
72-
7374public class ITDatastoreTest {
7475
7576 private static final RemoteDatastoreHelper HELPER = RemoteDatastoreHelper .create ();
@@ -84,8 +85,9 @@ public class ITDatastoreTest {
8485 private static final StringValue STR_VALUE = StringValue .of ("str" );
8586 private static final BooleanValue BOOL_VALUE = BooleanValue .newBuilder (false )
8687 .setExcludeFromIndexes (true ).build ();
88+ private static final Key ROOT_KEY = Key .newBuilder (PROJECT_ID , "rootkey" , "default" ).setNamespace (NAMESPACE ).build ();
8789 private static final IncompleteKey INCOMPLETE_KEY1 =
88- IncompleteKey .newBuilder (PROJECT_ID , KIND1 ).setNamespace (NAMESPACE ).build ();
90+ IncompleteKey .newBuilder (ROOT_KEY , KIND1 ).setNamespace (NAMESPACE ).build ();
8991 private static final IncompleteKey INCOMPLETE_KEY2 =
9092 IncompleteKey .newBuilder (PROJECT_ID , KIND2 ).setNamespace (NAMESPACE ).build ();
9193 private static final Key KEY1 = Key .newBuilder (INCOMPLETE_KEY1 , "name" ).build ();
@@ -137,6 +139,38 @@ public class ITDatastoreTest {
137139 private static final Entity ENTITY3 = Entity .newBuilder (ENTITY1 ).setKey (KEY3 ).remove ("str" )
138140 .set ("null" , NULL_VALUE ).set ("partial1" , PARTIAL_ENTITY2 ).set ("partial2" , ENTITY2 ).build ();
139141
142+ private <T > Iterator <T > getStronglyConsistentResults (Query scQuery , Query query ) throws InterruptedException {
143+ //scQuery is equivalent to query, but with an ancestor filter in it
144+ //this makes scQuery strongly consistent
145+ QueryResults <T > scResults = DATASTORE .run (scQuery );
146+ List <T > scResultsCopy = makeResultsCopy (scResults );
147+ Set <T > scResultsSet = new HashSet <>(scResultsCopy );
148+ int maxAttempts = 20 ;
149+
150+ while (maxAttempts > 0 ) {
151+ --maxAttempts ;
152+ QueryResults <T > results = DATASTORE .run (query );
153+ List <T > resultsCopy = makeResultsCopy (results );
154+ Set <T > resultsSet = new HashSet <>(resultsCopy );
155+ if (scResultsSet .size () == resultsSet .size ()
156+ && scResultsSet .containsAll (resultsSet )) {
157+ return resultsCopy .iterator ();
158+ }
159+ Thread .sleep (500 );
160+ }
161+
162+ throw new RuntimeException ("reached max number of attempts to get strongly consistent results." );
163+ }
164+
165+ private <T > List <T > makeResultsCopy (QueryResults <T > scResults ) {
166+ Preconditions .checkNotNull (scResults );
167+ List <T > results = new ArrayList <>();
168+ while (scResults .hasNext ()) {
169+ results .add (scResults .next ());
170+ }
171+ return results ;
172+ }
173+
140174 @ Rule
141175 public Timeout globalTimeout = Timeout .seconds (100 );
142176
@@ -219,11 +253,6 @@ public void testTransactionWithQuery() throws InterruptedException {
219253 .build ();
220254 Transaction transaction = DATASTORE .newTransaction ();
221255 QueryResults <Entity > results = transaction .run (query );
222- while (Iterators .size (results ) < 1 ) {
223- Thread .sleep (500 );
224- results = DATASTORE .run (query );
225- }
226- results = DATASTORE .run (query );
227256 assertTrue (results .hasNext ());
228257 assertEquals (ENTITY2 , results .next ());
229258 assertFalse (results .hasNext ());
@@ -233,11 +262,6 @@ public void testTransactionWithQuery() throws InterruptedException {
233262
234263 transaction = DATASTORE .newTransaction ();
235264 results = transaction .run (query );
236- while (Iterators .size (results ) < 1 ) {
237- Thread .sleep (500 );
238- results = DATASTORE .run (query );
239- }
240- results = DATASTORE .run (query );
241265 assertTrue (results .hasNext ());
242266 assertEquals (ENTITY2 , results .next ());
243267 assertFalse (results .hasNext ());
@@ -341,12 +365,14 @@ public void testRunGqlQueryNoCasting() throws InterruptedException {
341365 Query <Entity > query1 = Query .newGqlQueryBuilder (ResultType .ENTITY , "select * from " + KIND1 )
342366 .setNamespace (NAMESPACE )
343367 .build ();
344- QueryResults <Entity > results1 = DATASTORE .run (query1 );
345- while (Iterators .size (results1 ) < 1 ) {
346- Thread .sleep (500 );
347- results1 = DATASTORE .run (query1 );
348- }
349- results1 = DATASTORE .run (query1 );
368+ Query <Entity > scQuery1 = Query .newEntityQueryBuilder ()
369+ .setNamespace (NAMESPACE )
370+ .setKind (KIND1 )
371+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
372+ .build ();
373+
374+ Iterator <Entity > results1 = getStronglyConsistentResults (scQuery1 , query1 );
375+
350376 assertTrue (results1 .hasNext ());
351377 assertEquals (ENTITY1 , results1 .next ());
352378 assertFalse (results1 .hasNext ());
@@ -356,34 +382,44 @@ public void testRunGqlQueryNoCasting() throws InterruptedException {
356382 Query .newGqlQueryBuilder (ResultType .ENTITY , "select * from " + KIND2 + " order by __key__" )
357383 .setNamespace (NAMESPACE )
358384 .build ();
359- QueryResults <? extends Entity > results2 = DATASTORE .run (query2 );
360- while (Iterators .size (results2 ) < 2 ) {
361- Thread .sleep (500 );
362- results2 = DATASTORE .run (query2 );
363- }
364- results2 = DATASTORE .run (query2 );
385+ Query <? extends Entity > scQuery2 = Query .newEntityQueryBuilder ()
386+ .setNamespace (NAMESPACE )
387+ .setKind (KIND2 )
388+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
389+ .setOrderBy (OrderBy .asc ("__key__" ))
390+ .build ();
391+
392+ Iterator <Entity > results2 = getStronglyConsistentResults (scQuery2 , query2 );
365393 assertTrue (results2 .hasNext ());
366394 assertEquals (ENTITY2 , results2 .next ());
367395 assertTrue (results2 .hasNext ());
368396 assertEquals (ENTITY3 , results2 .next ());
369397 assertFalse (results2 .hasNext ());
370398
371- query1 = Query .newGqlQueryBuilder (ResultType .ENTITY , "select * from bla" )
372- .setNamespace (NAMESPACE )
373- .build ();
374- results1 = DATASTORE .run (query1 );
399+ query1 =
400+ Query .newGqlQueryBuilder (ResultType .ENTITY , "select * from bla" )
401+ .setNamespace (NAMESPACE )
402+ .build ();
403+ scQuery1 =
404+ Query .newEntityQueryBuilder ()
405+ .setNamespace (NAMESPACE )
406+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
407+ .setKind ("bla" )
408+ .build ();
409+ results1 = getStronglyConsistentResults (scQuery1 , query1 );
375410 assertFalse (results1 .hasNext ());
376411
377412 Query <Key > keyOnlyQuery =
378413 Query .newGqlQueryBuilder (ResultType .KEY , "select __key__ from " + KIND1 )
379414 .setNamespace (NAMESPACE )
380415 .build ();
381- QueryResults <Key > keyOnlyResults = DATASTORE .run (keyOnlyQuery );
382- while (Iterators .size (keyOnlyResults ) < 1 ) {
383- Thread .sleep (500 );
384- keyOnlyResults = DATASTORE .run (keyOnlyQuery );
385- }
386- keyOnlyResults = DATASTORE .run (keyOnlyQuery );
416+ Query <Key > scKeyOnlyQuery =
417+ Query .newKeyQueryBuilder ()
418+ .setNamespace (NAMESPACE )
419+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
420+ .setKind (KIND1 )
421+ .build ();
422+ Iterator <Key > keyOnlyResults = getStronglyConsistentResults (scKeyOnlyQuery , keyOnlyQuery );
387423 assertTrue (keyOnlyResults .hasNext ());
388424 assertEquals (KEY1 , keyOnlyResults .next ());
389425 assertFalse (keyOnlyResults .hasNext ());
@@ -392,12 +428,16 @@ public void testRunGqlQueryNoCasting() throws InterruptedException {
392428 Query .newGqlQueryBuilder (ResultType .PROJECTION_ENTITY , "select __key__ from " + KIND1 )
393429 .setNamespace (NAMESPACE )
394430 .build ();
395- QueryResults <ProjectionEntity > keyProjectionResult = DATASTORE .run (keyProjectionQuery );
396- while (Iterators .size (keyProjectionResult ) < 1 ) {
397- Thread .sleep (500 );
398- keyProjectionResult = DATASTORE .run (keyProjectionQuery );
399- }
400- keyProjectionResult = DATASTORE .run (keyProjectionQuery );
431+ Query <ProjectionEntity > scKeyProjectionQuery =
432+ Query .newProjectionEntityQueryBuilder ()
433+ .addProjection ("__key__" )
434+ .setNamespace (NAMESPACE )
435+ .setKind (KIND1 )
436+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
437+ .build ();
438+
439+ Iterator <ProjectionEntity > keyProjectionResult =
440+ getStronglyConsistentResults (scKeyProjectionQuery , keyProjectionQuery );
401441 assertTrue (keyProjectionResult .hasNext ());
402442 ProjectionEntity projectionEntity = keyProjectionResult .next ();
403443 assertEquals (KEY1 , projectionEntity .getKey ());
@@ -411,28 +451,32 @@ public void testRunGqlQueryWithCasting() throws InterruptedException {
411451 Query <Entity > query1 = (Query <Entity >) Query .newGqlQueryBuilder ("select * from " + KIND1 )
412452 .setNamespace (NAMESPACE )
413453 .build ();
414- QueryResults <Entity > results1 = DATASTORE .run (query1 );
415- while (Iterators .size (results1 ) < 1 ) {
416- Thread .sleep (500 );
417- results1 = DATASTORE .run (query1 );
418- }
419- results1 = DATASTORE .run (query1 );
454+ Query <Entity > scQuery1 =
455+ Query .newEntityQueryBuilder ()
456+ .setNamespace (NAMESPACE )
457+ .setKind (KIND1 )
458+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
459+ .build ();
460+ Iterator <Entity > results1 = getStronglyConsistentResults (scQuery1 ,query1 );
420461 assertTrue (results1 .hasNext ());
421462 assertEquals (ENTITY1 , results1 .next ());
422463 assertFalse (results1 .hasNext ());
423464
424465 Query <?> query2 = Query .newGqlQueryBuilder ("select * from " + KIND1 )
425466 .setNamespace (NAMESPACE )
426467 .build ();
468+
427469 QueryResults <?> results2 = DATASTORE .run (query2 );
428- while (Iterators .size (results2 ) < 1 ) {
429- Thread .sleep (500 );
430- results2 = DATASTORE .run (query2 );
431- }
432- results2 = DATASTORE .run (query1 );
470+
433471 assertSame (Entity .class , results2 .getResultClass ());
434- @ SuppressWarnings ("unchecked" )
435- QueryResults <Entity > results3 = (QueryResults <Entity >) results2 ;
472+
473+ Query <?> scQuery2 = Query .newEntityQueryBuilder ()
474+ .setNamespace (NAMESPACE )
475+ .setKind (KIND1 )
476+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
477+ .build ();
478+
479+ Iterator <Entity > results3 = getStronglyConsistentResults (scQuery2 , query2 );
436480 assertTrue (results3 .hasNext ());
437481 assertEquals (ENTITY1 , results3 .next ());
438482 assertFalse (results3 .hasNext ());
@@ -442,41 +486,44 @@ public void testRunGqlQueryWithCasting() throws InterruptedException {
442486 public void testRunStructuredQuery () throws InterruptedException {
443487 Query <Entity > query =
444488 Query .newEntityQueryBuilder ().setKind (KIND1 ).setOrderBy (OrderBy .asc ("__key__" )).build ();
445- QueryResults <Entity > results1 = DATASTORE .run (query );
446- while (Iterators .size (results1 ) < 1 ) {
447- Thread .sleep (500 );
448- results1 = DATASTORE .run (query );
449- }
450- results1 = DATASTORE .run (query );
489+
490+ Query <Entity > scQuery = Query .newEntityQueryBuilder ()
491+ .setKind (KIND1 )
492+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
493+ .setOrderBy (OrderBy .asc ("__key__" ))
494+ .build ();
495+
496+ Iterator <Entity > results1 = getStronglyConsistentResults (scQuery , query );
497+
451498 assertTrue (results1 .hasNext ());
452499 assertEquals (ENTITY1 , results1 .next ());
453500 assertFalse (results1 .hasNext ());
454501
455502 Query <Key > keyOnlyQuery = Query .newKeyQueryBuilder ().setKind (KIND1 ).build ();
456- QueryResults <Key > results2 = DATASTORE .run (keyOnlyQuery );
457- while (Iterators .size (results2 ) < 1 ) {
458- Thread .sleep (500 );
459- results2 = DATASTORE .run (keyOnlyQuery );
460- }
461- results2 = DATASTORE .run (keyOnlyQuery );
503+ Query <Key > scKeyOnlyQuery = Query .newKeyQueryBuilder ().setKind (KIND1 )
504+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
505+ .build ();
506+
507+ Iterator <Key > results2 = getStronglyConsistentResults (scKeyOnlyQuery , keyOnlyQuery );
462508 assertTrue (results2 .hasNext ());
463509 assertEquals (ENTITY1 .getKey (), results2 .next ());
464510 assertFalse (results2 .hasNext ());
465511
466512 StructuredQuery <ProjectionEntity > keyOnlyProjectionQuery =
467513 Query .newProjectionEntityQueryBuilder ()
468514 .setKind (KIND1 ).setProjection ("__key__" ).build ();
469- QueryResults <ProjectionEntity > results3 = DATASTORE .run (keyOnlyProjectionQuery );
470- while (Iterators .size (results3 ) < 1 ) {
471- Thread .sleep (500 );
472- results3 = DATASTORE .run (keyOnlyProjectionQuery );
473- }
474- results3 = DATASTORE .run (keyOnlyProjectionQuery );
515+ StructuredQuery <ProjectionEntity > scKeyOnlyProjectionQuery = Query .newProjectionEntityQueryBuilder ()
516+ .setKind (KIND1 )
517+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
518+ .setProjection ("__key__" )
519+ .build ();
520+ Iterator <ProjectionEntity > results3 =
521+ getStronglyConsistentResults (scKeyOnlyProjectionQuery , keyOnlyProjectionQuery );
475522 assertTrue (results3 .hasNext ());
476523 ProjectionEntity projectionEntity = results3 .next ();
477524 assertEquals (ENTITY1 .getKey (), projectionEntity .getKey ());
478525 assertTrue (projectionEntity .getNames ().isEmpty ());
479- assertFalse (results2 .hasNext ());
526+ assertFalse (results3 .hasNext ());
480527
481528 StructuredQuery <ProjectionEntity > projectionQuery = Query .newProjectionEntityQueryBuilder ()
482529 .setKind (KIND2 )
@@ -487,12 +534,17 @@ public void testRunStructuredQuery() throws InterruptedException {
487534 .setLimit (10 )
488535 .build ();
489536
490- QueryResults <ProjectionEntity > results4 = DATASTORE .run (projectionQuery );
491- while (Iterators .size (results4 ) < 1 ) {
492- Thread .sleep (500 );
493- results4 = DATASTORE .run (projectionQuery );
494- }
495- results4 = DATASTORE .run (projectionQuery );
537+ StructuredQuery <ProjectionEntity > scProjectionQuery = Query .newProjectionEntityQueryBuilder ()
538+ .setKind (KIND2 )
539+ .setFilter (PropertyFilter .hasAncestor (ROOT_KEY ))
540+ .setProjection ("age" )
541+ .setFilter (PropertyFilter .gt ("age" , 18 ))
542+ .setDistinctOn ("age" )
543+ .setOrderBy (OrderBy .asc ("age" ))
544+ .setLimit (10 )
545+ .build ();
546+
547+ Iterator <ProjectionEntity > results4 = getStronglyConsistentResults (scProjectionQuery , projectionQuery );
496548 assertTrue (results4 .hasNext ());
497549 ProjectionEntity entity = results4 .next ();
498550 assertEquals (ENTITY2 .getKey (), entity .getKey ());
0 commit comments