Skip to content

Commit f023d07

Browse files
authored
force query results to become strongly consistent (#1806)
* force query results to become strongly consistent * fix for loop bug * address comments, add maxAttempts and refactor loop * fix typo
1 parent 8254f51 commit f023d07

1 file changed

Lines changed: 135 additions & 83 deletions

File tree

google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITDatastoreTest.java

Lines changed: 135 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -57,19 +57,20 @@
5757
import com.google.cloud.datastore.Value;
5858
import com.google.cloud.datastore.ValueType;
5959
import com.google.cloud.datastore.testing.RemoteDatastoreHelper;
60-
import com.google.common.collect.Iterators;
61-
60+
import com.google.common.base.Preconditions;
61+
import java.util.ArrayList;
62+
import java.util.Collections;
63+
import java.util.HashSet;
64+
import java.util.Iterator;
65+
import java.util.List;
66+
import java.util.Set;
6267
import org.junit.After;
6368
import org.junit.AfterClass;
6469
import org.junit.Before;
6570
import org.junit.Rule;
6671
import org.junit.Test;
6772
import org.junit.rules.Timeout;
6873

69-
import java.util.Collections;
70-
import java.util.Iterator;
71-
import java.util.List;
72-
7374
public class ITDatastoreTest {
7475

7576
private static final RemoteDatastoreHelper HELPER = RemoteDatastoreHelper.create();
@@ -84,8 +85,9 @@ public class ITDatastoreTest {
8485
private static final StringValue STR_VALUE = StringValue.of("str");
8586
private static final BooleanValue BOOL_VALUE = BooleanValue.newBuilder(false)
8687
.setExcludeFromIndexes(true).build();
88+
private static final Key ROOT_KEY = Key.newBuilder(PROJECT_ID, "rootkey", "default").setNamespace(NAMESPACE).build();
8789
private static final IncompleteKey INCOMPLETE_KEY1 =
88-
IncompleteKey.newBuilder(PROJECT_ID, KIND1).setNamespace(NAMESPACE).build();
90+
IncompleteKey.newBuilder(ROOT_KEY, KIND1).setNamespace(NAMESPACE).build();
8991
private static final IncompleteKey INCOMPLETE_KEY2 =
9092
IncompleteKey.newBuilder(PROJECT_ID, KIND2).setNamespace(NAMESPACE).build();
9193
private static final Key KEY1 = Key.newBuilder(INCOMPLETE_KEY1, "name").build();
@@ -137,6 +139,38 @@ public class ITDatastoreTest {
137139
private static final Entity ENTITY3 = Entity.newBuilder(ENTITY1).setKey(KEY3).remove("str")
138140
.set("null", NULL_VALUE).set("partial1", PARTIAL_ENTITY2).set("partial2", ENTITY2).build();
139141

142+
private <T> Iterator<T> getStronglyConsistentResults (Query scQuery, Query query) throws InterruptedException {
143+
//scQuery is equivalent to query, but with an ancestor filter in it
144+
//this makes scQuery strongly consistent
145+
QueryResults<T> scResults = DATASTORE.run(scQuery);
146+
List<T> scResultsCopy = makeResultsCopy(scResults);
147+
Set<T> scResultsSet = new HashSet<>(scResultsCopy);
148+
int maxAttempts = 20;
149+
150+
while(maxAttempts > 0) {
151+
--maxAttempts;
152+
QueryResults<T> results = DATASTORE.run(query);
153+
List<T> resultsCopy = makeResultsCopy(results);
154+
Set<T> resultsSet = new HashSet<>(resultsCopy);
155+
if (scResultsSet.size() == resultsSet.size()
156+
&& scResultsSet.containsAll(resultsSet)) {
157+
return resultsCopy.iterator();
158+
}
159+
Thread.sleep(500);
160+
}
161+
162+
throw new RuntimeException("reached max number of attempts to get strongly consistent results.");
163+
}
164+
165+
private <T> List<T> makeResultsCopy(QueryResults<T> scResults) {
166+
Preconditions.checkNotNull(scResults);
167+
List<T> results = new ArrayList<>();
168+
while (scResults.hasNext()) {
169+
results.add(scResults.next());
170+
}
171+
return results;
172+
}
173+
140174
@Rule
141175
public Timeout globalTimeout = Timeout.seconds(100);
142176

@@ -219,11 +253,6 @@ public void testTransactionWithQuery() throws InterruptedException {
219253
.build();
220254
Transaction transaction = DATASTORE.newTransaction();
221255
QueryResults<Entity> results = transaction.run(query);
222-
while (Iterators.size(results) < 1) {
223-
Thread.sleep(500);
224-
results = DATASTORE.run(query);
225-
}
226-
results = DATASTORE.run(query);
227256
assertTrue(results.hasNext());
228257
assertEquals(ENTITY2, results.next());
229258
assertFalse(results.hasNext());
@@ -233,11 +262,6 @@ public void testTransactionWithQuery() throws InterruptedException {
233262

234263
transaction = DATASTORE.newTransaction();
235264
results = transaction.run(query);
236-
while (Iterators.size(results) < 1) {
237-
Thread.sleep(500);
238-
results = DATASTORE.run(query);
239-
}
240-
results = DATASTORE.run(query);
241265
assertTrue(results.hasNext());
242266
assertEquals(ENTITY2, results.next());
243267
assertFalse(results.hasNext());
@@ -341,12 +365,14 @@ public void testRunGqlQueryNoCasting() throws InterruptedException {
341365
Query<Entity> query1 = Query.newGqlQueryBuilder(ResultType.ENTITY, "select * from " + KIND1)
342366
.setNamespace(NAMESPACE)
343367
.build();
344-
QueryResults<Entity> results1 = DATASTORE.run(query1);
345-
while (Iterators.size(results1) < 1) {
346-
Thread.sleep(500);
347-
results1 = DATASTORE.run(query1);
348-
}
349-
results1 = DATASTORE.run(query1);
368+
Query<Entity> scQuery1 = Query.newEntityQueryBuilder()
369+
.setNamespace(NAMESPACE)
370+
.setKind(KIND1)
371+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
372+
.build();
373+
374+
Iterator<Entity> results1 = getStronglyConsistentResults(scQuery1, query1);
375+
350376
assertTrue(results1.hasNext());
351377
assertEquals(ENTITY1, results1.next());
352378
assertFalse(results1.hasNext());
@@ -356,34 +382,44 @@ public void testRunGqlQueryNoCasting() throws InterruptedException {
356382
Query.newGqlQueryBuilder(ResultType.ENTITY, "select * from " + KIND2 + " order by __key__")
357383
.setNamespace(NAMESPACE)
358384
.build();
359-
QueryResults<? extends Entity> results2 = DATASTORE.run(query2);
360-
while (Iterators.size(results2) < 2) {
361-
Thread.sleep(500);
362-
results2 = DATASTORE.run(query2);
363-
}
364-
results2 = DATASTORE.run(query2);
385+
Query<? extends Entity> scQuery2 = Query.newEntityQueryBuilder()
386+
.setNamespace(NAMESPACE)
387+
.setKind(KIND2)
388+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
389+
.setOrderBy(OrderBy.asc("__key__"))
390+
.build();
391+
392+
Iterator<Entity> results2 = getStronglyConsistentResults(scQuery2, query2);
365393
assertTrue(results2.hasNext());
366394
assertEquals(ENTITY2, results2.next());
367395
assertTrue(results2.hasNext());
368396
assertEquals(ENTITY3, results2.next());
369397
assertFalse(results2.hasNext());
370398

371-
query1 = Query.newGqlQueryBuilder(ResultType.ENTITY, "select * from bla")
372-
.setNamespace(NAMESPACE)
373-
.build();
374-
results1 = DATASTORE.run(query1);
399+
query1 =
400+
Query.newGqlQueryBuilder(ResultType.ENTITY, "select * from bla")
401+
.setNamespace(NAMESPACE)
402+
.build();
403+
scQuery1 =
404+
Query.newEntityQueryBuilder()
405+
.setNamespace(NAMESPACE)
406+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
407+
.setKind("bla")
408+
.build();
409+
results1 = getStronglyConsistentResults(scQuery1, query1);
375410
assertFalse(results1.hasNext());
376411

377412
Query<Key> keyOnlyQuery =
378413
Query.newGqlQueryBuilder(ResultType.KEY, "select __key__ from " + KIND1)
379414
.setNamespace(NAMESPACE)
380415
.build();
381-
QueryResults<Key> keyOnlyResults = DATASTORE.run(keyOnlyQuery);
382-
while (Iterators.size(keyOnlyResults) < 1) {
383-
Thread.sleep(500);
384-
keyOnlyResults = DATASTORE.run(keyOnlyQuery);
385-
}
386-
keyOnlyResults = DATASTORE.run(keyOnlyQuery);
416+
Query<Key> scKeyOnlyQuery =
417+
Query.newKeyQueryBuilder()
418+
.setNamespace(NAMESPACE)
419+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
420+
.setKind(KIND1)
421+
.build();
422+
Iterator<Key> keyOnlyResults = getStronglyConsistentResults(scKeyOnlyQuery, keyOnlyQuery);
387423
assertTrue(keyOnlyResults.hasNext());
388424
assertEquals(KEY1, keyOnlyResults.next());
389425
assertFalse(keyOnlyResults.hasNext());
@@ -392,12 +428,16 @@ public void testRunGqlQueryNoCasting() throws InterruptedException {
392428
Query.newGqlQueryBuilder(ResultType.PROJECTION_ENTITY, "select __key__ from " + KIND1)
393429
.setNamespace(NAMESPACE)
394430
.build();
395-
QueryResults<ProjectionEntity> keyProjectionResult = DATASTORE.run(keyProjectionQuery);
396-
while (Iterators.size(keyProjectionResult) < 1) {
397-
Thread.sleep(500);
398-
keyProjectionResult = DATASTORE.run(keyProjectionQuery);
399-
}
400-
keyProjectionResult = DATASTORE.run(keyProjectionQuery);
431+
Query<ProjectionEntity> scKeyProjectionQuery =
432+
Query.newProjectionEntityQueryBuilder()
433+
.addProjection("__key__")
434+
.setNamespace(NAMESPACE)
435+
.setKind(KIND1)
436+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
437+
.build();
438+
439+
Iterator<ProjectionEntity> keyProjectionResult =
440+
getStronglyConsistentResults(scKeyProjectionQuery, keyProjectionQuery);
401441
assertTrue(keyProjectionResult.hasNext());
402442
ProjectionEntity projectionEntity = keyProjectionResult.next();
403443
assertEquals(KEY1, projectionEntity.getKey());
@@ -411,28 +451,32 @@ public void testRunGqlQueryWithCasting() throws InterruptedException {
411451
Query<Entity> query1 = (Query<Entity>) Query.newGqlQueryBuilder("select * from " + KIND1)
412452
.setNamespace(NAMESPACE)
413453
.build();
414-
QueryResults<Entity> results1 = DATASTORE.run(query1);
415-
while (Iterators.size(results1) < 1) {
416-
Thread.sleep(500);
417-
results1 = DATASTORE.run(query1);
418-
}
419-
results1 = DATASTORE.run(query1);
454+
Query<Entity> scQuery1 =
455+
Query.newEntityQueryBuilder()
456+
.setNamespace(NAMESPACE)
457+
.setKind(KIND1)
458+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
459+
.build();
460+
Iterator<Entity> results1 = getStronglyConsistentResults(scQuery1,query1);
420461
assertTrue(results1.hasNext());
421462
assertEquals(ENTITY1, results1.next());
422463
assertFalse(results1.hasNext());
423464

424465
Query<?> query2 = Query.newGqlQueryBuilder("select * from " + KIND1)
425466
.setNamespace(NAMESPACE)
426467
.build();
468+
427469
QueryResults<?> results2 = DATASTORE.run(query2);
428-
while (Iterators.size(results2) < 1) {
429-
Thread.sleep(500);
430-
results2 = DATASTORE.run(query2);
431-
}
432-
results2 = DATASTORE.run(query1);
470+
433471
assertSame(Entity.class, results2.getResultClass());
434-
@SuppressWarnings("unchecked")
435-
QueryResults<Entity> results3 = (QueryResults<Entity>) results2;
472+
473+
Query<?> scQuery2 = Query.newEntityQueryBuilder()
474+
.setNamespace(NAMESPACE)
475+
.setKind(KIND1)
476+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
477+
.build();
478+
479+
Iterator<Entity> results3 = getStronglyConsistentResults(scQuery2, query2);
436480
assertTrue(results3.hasNext());
437481
assertEquals(ENTITY1, results3.next());
438482
assertFalse(results3.hasNext());
@@ -442,41 +486,44 @@ public void testRunGqlQueryWithCasting() throws InterruptedException {
442486
public void testRunStructuredQuery() throws InterruptedException {
443487
Query<Entity> query =
444488
Query.newEntityQueryBuilder().setKind(KIND1).setOrderBy(OrderBy.asc("__key__")).build();
445-
QueryResults<Entity> results1 = DATASTORE.run(query);
446-
while (Iterators.size(results1) < 1) {
447-
Thread.sleep(500);
448-
results1 = DATASTORE.run(query);
449-
}
450-
results1 = DATASTORE.run(query);
489+
490+
Query<Entity> scQuery = Query.newEntityQueryBuilder()
491+
.setKind(KIND1)
492+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
493+
.setOrderBy(OrderBy.asc("__key__"))
494+
.build();
495+
496+
Iterator<Entity> results1 = getStronglyConsistentResults(scQuery, query);
497+
451498
assertTrue(results1.hasNext());
452499
assertEquals(ENTITY1, results1.next());
453500
assertFalse(results1.hasNext());
454501

455502
Query<Key> keyOnlyQuery = Query.newKeyQueryBuilder().setKind(KIND1).build();
456-
QueryResults<Key> results2 = DATASTORE.run(keyOnlyQuery);
457-
while (Iterators.size(results2) < 1) {
458-
Thread.sleep(500);
459-
results2 = DATASTORE.run(keyOnlyQuery);
460-
}
461-
results2 = DATASTORE.run(keyOnlyQuery);
503+
Query<Key> scKeyOnlyQuery = Query.newKeyQueryBuilder().setKind(KIND1)
504+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
505+
.build();
506+
507+
Iterator<Key> results2 = getStronglyConsistentResults(scKeyOnlyQuery, keyOnlyQuery);
462508
assertTrue(results2.hasNext());
463509
assertEquals(ENTITY1.getKey(), results2.next());
464510
assertFalse(results2.hasNext());
465511

466512
StructuredQuery<ProjectionEntity> keyOnlyProjectionQuery =
467513
Query.newProjectionEntityQueryBuilder()
468514
.setKind(KIND1).setProjection("__key__").build();
469-
QueryResults<ProjectionEntity> results3 = DATASTORE.run(keyOnlyProjectionQuery);
470-
while (Iterators.size(results3) < 1) {
471-
Thread.sleep(500);
472-
results3 = DATASTORE.run(keyOnlyProjectionQuery);
473-
}
474-
results3 = DATASTORE.run(keyOnlyProjectionQuery);
515+
StructuredQuery<ProjectionEntity> scKeyOnlyProjectionQuery = Query.newProjectionEntityQueryBuilder()
516+
.setKind(KIND1)
517+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
518+
.setProjection("__key__")
519+
.build();
520+
Iterator<ProjectionEntity> results3 =
521+
getStronglyConsistentResults(scKeyOnlyProjectionQuery, keyOnlyProjectionQuery);
475522
assertTrue(results3.hasNext());
476523
ProjectionEntity projectionEntity = results3.next();
477524
assertEquals(ENTITY1.getKey(), projectionEntity.getKey());
478525
assertTrue(projectionEntity.getNames().isEmpty());
479-
assertFalse(results2.hasNext());
526+
assertFalse(results3.hasNext());
480527

481528
StructuredQuery<ProjectionEntity> projectionQuery = Query.newProjectionEntityQueryBuilder()
482529
.setKind(KIND2)
@@ -487,12 +534,17 @@ public void testRunStructuredQuery() throws InterruptedException {
487534
.setLimit(10)
488535
.build();
489536

490-
QueryResults<ProjectionEntity> results4 = DATASTORE.run(projectionQuery);
491-
while (Iterators.size(results4) < 1) {
492-
Thread.sleep(500);
493-
results4 = DATASTORE.run(projectionQuery);
494-
}
495-
results4 = DATASTORE.run(projectionQuery);
537+
StructuredQuery<ProjectionEntity> scProjectionQuery = Query.newProjectionEntityQueryBuilder()
538+
.setKind(KIND2)
539+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
540+
.setProjection("age")
541+
.setFilter(PropertyFilter.gt("age", 18))
542+
.setDistinctOn("age")
543+
.setOrderBy(OrderBy.asc("age"))
544+
.setLimit(10)
545+
.build();
546+
547+
Iterator<ProjectionEntity> results4 = getStronglyConsistentResults(scProjectionQuery, projectionQuery);
496548
assertTrue(results4.hasNext());
497549
ProjectionEntity entity = results4.next();
498550
assertEquals(ENTITY2.getKey(), entity.getKey());

0 commit comments

Comments
 (0)