Skip to content

Commit 916b4f0

Browse files
committed
Fix ConcurrentModificationException when flushing async log messages
A missing synchronized statement when adding to pendingWrites cause ConcurrentModificationExceptions in the flush() method when the pendingWrites are copied to writesToFlush.
1 parent 2a92207 commit 916b4f0

2 files changed

Lines changed: 43 additions & 1 deletion

File tree

google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,9 @@ public void onFailure(Throwable t) {
580580
}
581581
}
582582
});
583-
pendingWrites.add(writeFuture);
583+
synchronized (writeLock) {
584+
pendingWrites.add(writeFuture);
585+
}
584586
break;
585587
}
586588
}

google-cloud-logging/src/test/java/com/google/cloud/logging/LoggingImplTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import java.util.List;
7171
import java.util.Map;
7272
import java.util.concurrent.ExecutionException;
73+
import java.util.concurrent.atomic.AtomicInteger;
7374
import org.easymock.EasyMock;
7475
import org.junit.After;
7576
import org.junit.Before;
@@ -1448,5 +1449,44 @@ public void run() {
14481449
flushWaiter.join(1000);
14491450
assertFalse(flushWaiter.isAlive());
14501451
}
1452+
1453+
@Test
1454+
public void testFlushStress() throws InterruptedException {
1455+
SettableApiFuture<WriteLogEntriesResponse> mockRpcResponse = SettableApiFuture.create();
1456+
mockRpcResponse.set(null);
1457+
replay(rpcFactoryMock);
1458+
logging = options.getService();
1459+
WriteLogEntriesRequest request = WriteLogEntriesRequest.newBuilder()
1460+
.addAllEntries(Iterables.transform(ImmutableList.of(LOG_ENTRY1),
1461+
LogEntry.toPbFunction(PROJECT)))
1462+
.build();
1463+
1464+
Thread[] threads = new Thread[100];
1465+
EasyMock.expect(loggingRpcMock.write(request)).andReturn(mockRpcResponse).times(threads.length);
1466+
EasyMock.replay(loggingRpcMock);
1467+
1468+
// log and flush concurrently in many threads to trigger a ConcurrentModificationException
1469+
final AtomicInteger exceptions = new AtomicInteger(0);
1470+
for (int i = 0; i < threads.length; i++) {
1471+
threads[i] = new Thread() {
1472+
@Override
1473+
public void run() {
1474+
try {
1475+
logging.write(ImmutableList.of(LOG_ENTRY1));
1476+
logging.flush();
1477+
} catch (Exception ex) {
1478+
ex.printStackTrace();
1479+
exceptions.incrementAndGet();
1480+
}
1481+
}
1482+
};
1483+
threads[i].start();
1484+
}
1485+
for (int i = 0; i < threads.length; i++) {
1486+
threads[i].join();
1487+
}
1488+
assertSame(0, exceptions.get());
1489+
}
1490+
14511491
}
14521492

0 commit comments

Comments
 (0)