Skip to content

Commit 5498b68

Browse files
committed
actor pair stress test reorg
1 parent 82be5ef commit 5498b68

File tree

11 files changed

+198
-273
lines changed

11 files changed

+198
-273
lines changed

cache2k-pinpoint/src/main/java/org/cache2k/pinpoint/stress/pairwise/AbstractActorPair.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

cache2k-pinpoint/src/main/java/org/cache2k/pinpoint/stress/pairwise/ActorPair.java

Lines changed: 14 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,12 @@
2020
* #L%
2121
*/
2222

23+
import org.cache2k.pinpoint.NeverExecutedError;
24+
2325
/**
24-
* The two actors will be executed concurrently in tests.
25-
*
2626
* @author Jens Wilke
27-
* @param <R> the result of the actor
28-
* @see ActorPairSuite
2927
*/
30-
public interface ActorPair<R> {
28+
public interface ActorPair<R1, R2> {
3129

3230
/**
3331
* Setup or reset action executed before the actors.
@@ -36,17 +34,25 @@ public interface ActorPair<R> {
3634

3735
/**
3836
* First actor executed concurrently with actor2.
37+
* Any exceptions are propagated.
3938
*
4039
* @return outcome of the actor
4140
*/
42-
R actor1();
41+
R1 actor1();
4342

4443
/**
4544
* Second actor executed concurrently with actor1.
45+
* Any exceptions are propagated.
4646
*
4747
* @return outcome of the actor
4848
*/
49-
R actor2();
49+
R2 actor2();
50+
51+
/**
52+
* The observer can check integrity or invariants of a data structure. It
53+
* is executed concurrently to the actors. Any exceptions are propagated.
54+
*/
55+
default void observe() { throw new NeverExecutedError(); }
5056

5157
/**
5258
* Checks the outcome after both actors have finished. The method is expected
@@ -55,36 +61,6 @@ public interface ActorPair<R> {
5561
* @param r1 outcome of {@link #actor1()}
5662
* @param r2 outcome of {@link #actor2()}
5763
*/
58-
void check(R r1, R r2);
59-
60-
/**
61-
* Useful for assertions based on success of each actor.
62-
*/
63-
class SuccessTuple {
64-
65-
private final boolean success1;
66-
private final boolean success2;
67-
68-
public SuccessTuple(boolean success1, boolean success2) {
69-
this.success1 = success1;
70-
this.success2 = success2;
71-
}
72-
73-
public boolean isSuccess1() { return success1; }
74-
75-
public boolean isSuccess2() { return success2; }
76-
77-
public boolean isBothSucceed() { return success1 && success2; }
78-
79-
public boolean isOneSucceeds() { return success1 != success2; }
80-
81-
@Override
82-
public String toString() {
83-
return "SuccessTuple{" +
84-
"success1=" + success1 +
85-
", success2=" + success2 +
86-
'}';
87-
}
64+
void check(R1 r1, R2 r2);
8865

89-
}
9066
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package org.cache2k.pinpoint.stress.pairwise;
2+
3+
/*-
4+
* #%L
5+
* cache2k pinpoint
6+
* %%
7+
* Copyright (C) 2000 - 2022 headissue GmbH, Munich
8+
* %%
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* #L%
21+
*/
22+
23+
/**
24+
* The two actors will be executed concurrently in tests.
25+
*
26+
* @author Jens Wilke
27+
* @param <R> the result of the actor
28+
* @see ActorPairSuite
29+
*/
30+
public interface ActorPairSingleType<R> extends ActorPair<R, R> {
31+
32+
/**
33+
* Setup or reset action executed before the actors.
34+
*/
35+
void setup();
36+
37+
/**
38+
* First actor executed concurrently with actor2.
39+
*
40+
* @return outcome of the actor
41+
*/
42+
R actor1();
43+
44+
/**
45+
* Second actor executed concurrently with actor1.
46+
*
47+
* @return outcome of the actor
48+
*/
49+
R actor2();
50+
51+
/**
52+
* Checks the outcome after both actors have finished. The method is expected
53+
* to throw an exception or assertion error in case the result is unexpected.
54+
*
55+
* @param r1 outcome of {@link #actor1()}
56+
* @param r2 outcome of {@link #actor2()}
57+
*/
58+
void check(R r1, R r2);
59+
60+
}

cache2k-pinpoint/src/main/java/org/cache2k/pinpoint/stress/pairwise/ActorPairSuite.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,16 @@
3939
* Any exception within the actor methods will be propagated.
4040
*
4141
* @author Jens Wilke
42-
* @see ActorPair
42+
* @see ActorPairSingleType
4343
*/
4444
public class ActorPairSuite {
4545

46-
private final Set<ActorPair<?>> pairs = new HashSet<ActorPair<?>>();
47-
private final List<OneShotPairRunner> pairRunners = new ArrayList<OneShotPairRunner>();
48-
private final Map<OneShotPairRunner<?>, OneShotPairRunner<?>> running =
49-
new ConcurrentHashMap<OneShotPairRunner<?>, OneShotPairRunner<?>>();
46+
private final Set<ActorPair<?, ?>> pairs = new HashSet<>();
47+
private final List<OneShotPairRunner> pairRunners = new ArrayList<>();
48+
private final Map<OneShotPairRunner, OneShotPairRunner> running = new ConcurrentHashMap<>();
5049
private final CountDownLatch finishLatch = new CountDownLatch(1);
5150
private final Random random = new Random(1802);
52-
private final List<Throwable> exceptions = new CopyOnWriteArrayList<Throwable>();
51+
private final List<Throwable> exceptions = new CopyOnWriteArrayList<>();
5352
private final LongAdder runCount = new LongAdder();
5453
private long finishTime;
5554

@@ -89,7 +88,7 @@ public ActorPairSuite stopAtFirstException(boolean v) {
8988
/**
9089
* Add an actor pair
9190
*/
92-
public ActorPairSuite addPair(ActorPair<?>... ps) {
91+
public ActorPairSuite addPair(ActorPair<?, ?>... ps) {
9392
pairs.addAll(Arrays.asList(ps));
9493
return this;
9594
}
@@ -115,7 +114,7 @@ public void run() {
115114
finishTime = runMillis == Long.MAX_VALUE ?
116115
Long.MAX_VALUE : System.currentTimeMillis() + runMillis;
117116
for (ActorPair p : pairs) {
118-
pairRunners.add(new OneShotPairRunner<Object>(p));
117+
pairRunners.add(new OneShotPairRunner(p));
119118
}
120119
for (int i = 0; i < maxParallel; i++) {
121120
executor.execute(() -> runLoop());
@@ -133,8 +132,8 @@ private void checkForExceptions() {
133132
}
134133
}
135134

136-
private OneShotPairRunner<?> nextRunner() {
137-
OneShotPairRunner<?> r;
135+
private OneShotPairRunner nextRunner() {
136+
OneShotPairRunner r;
138137
do {
139138
r = pairRunners.get(random.nextInt(pairRunners.size()));
140139
} while (running.putIfAbsent(r, r) != null);
@@ -143,7 +142,7 @@ private OneShotPairRunner<?> nextRunner() {
143142

144143
private void runLoop() {
145144
do {
146-
OneShotPairRunner<?> runner = nextRunner();
145+
OneShotPairRunner runner = nextRunner();
147146
try {
148147
runner.run(executor);
149148
} catch (Throwable t) {

cache2k-pinpoint/src/main/java/org/cache2k/pinpoint/stress/pairwise/OneShotPairRunner.java

Lines changed: 68 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* #L%
2121
*/
2222

23+
import java.lang.reflect.Method;
2324
import java.util.concurrent.CountDownLatch;
2425
import java.util.concurrent.Executor;
2526
import java.util.concurrent.atomic.AtomicInteger;
@@ -31,71 +32,104 @@
3132
*
3233
* @author Jens Wilke
3334
*/
34-
class OneShotPairRunner<R> {
35+
class OneShotPairRunner<R1, R2> {
3536

36-
private final ActorPair<R> actorPair;
37-
private final AtomicReference<R> result1 = new AtomicReference<R>();
38-
private final AtomicReference<R> result2 = new AtomicReference<R>();
39-
private final AtomicReference<Throwable> exception1 = new AtomicReference<Throwable>();
40-
private final AtomicReference<Throwable> exception2 = new AtomicReference<Throwable>();
37+
private final ActorPair<R1, R2> actorPair;
38+
private final AtomicReference<R1> result1 = new AtomicReference<>();
39+
private final AtomicReference<R2> result2 = new AtomicReference<>();
40+
private final AtomicReference<Throwable> exception1 = new AtomicReference<>();
41+
private final AtomicReference<Throwable> exception2 = new AtomicReference<>();
42+
private final AtomicReference<Throwable> observerException = new AtomicReference<>();
4143
private final AtomicInteger finishLatch = new AtomicInteger();
4244

43-
OneShotPairRunner(ActorPair<R> actorPair) {
45+
OneShotPairRunner(ActorPair<R1, R2> actorPair) {
4446
this.actorPair = actorPair;
4547
}
4648

4749
public void run(Executor executor) throws InterruptedException {
4850
actorPair.setup();
4951
exception1.set(null);
5052
exception2.set(null);
51-
finishLatch.set(2);
52-
CountDownLatch startLatch = new CountDownLatch(2);
53-
Runnable r1 = new Runnable() {
54-
@Override
55-
public void run() {
56-
try {
57-
startLatch.countDown();
58-
startLatch.await();
59-
result1.set(actorPair.actor1());
60-
} catch (Throwable t) {
61-
exception1.set(t);
62-
} finally {
63-
finishLatch.decrementAndGet();
64-
}
53+
boolean observerPresent = observerPresent(actorPair.getClass());
54+
finishLatch.set(observerPresent ? 3 : 2);
55+
CountDownLatch startLatch = new CountDownLatch(finishLatch.get());
56+
Runnable r1 = () -> {
57+
try {
58+
startLatch.countDown();
59+
startLatch.await();
60+
result1.set(actorPair.actor1());
61+
} catch (Throwable t) {
62+
exception1.set(t);
63+
} finally {
64+
finishLatch.decrementAndGet();
65+
}
66+
};
67+
Runnable r2 = () -> {
68+
try {
69+
startLatch.countDown();
70+
startLatch.await();
71+
result2.set(actorPair.actor2());
72+
} catch (Throwable t) {
73+
exception2.set(t);
74+
} finally {
75+
finishLatch.decrementAndGet();
6576
}
6677
};
67-
Runnable r2 = new Runnable() {
68-
@Override
69-
public void run() {
78+
if (observerPresent) {
79+
Runnable r3 = () -> {
7080
try {
7181
startLatch.countDown();
7282
startLatch.await();
73-
result2.set(actorPair.actor2());
83+
actorPair.observe();
7484
} catch (Throwable t) {
75-
exception2.set(t);
85+
observerException.set(t);
7686
} finally {
7787
finishLatch.decrementAndGet();
7888
}
79-
}
80-
};
89+
};
90+
executor.execute(r3);
91+
}
92+
if ((this.hashCode() & 1) == 0) {
93+
Runnable tmp = r1; r1 = r2; r2 = tmp;
94+
}
8195
executor.execute(r1);
8296
executor.execute(r2);
8397
while(finishLatch.get() > 0) { }
8498
if (exception1.get() != null) {
85-
throw new ExceptionInActorThread(exception1.get());
99+
throw new ActorException(exception1.get());
86100
}
87101
if (exception2.get() != null) {
88-
throw new ExceptionInActorThread(exception2.get());
102+
throw new ActorException(exception2.get());
103+
}
104+
if (observerException.get() != null) {
105+
throw new ObserverException(observerException.get());
89106
}
90107
actorPair.check(result1.get(), result2.get());
91108
}
92109

93-
static class ExceptionInActorThread extends AssertionError {
110+
static boolean observerPresent(Class<?> clazz) {
111+
try {
112+
Method m = clazz.getMethod("observe");
113+
return ! m.getDeclaringClass().equals(ActorPair.class);
114+
} catch (NoSuchMethodException e) {
115+
return false;
116+
}
117+
}
118+
119+
static class ActorException extends AssertionError {
94120

95-
ExceptionInActorThread(Throwable cause) {
96-
super(cause.toString());
97-
initCause(cause);
121+
ActorException(Throwable cause) {
122+
super(cause);
98123
}
124+
125+
}
126+
127+
static class ObserverException extends AssertionError {
128+
129+
ObserverException(Throwable cause) {
130+
super(cause);
131+
}
132+
99133
}
100134

101135
}

0 commit comments

Comments
 (0)