|
47 | 47 | import java.util.concurrent.atomic.AtomicInteger; |
48 | 48 | import java.util.concurrent.atomic.AtomicReference; |
49 | 49 |
|
50 | | -public class BitmapIndexConcurrentTest extends AbstractCairoTest { |
| 50 | +public class BitmapIndexConcurrentFuzzTest extends AbstractCairoTest { |
51 | 51 | private static final int MAX_ID = 100; |
52 | 52 |
|
53 | 53 | @Test |
@@ -217,45 +217,47 @@ private void testConcurrentOperations(Rnd masterRnd) throws Exception { |
217 | 217 | startBarrier.await(); // Wait for all threads to be ready |
218 | 218 |
|
219 | 219 | Rnd rnd = new Rnd(seed0, seed1); // Unique deterministic seeds per thread |
220 | | - while (!stopFlag.get() && errorCount.get() == 0) { |
221 | | - try { |
222 | | - String randomSymbol = "SYM" + (rnd.nextInt(100) + 1); |
223 | | - String querySql = String.format( |
224 | | - "SELECT symbol, count(*) FROM trades WHERE symbol = '%s' GROUP BY symbol", |
225 | | - randomSymbol |
226 | | - ); |
227 | | - |
228 | | - try (RecordCursorFactory factory = select(querySql)) { |
229 | | - try (RecordCursor cursor = factory.getCursor(sqlExecutionContext)) { |
230 | | - int rowCount = 0; |
231 | | - CharSequence foundSymbol = null; |
232 | | - |
233 | | - while (cursor.hasNext()) { |
234 | | - rowCount++; |
235 | | - if (rowCount > 1) { |
| 220 | + try (var threadLocalContext = TestUtils.createSqlExecutionCtx(engine)) { |
| 221 | + while (!stopFlag.get() && errorCount.get() == 0) { |
| 222 | + try { |
| 223 | + String randomSymbol = "SYM" + (rnd.nextInt(100) + 1); |
| 224 | + String querySql = String.format( |
| 225 | + "SELECT symbol, count(*) FROM trades WHERE symbol = '%s' GROUP BY symbol", |
| 226 | + randomSymbol |
| 227 | + ); |
| 228 | + |
| 229 | + try (RecordCursorFactory factory = select(querySql, threadLocalContext)) { |
| 230 | + try (RecordCursor cursor = factory.getCursor(threadLocalContext)) { |
| 231 | + int rowCount = 0; |
| 232 | + CharSequence foundSymbol = null; |
| 233 | + |
| 234 | + while (cursor.hasNext()) { |
| 235 | + rowCount++; |
| 236 | + if (rowCount > 1) { |
| 237 | + errorCount.incrementAndGet(); |
| 238 | + firstError.compareAndSet(null, new AssertionError("Reader " + threadId + ": Multiple rows for symbol " + randomSymbol)); |
| 239 | + return; |
| 240 | + } |
| 241 | + foundSymbol = cursor.getRecord().getSymA(0); |
| 242 | + } |
| 243 | + |
| 244 | + if (rowCount == 1 && !Chars.equals(foundSymbol, randomSymbol)) { |
236 | 245 | errorCount.incrementAndGet(); |
237 | | - firstError.compareAndSet(null, new AssertionError("Reader " + threadId + ": Multiple rows for symbol " + randomSymbol)); |
| 246 | + firstError.compareAndSet(null, new AssertionError("Reader " + threadId + ": Expected " + randomSymbol + " but got " + foundSymbol)); |
238 | 247 | return; |
239 | 248 | } |
240 | | - foundSymbol = cursor.getRecord().getSymA(0); |
241 | | - } |
242 | 249 |
|
243 | | - if (rowCount == 1 && !Chars.equals(foundSymbol, randomSymbol)) { |
244 | | - errorCount.incrementAndGet(); |
245 | | - firstError.compareAndSet(null, new AssertionError("Reader " + threadId + ": Expected " + randomSymbol + " but got " + foundSymbol)); |
246 | | - return; |
| 250 | + // It's OK if rowCount is 0 or 1 (symbol might not exist yet or might exist) |
| 251 | + totalQueries.incrementAndGet(); |
247 | 252 | } |
248 | | - |
249 | | - // It's OK if rowCount is 0 or 1 (symbol might not exist yet or might exist) |
250 | | - totalQueries.incrementAndGet(); |
251 | 253 | } |
252 | | - } |
253 | 254 |
|
254 | | - Os.sleep(rnd.nextInt(5) + 1); |
255 | | - } catch (Exception e) { |
256 | | - errorCount.incrementAndGet(); |
257 | | - firstError.compareAndSet(null, e); |
258 | | - e.printStackTrace(); |
| 255 | + Os.sleep(rnd.nextInt(5) + 1); |
| 256 | + } catch (Exception e) { |
| 257 | + errorCount.incrementAndGet(); |
| 258 | + firstError.compareAndSet(null, e); |
| 259 | + e.printStackTrace(); |
| 260 | + } |
259 | 261 | } |
260 | 262 | } |
261 | 263 | } catch (Exception e) { |
|
0 commit comments