Skip to content

Commit e461e7d

Browse files
authored
fix(ilp): UnsupportedOperationException when writing to non-WAL table via ILP/TCP (#5742)
1 parent 948947a commit e461e7d

4 files changed

Lines changed: 102 additions & 3 deletions

File tree

core/src/main/java/io/questdb/cutlass/line/tcp/TableUpdateDetails.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ public boolean isDropped() {
281281
}
282282

283283
public boolean isWal() {
284-
return writerThreadId == -1;
284+
return tableToken.isWal();
285285
}
286286

287287
public boolean isWriterInError() {

core/src/test/java/io/questdb/test/AbstractCairoTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1207,11 +1207,11 @@ protected static void assertMemoryLeak(@Nullable FilesFacade ff, TestUtils.LeakP
12071207
try {
12081208
code.run();
12091209
forEachNode(node -> releaseInactive(node.getEngine()));
1210-
CLOSEABLES.forEach(Misc::free);
12111210
} catch (Throwable th) {
12121211
LOG.error().$("Error in test: ").$(th).$();
12131212
throw th;
12141213
} finally {
1214+
CLOSEABLES.forEach(Misc::free);
12151215
forEachNode(node -> node.getEngine().clear());
12161216
AbstractCairoTest.ff = ffBefore;
12171217
}

core/src/test/java/io/questdb/test/cairo/fuzz/AbstractFuzzTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ protected void runFuzz(Rnd rnd) throws Exception {
194194
WorkerPoolUtils.setupWriterJobs(sharedWorkerPool, engine);
195195
sharedWorkerPool.start(LOG);
196196

197-
int size = rnd.nextInt(16 * 1024 * 1024);
197+
int size = rnd.nextInt(8 * 1024 * 1024);
198198
node1.setProperty(PropertyKey.DEBUG_CAIRO_O3_COLUMN_MEMORY_SIZE, size);
199199
setZeroWalPurgeInterval();
200200
fuzzer.runFuzz(getTestName(), rnd);

core/src/test/java/io/questdb/test/cutlass/line/tcp/LineTcpReceiverTest.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.questdb.cairo.ColumnType;
2929
import io.questdb.cairo.PartitionBy;
3030
import io.questdb.cairo.TableReader;
31+
import io.questdb.cairo.TableToken;
3132
import io.questdb.cairo.TableWriter;
3233
import io.questdb.cairo.TableWriterAPI;
3334
import io.questdb.cairo.pool.PoolListener;
@@ -260,6 +261,104 @@ public void testColumnTypeStaysTheSameWhileColumnAdded() throws Exception {
260261
}, false, 250);
261262
}
262263

264+
@Test
265+
public void testConcurrentWriteAndTruncate() throws Exception {
266+
Assume.assumeTrue(walEnabled);
267+
runInContext((receiver) -> {
268+
String tableName = "concurrent_test";
269+
final int iterations = 500;
270+
final SOCountDownLatch startLatch = new SOCountDownLatch(2);
271+
final SOCountDownLatch finishLatch = new SOCountDownLatch(2);
272+
final AtomicInteger errorCount = new AtomicInteger(0);
273+
String initialData = tableName + ",location=init_location,symbol=test temperature=20.0 1465839830100000000\n";
274+
send(initialData, tableName);
275+
mayDrainWalQueue();
276+
277+
Thread ilpWriteThread = new Thread(() -> {
278+
try {
279+
startLatch.countDown();
280+
startLatch.await();
281+
282+
try (Socket socket = getSocket()) {
283+
for (int i = 0; i < iterations; i++) {
284+
try {
285+
// Use duplicate timestamp and symbols to trigger metadata operations
286+
String lineData = tableName + ",location=test_location,symbol=sym" + (i % 3) +
287+
" temperature=" + (20.0 + i) + " " +
288+
(1465839830102000000L + (i % 5)) + "\n";
289+
sendToSocket(socket, lineData);
290+
291+
if (i % 50 == 0) {
292+
mayDrainWalQueue();
293+
}
294+
295+
if (i % 20 == 0) {
296+
Os.sleep(1);
297+
}
298+
} catch (Throwable e) {
299+
errorCount.incrementAndGet();
300+
}
301+
}
302+
}
303+
} catch (Throwable e) {
304+
errorCount.incrementAndGet();
305+
} finally {
306+
Path.clearThreadLocals();
307+
finishLatch.countDown();
308+
}
309+
});
310+
311+
Thread truncateThread = new Thread(() -> {
312+
try {
313+
startLatch.countDown();
314+
startLatch.await();
315+
boolean drop = false;
316+
317+
for (int i = 0; i < iterations; i++) {
318+
try {
319+
// Alternate between truncate and drop operations
320+
if (i % 15 == 0) {
321+
TableToken tt = engine.getTableTokenIfExists(tableName);
322+
if (tt != null) {
323+
try (TableWriterAPI writer = getTableWriterAPI(tableName)) {
324+
writer.truncateSoft();
325+
}
326+
mayDrainWalQueue();
327+
drop = true;
328+
}
329+
} else if (i % 25 == 0 && drop) {
330+
TableToken tt = engine.getTableTokenIfExists(tableName);
331+
if (tt != null) {
332+
engine.dropTableOrMatView(path, tt);
333+
drop = false;
334+
}
335+
}
336+
Os.sleep(2);
337+
} catch (Throwable e) {
338+
errorCount.incrementAndGet();
339+
}
340+
}
341+
} catch (Throwable e) {
342+
errorCount.incrementAndGet();
343+
} finally {
344+
Path.clearThreadLocals();
345+
finishLatch.countDown();
346+
}
347+
});
348+
349+
ilpWriteThread.start();
350+
truncateThread.start();
351+
352+
try {
353+
finishLatch.await();
354+
Assert.assertEquals(0, errorCount.get());
355+
} finally {
356+
ilpWriteThread.interrupt();
357+
truncateThread.interrupt();
358+
}
359+
});
360+
}
361+
263362
@Test
264363
public void testCreationAttemptNonPartitionedTableWithWal() throws Exception {
265364
Assume.assumeTrue(walEnabled);

0 commit comments

Comments
 (0)