Skip to content

Commit 5bd6b1f

Browse files
committed
Add RepeatableReadMultiThreadClientTest
Add RepeatableReadMultiThreadClientTest to verify transactions have repeatable read.
1 parent 7eabdf7 commit 5bd6b1f

File tree

2 files changed

+189
-0
lines changed

2 files changed

+189
-0
lines changed
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* RepeatableReadMultiThreadClientTest
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
package com.apple.foundationdb;
21+
22+
import java.util.Collection;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
26+
import com.apple.foundationdb.tuple.Tuple;
27+
28+
import org.junit.jupiter.api.Assertions;
29+
30+
/**
31+
* This test verify transcations have repeatable read.
32+
* 1 First set initialValue to key.
33+
* 2 Have transactions to read the key and verify the initialValue in a loop, if it does not
34+
* see the initialValue as the value, it set the flag to false.
35+
*
36+
* 3 Then have new transactions set the value and then read to verify the new value is set,
37+
* if it does not read the new value, set the flag to false.
38+
*
39+
* 4 Verify that old transactions have not finished when new transactions have finished,
40+
* then verify old transactions does not have false flag -- it means that old transactions
41+
* are still seeting the initialValue even after new transactions set them to a new value.
42+
*/
43+
public class RepeatableReadMultiThreadClientTest {
44+
public static final MultiClientHelper clientHelper = new MultiClientHelper();
45+
46+
private static final int oldValueReadCount = 30;
47+
private static final int threadPerDB = 5;
48+
49+
private static final String key = "foo";
50+
private static final String initialValue = "bar";
51+
private static final String newValue = "cool";
52+
private static final Map<Thread, OldValueReader> threadToOldValueReaders = new HashMap<>();
53+
54+
public static void main(String[] args) throws Exception {
55+
FDB fdb = FDB.selectAPIVersion(630);
56+
setupThreads(fdb);
57+
Collection<Database> dbs = clientHelper.openDatabases(fdb); // the clientHelper will close the databases for us
58+
System.out.println("Starting tests");
59+
setup(dbs);
60+
System.out.println("Start processing and validating");
61+
readOldValue(dbs);
62+
setNewValueAndRead(dbs);
63+
System.out.println("Test finished");
64+
}
65+
66+
private static synchronized void setupThreads(FDB fdb) {
67+
int clientThreadsPerVersion = clientHelper.readClusterFromEnv().length;
68+
fdb.options().setClientThreadsPerVersion(clientThreadsPerVersion);
69+
System.out.printf("thread per version is %d\n", clientThreadsPerVersion);
70+
fdb.options().setExternalClientDirectory("/var/dynamic-conf/lib");
71+
fdb.options().setTraceEnable("/tmp");
72+
fdb.options().setKnob("min_trace_severity=5");
73+
}
74+
75+
private static void setup(Collection<Database> dbs) {
76+
// 0 -> 1 -> 2 -> 3 -> 0
77+
for (Database db : dbs) {
78+
db.run(tr -> {
79+
tr.set(Tuple.from(key).pack(), Tuple.from(initialValue).pack());
80+
return null;
81+
});
82+
}
83+
}
84+
85+
private static void readOldValue(Collection<Database> dbs) throws InterruptedException {
86+
for (Database db : dbs) {
87+
for (int i = 0; i < threadPerDB; i++) {
88+
final OldValueReader oldValueReader = new OldValueReader(db);
89+
final Thread thread = new Thread(OldValueReader.create(db));
90+
thread.start();
91+
threadToOldValueReaders.put(thread, oldValueReader);
92+
}
93+
}
94+
}
95+
96+
private static void setNewValueAndRead(Collection<Database> dbs) throws InterruptedException {
97+
// threads running NewValueReader need to wait for threads to start first who run OldValueReader
98+
Thread.sleep(1000);
99+
final Map<Thread, NewValueReader> threads = new HashMap<>();
100+
for (Database db : dbs) {
101+
for (int i = 0; i < threadPerDB; i++) {
102+
final NewValueReader newValueReader = new NewValueReader(db);
103+
final Thread thread = new Thread(NewValueReader.create(db));
104+
thread.start();
105+
threads.put(thread, newValueReader);
106+
}
107+
}
108+
109+
for (Map.Entry<Thread, NewValueReader> entry : threads.entrySet()) {
110+
entry.getKey().join();
111+
Assertions.assertTrue(entry.getValue().succeed, "new value reader failed to read the correct value");
112+
}
113+
114+
for (Map.Entry<Thread, OldValueReader> entry : threadToOldValueReaders.entrySet()) {
115+
Assertions.assertTrue(entry.getKey().isAlive(), "Old value reader finished too soon, cannot verify repeatable read, succeed is " + entry.getValue().succeed);
116+
}
117+
118+
for (Map.Entry<Thread, OldValueReader> entry : threadToOldValueReaders.entrySet()) {
119+
entry.getKey().join();
120+
Assertions.assertTrue(entry.getValue().succeed, "old value reader failed to read the correct value");
121+
}
122+
}
123+
124+
public static class OldValueReader implements Runnable {
125+
126+
private final Database db;
127+
private boolean succeed;
128+
129+
private OldValueReader(Database db) {
130+
this.db = db;
131+
this.succeed = true;
132+
}
133+
134+
public static OldValueReader create(Database db) {
135+
return new OldValueReader(db);
136+
}
137+
138+
@Override
139+
public void run() {
140+
db.run(tr -> {
141+
try {
142+
for (int i = 0; i < oldValueReadCount; i++) {
143+
byte[] result = tr.get(Tuple.from(key).pack()).join();
144+
String value = Tuple.fromBytes(result).getString(0);
145+
if (!initialValue.equals(value)) {
146+
succeed = false;
147+
break;
148+
}
149+
Thread.sleep(100);
150+
}
151+
}
152+
catch (Exception e) {
153+
succeed = false;
154+
}
155+
return null;
156+
});
157+
}
158+
}
159+
160+
public static class NewValueReader implements Runnable {
161+
private final Database db;
162+
private boolean succeed;
163+
164+
public NewValueReader(Database db) {
165+
this.db = db;
166+
this.succeed = true;
167+
}
168+
169+
public static NewValueReader create(Database db) {
170+
return new NewValueReader(db);
171+
}
172+
173+
@Override
174+
public void run() {
175+
db.run(tr -> {
176+
tr.set(Tuple.from(key).pack(), Tuple.from(newValue).pack());
177+
return null;
178+
});
179+
String value = db.run(tr -> {
180+
byte[] result = tr.get(Tuple.from(key).pack()).join();
181+
return Tuple.fromBytes(result).getString(0);
182+
});
183+
if (!newValue.equals(value)) {
184+
succeed = false;
185+
}
186+
}
187+
}
188+
}

bindings/java/src/tests.cmake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ set(JAVA_INTEGRATION_TESTS
5050
src/integration/com/apple/foundationdb/BasicMultiClientIntegrationTest.java
5151
src/integration/com/apple/foundationdb/CycleMultiClientIntegrationTest.java
5252
src/integration/com/apple/foundationdb/SidebandMultiThreadClientTest.java
53+
src/integration/com/apple/foundationdb/RepeatableReadMultiThreadClientTest.java
5354
)
5455

5556
# Resources that are used in integration testing, but are not explicitly test files (JUnit rules,

0 commit comments

Comments
 (0)