Skip to content

Commit 6eef80c

Browse files
committed
force query results to become strongly consistent
1 parent afdb204 commit 6eef80c

1 file changed

Lines changed: 141 additions & 83 deletions

File tree

google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITDatastoreTest.java

Lines changed: 141 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -57,19 +57,20 @@
5757
import com.google.cloud.datastore.Value;
5858
import com.google.cloud.datastore.ValueType;
5959
import com.google.cloud.datastore.testing.RemoteDatastoreHelper;
60-
import com.google.common.collect.Iterators;
61-
60+
import com.google.common.base.Preconditions;
61+
import java.util.ArrayList;
62+
import java.util.Collections;
63+
import java.util.HashSet;
64+
import java.util.Iterator;
65+
import java.util.List;
66+
import java.util.Set;
6267
import org.junit.After;
6368
import org.junit.AfterClass;
6469
import org.junit.Before;
6570
import org.junit.Rule;
6671
import org.junit.Test;
6772
import org.junit.rules.Timeout;
6873

69-
import java.util.Collections;
70-
import java.util.Iterator;
71-
import java.util.List;
72-
7374
public class ITDatastoreTest {
7475

7576
private static final RemoteDatastoreHelper HELPER = RemoteDatastoreHelper.create();
@@ -84,8 +85,9 @@ public class ITDatastoreTest {
8485
private static final StringValue STR_VALUE = StringValue.of("str");
8586
private static final BooleanValue BOOL_VALUE = BooleanValue.newBuilder(false)
8687
.setExcludeFromIndexes(true).build();
88+
private static final Key ROOT_KEY = Key.newBuilder(PROJECT_ID, "rootkey", "default").setNamespace(NAMESPACE).build();
8789
private static final IncompleteKey INCOMPLETE_KEY1 =
88-
IncompleteKey.newBuilder(PROJECT_ID, KIND1).setNamespace(NAMESPACE).build();
90+
IncompleteKey.newBuilder(ROOT_KEY, KIND1).setNamespace(NAMESPACE).build();
8991
private static final IncompleteKey INCOMPLETE_KEY2 =
9092
IncompleteKey.newBuilder(PROJECT_ID, KIND2).setNamespace(NAMESPACE).build();
9193
private static final Key KEY1 = Key.newBuilder(INCOMPLETE_KEY1, "name").build();
@@ -137,6 +139,45 @@ public class ITDatastoreTest {
137139
private static final Entity ENTITY3 = Entity.newBuilder(ENTITY1).setKey(KEY3).remove("str")
138140
.set("null", NULL_VALUE).set("partial1", PARTIAL_ENTITY2).set("partial2", ENTITY2).build();
139141

142+
private <T> Iterator<T> getStronglyConsistentResults (Query scQuery, Query query) {
143+
QueryResults<T> scResults = DATASTORE.run(scQuery);
144+
List<T> scResultsCopy = makeResultsCopy(scResults);
145+
Set<T> scResultsSet = new HashSet<>(scResultsCopy);
146+
147+
while(true) {
148+
QueryResults<T> results = DATASTORE.run(query);
149+
List<T> resultsCopy = makeResultsCopy(results);
150+
if (!haveSameSize(scResultsCopy.iterator(), resultsCopy.iterator())) {
151+
continue;
152+
}
153+
for (T res: resultsCopy) {
154+
if (! scResultsSet.contains(res)) {
155+
continue;
156+
}
157+
}
158+
return resultsCopy.iterator();
159+
}
160+
}
161+
162+
private <T> List<T> makeResultsCopy(QueryResults<T> scResults) {
163+
Preconditions.checkNotNull(scResults);
164+
List<T> results = new ArrayList<>();
165+
while (scResults.hasNext()) {
166+
results.add(scResults.next());
167+
}
168+
return results;
169+
}
170+
171+
private <T, S> boolean haveSameSize(Iterator<T> it1, Iterator<S> it2) {
172+
Preconditions.checkNotNull(it1);
173+
Preconditions.checkNotNull(it2);
174+
while(it1.hasNext() && it2.hasNext()) {
175+
it1.next();
176+
it2.next();
177+
}
178+
return !it1.hasNext() && !it2.hasNext();
179+
}
180+
140181
@Rule
141182
public Timeout globalTimeout = Timeout.seconds(100);
142183

@@ -219,11 +260,6 @@ public void testTransactionWithQuery() throws InterruptedException {
219260
.build();
220261
Transaction transaction = DATASTORE.newTransaction();
221262
QueryResults<Entity> results = transaction.run(query);
222-
while (Iterators.size(results) < 1) {
223-
Thread.sleep(500);
224-
results = DATASTORE.run(query);
225-
}
226-
results = DATASTORE.run(query);
227263
assertTrue(results.hasNext());
228264
assertEquals(ENTITY2, results.next());
229265
assertFalse(results.hasNext());
@@ -233,11 +269,6 @@ public void testTransactionWithQuery() throws InterruptedException {
233269

234270
transaction = DATASTORE.newTransaction();
235271
results = transaction.run(query);
236-
while (Iterators.size(results) < 1) {
237-
Thread.sleep(500);
238-
results = DATASTORE.run(query);
239-
}
240-
results = DATASTORE.run(query);
241272
assertTrue(results.hasNext());
242273
assertEquals(ENTITY2, results.next());
243274
assertFalse(results.hasNext());
@@ -341,12 +372,14 @@ public void testRunGqlQueryNoCasting() throws InterruptedException {
341372
Query<Entity> query1 = Query.newGqlQueryBuilder(ResultType.ENTITY, "select * from " + KIND1)
342373
.setNamespace(NAMESPACE)
343374
.build();
344-
QueryResults<Entity> results1 = DATASTORE.run(query1);
345-
while (Iterators.size(results1) < 1) {
346-
Thread.sleep(500);
347-
results1 = DATASTORE.run(query1);
348-
}
349-
results1 = DATASTORE.run(query1);
375+
Query<Entity> scQuery1 = Query.newEntityQueryBuilder()
376+
.setNamespace(NAMESPACE)
377+
.setKind(KIND1)
378+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
379+
.build();
380+
381+
Iterator<Entity> results1 = getStronglyConsistentResults(scQuery1, query1);
382+
350383
assertTrue(results1.hasNext());
351384
assertEquals(ENTITY1, results1.next());
352385
assertFalse(results1.hasNext());
@@ -356,34 +389,44 @@ public void testRunGqlQueryNoCasting() throws InterruptedException {
356389
Query.newGqlQueryBuilder(ResultType.ENTITY, "select * from " + KIND2 + " order by __key__")
357390
.setNamespace(NAMESPACE)
358391
.build();
359-
QueryResults<? extends Entity> results2 = DATASTORE.run(query2);
360-
while (Iterators.size(results2) < 2) {
361-
Thread.sleep(500);
362-
results2 = DATASTORE.run(query2);
363-
}
364-
results2 = DATASTORE.run(query2);
392+
Query<? extends Entity> scQuery2 = Query.newEntityQueryBuilder()
393+
.setNamespace(NAMESPACE)
394+
.setKind(KIND2)
395+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
396+
.setOrderBy(OrderBy.asc("__key__"))
397+
.build();
398+
399+
Iterator<Entity> results2 = getStronglyConsistentResults(scQuery2, query2);
365400
assertTrue(results2.hasNext());
366401
assertEquals(ENTITY2, results2.next());
367402
assertTrue(results2.hasNext());
368403
assertEquals(ENTITY3, results2.next());
369404
assertFalse(results2.hasNext());
370405

371-
query1 = Query.newGqlQueryBuilder(ResultType.ENTITY, "select * from bla")
372-
.setNamespace(NAMESPACE)
373-
.build();
374-
results1 = DATASTORE.run(query1);
406+
query1 =
407+
Query.newGqlQueryBuilder(ResultType.ENTITY, "select * from bla")
408+
.setNamespace(NAMESPACE)
409+
.build();
410+
scQuery1 =
411+
Query.newEntityQueryBuilder()
412+
.setNamespace(NAMESPACE)
413+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
414+
.setKind("bla")
415+
.build();
416+
results1 = getStronglyConsistentResults(scQuery1, query1);
375417
assertFalse(results1.hasNext());
376418

377419
Query<Key> keyOnlyQuery =
378420
Query.newGqlQueryBuilder(ResultType.KEY, "select __key__ from " + KIND1)
379421
.setNamespace(NAMESPACE)
380422
.build();
381-
QueryResults<Key> keyOnlyResults = DATASTORE.run(keyOnlyQuery);
382-
while (Iterators.size(keyOnlyResults) < 1) {
383-
Thread.sleep(500);
384-
keyOnlyResults = DATASTORE.run(keyOnlyQuery);
385-
}
386-
keyOnlyResults = DATASTORE.run(keyOnlyQuery);
423+
Query<Key> scKeyOnlyQuery =
424+
Query.newKeyQueryBuilder()
425+
.setNamespace(NAMESPACE)
426+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
427+
.setKind(KIND1)
428+
.build();
429+
Iterator<Key> keyOnlyResults = getStronglyConsistentResults(scKeyOnlyQuery, keyOnlyQuery);
387430
assertTrue(keyOnlyResults.hasNext());
388431
assertEquals(KEY1, keyOnlyResults.next());
389432
assertFalse(keyOnlyResults.hasNext());
@@ -392,12 +435,15 @@ public void testRunGqlQueryNoCasting() throws InterruptedException {
392435
Query.newGqlQueryBuilder(ResultType.PROJECTION_ENTITY, "select __key__ from " + KIND1)
393436
.setNamespace(NAMESPACE)
394437
.build();
395-
QueryResults<ProjectionEntity> keyProjectionResult = DATASTORE.run(keyProjectionQuery);
396-
while (Iterators.size(keyProjectionResult) < 1) {
397-
Thread.sleep(500);
398-
keyProjectionResult = DATASTORE.run(keyProjectionQuery);
399-
}
400-
keyProjectionResult = DATASTORE.run(keyProjectionQuery);
438+
Query<ProjectionEntity> scKeyProjectionQuery =
439+
Query.newProjectionEntityQueryBuilder()
440+
.setNamespace(NAMESPACE)
441+
.setKind(KIND1)
442+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
443+
.build();
444+
445+
Iterator<ProjectionEntity> keyProjectionResult =
446+
getStronglyConsistentResults(scKeyProjectionQuery, keyProjectionQuery);
401447
assertTrue(keyProjectionResult.hasNext());
402448
ProjectionEntity projectionEntity = keyProjectionResult.next();
403449
assertEquals(KEY1, projectionEntity.getKey());
@@ -411,28 +457,32 @@ public void testRunGqlQueryWithCasting() throws InterruptedException {
411457
Query<Entity> query1 = (Query<Entity>) Query.newGqlQueryBuilder("select * from " + KIND1)
412458
.setNamespace(NAMESPACE)
413459
.build();
414-
QueryResults<Entity> results1 = DATASTORE.run(query1);
415-
while (Iterators.size(results1) < 1) {
416-
Thread.sleep(500);
417-
results1 = DATASTORE.run(query1);
418-
}
419-
results1 = DATASTORE.run(query1);
460+
Query<Entity> scQuery1 =
461+
Query.newEntityQueryBuilder()
462+
.setNamespace(NAMESPACE)
463+
.setKind(KIND1)
464+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
465+
.build();
466+
Iterator<Entity> results1 = getStronglyConsistentResults(scQuery1,query1);
420467
assertTrue(results1.hasNext());
421468
assertEquals(ENTITY1, results1.next());
422469
assertFalse(results1.hasNext());
423470

424471
Query<?> query2 = Query.newGqlQueryBuilder("select * from " + KIND1)
425472
.setNamespace(NAMESPACE)
426473
.build();
474+
427475
QueryResults<?> results2 = DATASTORE.run(query2);
428-
while (Iterators.size(results2) < 1) {
429-
Thread.sleep(500);
430-
results2 = DATASTORE.run(query2);
431-
}
432-
results2 = DATASTORE.run(query1);
476+
433477
assertSame(Entity.class, results2.getResultClass());
434-
@SuppressWarnings("unchecked")
435-
QueryResults<Entity> results3 = (QueryResults<Entity>) results2;
478+
479+
Query<?> scQuery2 = Query.newEntityQueryBuilder()
480+
.setNamespace(NAMESPACE)
481+
.setKind(KIND1)
482+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
483+
.build();
484+
485+
Iterator<Entity> results3 = getStronglyConsistentResults(scQuery2, query2);
436486
assertTrue(results3.hasNext());
437487
assertEquals(ENTITY1, results3.next());
438488
assertFalse(results3.hasNext());
@@ -442,41 +492,44 @@ public void testRunGqlQueryWithCasting() throws InterruptedException {
442492
public void testRunStructuredQuery() throws InterruptedException {
443493
Query<Entity> query =
444494
Query.newEntityQueryBuilder().setKind(KIND1).setOrderBy(OrderBy.asc("__key__")).build();
445-
QueryResults<Entity> results1 = DATASTORE.run(query);
446-
while (Iterators.size(results1) < 1) {
447-
Thread.sleep(500);
448-
results1 = DATASTORE.run(query);
449-
}
450-
results1 = DATASTORE.run(query);
495+
496+
Query<Entity> scQuery = Query.newEntityQueryBuilder()
497+
.setKind(KIND1)
498+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
499+
.setOrderBy(OrderBy.asc("__key__"))
500+
.build();
501+
502+
Iterator<Entity> results1 = getStronglyConsistentResults(scQuery, query);
503+
451504
assertTrue(results1.hasNext());
452505
assertEquals(ENTITY1, results1.next());
453506
assertFalse(results1.hasNext());
454507

455508
Query<Key> keyOnlyQuery = Query.newKeyQueryBuilder().setKind(KIND1).build();
456-
QueryResults<Key> results2 = DATASTORE.run(keyOnlyQuery);
457-
while (Iterators.size(results2) < 1) {
458-
Thread.sleep(500);
459-
results2 = DATASTORE.run(keyOnlyQuery);
460-
}
461-
results2 = DATASTORE.run(keyOnlyQuery);
509+
Query<Key> scKeyOnlyQuery = Query.newKeyQueryBuilder().setKind(KIND1)
510+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
511+
.build();
512+
513+
Iterator<Key> results2 = getStronglyConsistentResults(scKeyOnlyQuery, keyOnlyQuery);
462514
assertTrue(results2.hasNext());
463515
assertEquals(ENTITY1.getKey(), results2.next());
464516
assertFalse(results2.hasNext());
465517

466518
StructuredQuery<ProjectionEntity> keyOnlyProjectionQuery =
467519
Query.newProjectionEntityQueryBuilder()
468520
.setKind(KIND1).setProjection("__key__").build();
469-
QueryResults<ProjectionEntity> results3 = DATASTORE.run(keyOnlyProjectionQuery);
470-
while (Iterators.size(results3) < 1) {
471-
Thread.sleep(500);
472-
results3 = DATASTORE.run(keyOnlyProjectionQuery);
473-
}
474-
results3 = DATASTORE.run(keyOnlyProjectionQuery);
521+
StructuredQuery<ProjectionEntity> scKeyOnlyProjectionQuery = Query.newProjectionEntityQueryBuilder()
522+
.setKind(KIND1)
523+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
524+
.setProjection("__key__")
525+
.build();
526+
Iterator<ProjectionEntity> results3 =
527+
getStronglyConsistentResults(scKeyOnlyProjectionQuery, keyOnlyProjectionQuery);
475528
assertTrue(results3.hasNext());
476529
ProjectionEntity projectionEntity = results3.next();
477530
assertEquals(ENTITY1.getKey(), projectionEntity.getKey());
478531
assertTrue(projectionEntity.getNames().isEmpty());
479-
assertFalse(results2.hasNext());
532+
assertFalse(results3.hasNext());
480533

481534
StructuredQuery<ProjectionEntity> projectionQuery = Query.newProjectionEntityQueryBuilder()
482535
.setKind(KIND2)
@@ -487,12 +540,17 @@ public void testRunStructuredQuery() throws InterruptedException {
487540
.setLimit(10)
488541
.build();
489542

490-
QueryResults<ProjectionEntity> results4 = DATASTORE.run(projectionQuery);
491-
while (Iterators.size(results4) < 1) {
492-
Thread.sleep(500);
493-
results4 = DATASTORE.run(projectionQuery);
494-
}
495-
results4 = DATASTORE.run(projectionQuery);
543+
StructuredQuery<ProjectionEntity> scProjectionQuery = Query.newProjectionEntityQueryBuilder()
544+
.setKind(KIND2)
545+
.setFilter(PropertyFilter.hasAncestor(ROOT_KEY))
546+
.setProjection("age")
547+
.setFilter(PropertyFilter.gt("age", 18))
548+
.setDistinctOn("age")
549+
.setOrderBy(OrderBy.asc("age"))
550+
.setLimit(10)
551+
.build();
552+
553+
Iterator<ProjectionEntity> results4 = getStronglyConsistentResults(scProjectionQuery, projectionQuery);
496554
assertTrue(results4.hasNext());
497555
ProjectionEntity entity = results4.next();
498556
assertEquals(ENTITY2.getKey(), entity.getKey());

0 commit comments

Comments
 (0)