Skip to content

Commit 7eb3d24

Browse files
committed
Add RepeatableReadMultiThreadClientTest
Add RepeatableReadMultiThreadClientTest to verify transactions have repeatable read.
1 parent 7eabdf7 commit 7eb3d24

File tree

2 files changed

+190
-0
lines changed

2 files changed

+190
-0
lines changed
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* RepeatableReadMultiThreadClientTest
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
package com.apple.foundationdb;
21+
22+
import java.util.Collection;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
26+
import com.apple.foundationdb.tuple.Tuple;
27+
28+
import org.junit.jupiter.api.Assertions;
29+
import org.junit.jupiter.api.Assumptions;
30+
31+
/**
32+
* This test verify transcations have repeatable read.
33+
* 1 First set initialValue to key.
34+
* 2 Have transactions to read the key and verify the initialValue in a loop, if it does not
35+
* see the initialValue as the value, it set the flag to false.
36+
*
37+
* 3 Then have new transactions set the value and then read to verify the new value is set,
38+
* if it does not read the new value, set the flag to false.
39+
*
40+
* 4 Verify that old transactions have not finished when new transactions have finished,
41+
* then verify old transactions does not have false flag -- it means that old transactions
42+
* are still seeting the initialValue even after new transactions set them to a new value.
43+
*/
44+
public class RepeatableReadMultiThreadClientTest {
45+
public static final MultiClientHelper clientHelper = new MultiClientHelper();
46+
47+
private static final int oldValueReadCount = 30;
48+
private static final int threadPerDB = 5;
49+
50+
private static final String key = "foo";
51+
private static final String initialValue = "bar";
52+
private static final String newValue = "cool";
53+
private static final Map<Thread, OldValueReader> threadToOldValueReaders = new HashMap<>();
54+
55+
public static void main(String[] args) throws Exception {
56+
FDB fdb = FDB.selectAPIVersion(630);
57+
setupThreads(fdb);
58+
Collection<Database> dbs = clientHelper.openDatabases(fdb); // the clientHelper will close the databases for us
59+
System.out.println("Starting tests");
60+
setup(dbs);
61+
System.out.println("Start processing and validating");
62+
readOldValue(dbs);
63+
setNewValueAndRead(dbs);
64+
System.out.println("Test finished");
65+
}
66+
67+
private static synchronized void setupThreads(FDB fdb) {
68+
int clientThreadsPerVersion = clientHelper.readClusterFromEnv().length;
69+
fdb.options().setClientThreadsPerVersion(clientThreadsPerVersion);
70+
System.out.printf("thread per version is %d\n", clientThreadsPerVersion);
71+
fdb.options().setExternalClientDirectory("/var/dynamic-conf/lib");
72+
fdb.options().setTraceEnable("/tmp");
73+
fdb.options().setKnob("min_trace_severity=5");
74+
}
75+
76+
private static void setup(Collection<Database> dbs) {
77+
// 0 -> 1 -> 2 -> 3 -> 0
78+
for (Database db : dbs) {
79+
db.run(tr -> {
80+
tr.set(Tuple.from(key).pack(), Tuple.from(initialValue).pack());
81+
return null;
82+
});
83+
}
84+
}
85+
86+
private static void readOldValue(Collection<Database> dbs) throws InterruptedException {
87+
for (Database db : dbs) {
88+
for (int i = 0; i < threadPerDB; i++) {
89+
final OldValueReader oldValueReader = new OldValueReader(db);
90+
final Thread thread = new Thread(OldValueReader.create(db));
91+
thread.start();
92+
threadToOldValueReaders.put(thread, oldValueReader);
93+
}
94+
}
95+
}
96+
97+
private static void setNewValueAndRead(Collection<Database> dbs) throws InterruptedException {
98+
// threads running NewValueReader need to wait for threads to start first who run OldValueReader
99+
Thread.sleep(1000);
100+
final Map<Thread, NewValueReader> threads = new HashMap<>();
101+
for (Database db : dbs) {
102+
for (int i = 0; i < threadPerDB; i++) {
103+
final NewValueReader newValueReader = new NewValueReader(db);
104+
final Thread thread = new Thread(NewValueReader.create(db));
105+
thread.start();
106+
threads.put(thread, newValueReader);
107+
}
108+
}
109+
110+
for (Map.Entry<Thread, NewValueReader> entry : threads.entrySet()) {
111+
entry.getKey().join();
112+
Assertions.assertTrue(entry.getValue().succeed, "new value reader failed to read the correct value");
113+
}
114+
115+
for (Map.Entry<Thread, OldValueReader> entry : threadToOldValueReaders.entrySet()) {
116+
Assumptions.assumeTrue(entry.getKey().isAlive(), "Old value reader finished too soon, cannot verify repeatable read, succeed is " + entry.getValue().succeed);
117+
}
118+
119+
for (Map.Entry<Thread, OldValueReader> entry : threadToOldValueReaders.entrySet()) {
120+
entry.getKey().join();
121+
Assertions.assertTrue(entry.getValue().succeed, "old value reader failed to read the correct value");
122+
}
123+
}
124+
125+
public static class OldValueReader implements Runnable {
126+
127+
private final Database db;
128+
private boolean succeed;
129+
130+
private OldValueReader(Database db) {
131+
this.db = db;
132+
this.succeed = true;
133+
}
134+
135+
public static OldValueReader create(Database db) {
136+
return new OldValueReader(db);
137+
}
138+
139+
@Override
140+
public void run() {
141+
db.run(tr -> {
142+
try {
143+
for (int i = 0; i < oldValueReadCount; i++) {
144+
byte[] result = tr.get(Tuple.from(key).pack()).join();
145+
String value = Tuple.fromBytes(result).getString(0);
146+
if (!initialValue.equals(value)) {
147+
succeed = false;
148+
break;
149+
}
150+
Thread.sleep(100);
151+
}
152+
}
153+
catch (Exception e) {
154+
succeed = false;
155+
}
156+
return null;
157+
});
158+
}
159+
}
160+
161+
public static class NewValueReader implements Runnable {
162+
private final Database db;
163+
private boolean succeed;
164+
165+
public NewValueReader(Database db) {
166+
this.db = db;
167+
this.succeed = true;
168+
}
169+
170+
public static NewValueReader create(Database db) {
171+
return new NewValueReader(db);
172+
}
173+
174+
@Override
175+
public void run() {
176+
db.run(tr -> {
177+
tr.set(Tuple.from(key).pack(), Tuple.from(newValue).pack());
178+
return null;
179+
});
180+
String value = db.run(tr -> {
181+
byte[] result = tr.get(Tuple.from(key).pack()).join();
182+
return Tuple.fromBytes(result).getString(0);
183+
});
184+
if (!newValue.equals(value)) {
185+
succeed = false;
186+
}
187+
}
188+
}
189+
}

bindings/java/src/tests.cmake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ set(JAVA_INTEGRATION_TESTS
5050
src/integration/com/apple/foundationdb/BasicMultiClientIntegrationTest.java
5151
src/integration/com/apple/foundationdb/CycleMultiClientIntegrationTest.java
5252
src/integration/com/apple/foundationdb/SidebandMultiThreadClientTest.java
53+
src/integration/com/apple/foundationdb/RepeatableReadMultiThreadClientTest.java
5354
)
5455

5556
# Resources that are used in integration testing, but are not explicitly test files (JUnit rules,

0 commit comments

Comments
 (0)