Skip to content

Commit 7f43486

Browse files
authored
Merge branch 'master' into vi_pg_arr_fix
2 parents c8b69ce + 7eb1a51 commit 7f43486

File tree

53 files changed

+21128
-876
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+21128
-876
lines changed

core/src/main/java/io/questdb/PropServerConfiguration.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -882,17 +882,17 @@ public PropServerConfiguration(
882882
this.devModeEnabled = getBoolean(properties, env, PropertyKey.DEV_MODE_ENABLED, false);
883883

884884
int cpuAvailable = Runtime.getRuntime().availableProcessors();
885-
int cpuMvRefreshWorkers = 2;
885+
int cpuWalApplyWorkers = 2;
886886
int cpuSpare = 0;
887887

888888
if (cpuAvailable > 32) {
889-
cpuMvRefreshWorkers = 4;
889+
cpuWalApplyWorkers = 4;
890890
cpuSpare = 2;
891891
} else if (cpuAvailable > 16) {
892-
cpuMvRefreshWorkers = 3;
892+
cpuWalApplyWorkers = 3;
893893
cpuSpare = 1;
894894
} else if (cpuAvailable > 8) {
895-
cpuMvRefreshWorkers = 3;
895+
cpuWalApplyWorkers = 3;
896896
}
897897

898898
final FilesFacade ff = cairoConfiguration.getFilesFacade();
@@ -1289,7 +1289,8 @@ public PropServerConfiguration(
12891289
this.pgPipelineCapacity = getInt(properties, env, PropertyKey.PG_PIPELINE_CAPACITY, 64);
12901290
}
12911291

1292-
this.walApplyWorkerCount = getInt(properties, env, PropertyKey.WAL_APPLY_WORKER_COUNT, 0); // Use shared write pool by default;
1292+
// Do not use shared write pool by default for wal-apply
1293+
this.walApplyWorkerCount = getInt(properties, env, PropertyKey.WAL_APPLY_WORKER_COUNT, cpuWalApplyWorkers);
12931294
this.walApplyWorkerAffinity = getAffinity(properties, env, PropertyKey.WAL_APPLY_WORKER_AFFINITY, walApplyWorkerCount);
12941295
this.walApplyWorkerHaltOnError = getBoolean(properties, env, PropertyKey.WAL_APPLY_WORKER_HALT_ON_ERROR, false);
12951296
this.walApplyWorkerNapThreshold = getLong(properties, env, PropertyKey.WAL_APPLY_WORKER_NAP_THRESHOLD, 7_000);
@@ -1301,7 +1302,8 @@ public PropServerConfiguration(
13011302
this.matViewEnabled = getBoolean(properties, env, PropertyKey.CAIRO_MAT_VIEW_ENABLED, true);
13021303
this.matViewMaxRefreshRetries = getInt(properties, env, PropertyKey.CAIRO_MAT_VIEW_MAX_REFRESH_RETRIES, 10);
13031304
this.matViewRefreshOomRetryTimeout = getMillis(properties, env, PropertyKey.CAIRO_MAT_VIEW_REFRESH_OOM_RETRY_TIMEOUT, 200);
1304-
this.matViewRefreshWorkerCount = getInt(properties, env, PropertyKey.MAT_VIEW_REFRESH_WORKER_COUNT, cpuMvRefreshWorkers);
1305+
// Do not use shared write pool by default for mat-view-refresh, use same worker count as wal-apply
1306+
this.matViewRefreshWorkerCount = getInt(properties, env, PropertyKey.MAT_VIEW_REFRESH_WORKER_COUNT, cpuWalApplyWorkers);
13051307
this.matViewRefreshWorkerAffinity = getAffinity(properties, env, PropertyKey.MAT_VIEW_REFRESH_WORKER_AFFINITY, matViewRefreshWorkerCount);
13061308
this.matViewRefreshWorkerHaltOnError = getBoolean(properties, env, PropertyKey.MAT_VIEW_REFRESH_WORKER_HALT_ON_ERROR, false);
13071309
this.matViewRefreshWorkerNapThreshold = getLong(properties, env, PropertyKey.MAT_VIEW_REFRESH_WORKER_NAP_THRESHOLD, 7_000);

core/src/main/java/io/questdb/cairo/CairoException.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ public class CairoException extends RuntimeException implements Sinkable, Flywei
4242
public static final int ERRNO_ACCESS_DENIED_WIN = 5;
4343
public static final int ERRNO_FILE_DOES_NOT_EXIST = 2;
4444
public static final int ERRNO_FILE_DOES_NOT_EXIST_WIN = 3;
45+
// psync_cvcontinue sets two bits in the error code to indicate whether the wait timed out (0x100) or there were no waiters (0x200).
46+
// Error #316 (0x13C) is the timed out bit bitwise OR'd with ETIMEDOUT (60).
47+
public static final int ERRNO_FILE_READ_TIMEOUT_MACOS = 316;
4548
public static final int ERRNO_INVALID_PARAMETER = 22;
4649
public static final int ERRNO_INVALID_PARAMETER_WIN = 87;
4750
public static final int METADATA_VALIDATION = -100;

core/src/main/java/io/questdb/cairo/TableReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,7 @@ public void updateTableToken(TableToken tableToken) {
633633
}
634634

635635
private static int getColumnBits(int columnCount) {
636-
return Numbers.msb(Numbers.ceilPow2(columnCount) * 2);
636+
return Math.max(Numbers.msb(Numbers.ceilPow2(columnCount) * 2), 0);
637637
}
638638

639639
private static boolean growColumn(MemoryCMRDetachedImpl mem1, MemoryCMRDetachedImpl mem2, int columnType, long rowCount) {

core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5381,7 +5381,7 @@ private RecordCursorFactory generateSelectWindow(
53815381
ac.getRowsHiKindPos(),
53825382
ac.getExclusionKind(),
53835383
ac.getExclusionKindPos(),
5384-
chainMetadata.getTimestampIndex(),
5384+
baseMetadata.getTimestampIndex(),
53855385
ac.isIgnoreNulls(),
53865386
ac.getNullsDescPos()
53875387
);

core/src/main/java/io/questdb/griffin/engine/functions/window/AbstractWindowFunctionFactory.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,16 @@ public void pass1(Record record, long recordOffset, WindowSPI spi) {
175175
}
176176

177177
static class LongNullFunction extends BaseNullFunction implements WindowLongFunction {
178-
private final Long zeroValue;
178+
private final long zeroValue;
179179

180+
/**
181+
* Constructs a LongNullFunction that produces a constant long value for every row in the window.
182+
* <p>
183+
* The constructor initializes the null-function frame (name, bounds, range flag and partition spec)
184+
* and sets the constant value written/read by this function.
185+
*
186+
* @param zeroValue the constant long value returned by getLong and written into window memory during pass1
187+
*/
180188
LongNullFunction(Function arg, String name, long rowLo, long rowHi, boolean isRange, VirtualRecord partitionByRecord, long zeroValue) {
181189
super(arg, name, rowLo, rowHi, isRange, partitionByRecord);
182190
this.zeroValue = zeroValue;
@@ -187,6 +195,12 @@ public long getLong(Record rec) {
187195
return zeroValue;
188196
}
189197

198+
/**
199+
* Writes the configured zero timestamp value into the window buffer for the given record offset.
200+
*
201+
* @param record the source record (unused; kept for interface compatibility)
202+
* @param recordOffset byte offset of the record within window memory
203+
*/
190204
@Override
191205
public void pass1(Record record, long recordOffset, WindowSPI spi) {
192206
Unsafe.getUnsafe().putLong(spi.getAddress(recordOffset, columnIndex), zeroValue);
@@ -208,4 +222,48 @@ void reset(long capacity, long startOffset, long size, long firstIdx, LongList f
208222
this.freeList = freeList;
209223
}
210224
}
225+
226+
static class TimestampNullFunction extends BaseNullFunction implements WindowTimestampFunction {
227+
private final long zeroValue;
228+
229+
/**
230+
* Create a TimestampNullFunction that supplies a constant timestamp for null window entries.
231+
*
232+
* @param arg the wrapped argument function
233+
* @param name function name used in plans and diagnostics
234+
* @param rowLo lower window bound (rows or range units)
235+
* @param rowHi upper window bound (rows or range units)
236+
* @param isRange true if the window frame is RANGE, false if ROWS
237+
* @param partitionByRecord optional partitioning record (may be null)
238+
* @param zeroValue timestamp value returned/written for null results
239+
*/
240+
TimestampNullFunction(Function arg, String name, long rowLo, long rowHi, boolean isRange, VirtualRecord partitionByRecord, long zeroValue) {
241+
super(arg, name, rowLo, rowHi, isRange, partitionByRecord);
242+
this.zeroValue = zeroValue;
243+
}
244+
245+
/**
246+
* Returns the configured constant timestamp used to represent null/window-default values.
247+
*
248+
* <p>The provided record is ignored; the method always returns the stored `zeroValue`.</p>
249+
*
250+
* @param rec unused record parameter provided by the WindowTimestampFunction interface
251+
* @return the constant timestamp value used for nulls
252+
*/
253+
@Override
254+
public long getTimestamp(Record rec) {
255+
return zeroValue;
256+
}
257+
258+
/**
259+
* Writes the configured zero timestamp value into the window buffer for the given record offset.
260+
*
261+
* @param record the source record (unused; kept for interface compatibility)
262+
* @param recordOffset byte offset of the record within window memory
263+
*/
264+
@Override
265+
public void pass1(Record record, long recordOffset, WindowSPI spi) {
266+
Unsafe.getUnsafe().putLong(spi.getAddress(recordOffset, columnIndex), zeroValue);
267+
}
268+
}
211269
}

core/src/main/java/io/questdb/griffin/engine/functions/window/AvgDoubleWindowFunctionFactory.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ else if (rowsLo == Long.MIN_VALUE && rowsHi == 0) {
184184
args.get(0)
185185
);
186186
} // between current row and current row
187-
else if (rowsLo == 0 && rowsLo == rowsHi) {
187+
else if (rowsLo == 0 && rowsHi == 0) {
188188
return new AvgOverCurrentRowFunction(args.get(0));
189189
} // whole partition
190190
else if (rowsLo == Long.MIN_VALUE && rowsHi == Long.MAX_VALUE) {
@@ -271,7 +271,7 @@ else if (rowsLo == Long.MIN_VALUE && rowsHi == 0) {
271271
if (rowsLo == Long.MIN_VALUE && rowsHi == 0) {
272272
return new AvgOverUnboundedRowsFrameFunction(args.get(0));
273273
} // between current row and current row
274-
else if (rowsLo == 0 && rowsLo == rowsHi) {
274+
else if (rowsLo == 0 && rowsHi == 0) {
275275
return new AvgOverCurrentRowFunction(args.get(0));
276276
} // whole result set
277277
else if (rowsLo == Long.MIN_VALUE && rowsHi == Long.MAX_VALUE) {
@@ -402,7 +402,7 @@ public void preparePass2() {
402402
}
403403
}
404404

405-
// Handles avg() over (partition by x order by ts range between [undobuned | y] preceding and [z preceding | current row])
405+
// Handles avg() over (partition by x order by ts range between [unbounded | y] preceding and [z preceding | current row])
406406
// Removable cumulative aggregation with timestamp & value stored in resizable ring buffers
407407
// When lower bound is unbounded we add but immediately discard any values that enter the frame so buffer should only contain values
408408
// between upper bound and current row's value.
@@ -627,7 +627,8 @@ public int getPassCount() {
627627

628628
@Override
629629
public void pass1(Record record, long recordOffset, WindowSPI spi) {
630-
throw new UnsupportedOperationException();
630+
computeNext(record);
631+
Unsafe.getUnsafe().putDouble(spi.getAddress(recordOffset, columnIndex), avg);
631632
}
632633

633634
@Override
@@ -1053,7 +1054,8 @@ public int getPassCount() {
10531054

10541055
@Override
10551056
public void pass1(Record record, long recordOffset, WindowSPI spi) {
1056-
throw new UnsupportedOperationException();
1057+
computeNext(record);
1058+
Unsafe.getUnsafe().putDouble(spi.getAddress(recordOffset, columnIndex), avg);
10571059
}
10581060

10591061
@Override
@@ -1383,7 +1385,6 @@ public int getPassCount() {
13831385
@Override
13841386
public void pass1(Record record, long recordOffset, WindowSPI spi) {
13851387
computeNext(record);
1386-
13871388
Unsafe.getUnsafe().putDouble(spi.getAddress(recordOffset, columnIndex), avg);
13881389
}
13891390

core/src/main/java/io/questdb/griffin/engine/functions/window/CountConstWindowFunctionFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ else if (rowsLo == Long.MIN_VALUE && rowsHi == 0) {
167167
isRecordNotNull
168168
);
169169
} // between current row and current row
170-
else if (rowsLo == 0 && rowsLo == rowsHi) {
170+
else if (rowsLo == 0 && rowsHi == 0) {
171171
return new CountFunctionFactoryHelper.CountOverCurrentRowFunction(null, isRecordNotNull);
172172
} // whole partition
173173
else if (rowsLo == Long.MIN_VALUE && rowsHi == Long.MAX_VALUE) {
@@ -240,7 +240,7 @@ else if (rowsLo == Long.MIN_VALUE && rowsHi == 0) {
240240
if (rowsLo == Long.MIN_VALUE && rowsHi == 0) {
241241
return new CountFunctionFactoryHelper.CountOverUnboundedRowsFrameFunction(null, isRecordNotNull);
242242
} // between current row and current row
243-
else if (rowsLo == 0 && rowsLo == rowsHi) {
243+
else if (rowsLo == 0 && rowsHi == 0) {
244244
return new CountFunctionFactoryHelper.CountOverCurrentRowFunction(null, isRecordNotNull);
245245
} // whole result set
246246
else if (rowsLo == Long.MIN_VALUE && rowsHi == Long.MAX_VALUE) {

core/src/main/java/io/questdb/griffin/engine/functions/window/CountFunctionFactoryHelper.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ else if (rowsLo == Long.MIN_VALUE && rowsHi == 0) {
173173
isRecordNotNull
174174
);
175175
} // between current row and current row
176-
else if (rowsLo == 0 && rowsLo == rowsHi) {
176+
else if (rowsLo == 0 && rowsHi == 0) {
177177
return new CountOverCurrentRowFunction(args.get(0), isRecordNotNull);
178178
} // whole partition
179179
else if (rowsLo == Long.MIN_VALUE && rowsHi == Long.MAX_VALUE) {
@@ -254,7 +254,7 @@ else if (rowsLo == Long.MIN_VALUE && rowsHi == 0) {
254254
if (rowsLo == Long.MIN_VALUE && rowsHi == 0) {
255255
return new CountOverUnboundedRowsFrameFunction(args.get(0), isRecordNotNull);
256256
} // between current row and current row
257-
else if (rowsLo == 0 && rowsLo == rowsHi) {
257+
else if (rowsLo == 0 && rowsHi == 0) {
258258
return new CountOverCurrentRowFunction(args.get(0), isRecordNotNull);
259259
} // whole result set
260260
else if (rowsLo == Long.MIN_VALUE && rowsHi == Long.MAX_VALUE) {
@@ -290,7 +290,6 @@ static class CountOverCurrentRowFunction extends BaseWindowFunction implements W
290290
private static final long VALUE_ONE = 1L;
291291
private static final long VALUE_ZERO = 0L;
292292
private final IsRecordNotNull isNotNullFunc;
293-
private long value;
294293

295294
CountOverCurrentRowFunction(Function arg, IsRecordNotNull isNotNullFunc) {
296295
super(arg);
@@ -299,8 +298,7 @@ static class CountOverCurrentRowFunction extends BaseWindowFunction implements W
299298

300299
@Override
301300
public long getLong(Record rec) {
302-
value = isNotNullFunc.isNotNull(arg, rec) ? VALUE_ONE : VALUE_ZERO;
303-
return value;
301+
return isNotNullFunc.isNotNull(arg, rec) ? VALUE_ONE : VALUE_ZERO;
304302
}
305303

306304
@Override
@@ -313,10 +311,9 @@ public int getPassCount() {
313311
return ZERO_PASS;
314312
}
315313

316-
317314
@Override
318315
public void pass1(Record record, long recordOffset, WindowSPI spi) {
319-
Unsafe.getUnsafe().putLong(spi.getAddress(recordOffset, columnIndex), value);
316+
Unsafe.getUnsafe().putLong(spi.getAddress(recordOffset, columnIndex), isNotNullFunc.isNotNull(arg, record) ? VALUE_ONE : VALUE_ZERO);
320317
}
321318
}
322319

@@ -371,7 +368,7 @@ public void pass2(Record record, long recordOffset, WindowSPI spi) {
371368
}
372369
}
373370

374-
// Handles count(arg) over (partition by x order by ts range between [undobuned | y] preceding and [z preceding | current row])
371+
// Handles count(arg) over (partition by x order by ts range between [unbounded | y] preceding and [z preceding | current row])
375372
public static class CountOverPartitionRangeFrameFunction extends BasePartitionedWindowFunction implements WindowLongFunction {
376373

377374
private static final int RECORD_SIZE = Long.BYTES;
@@ -554,7 +551,8 @@ public int getPassCount() {
554551

555552
@Override
556553
public void pass1(Record record, long recordOffset, WindowSPI spi) {
557-
throw new UnsupportedOperationException();
554+
computeNext(record);
555+
Unsafe.getUnsafe().putLong(spi.getAddress(recordOffset, columnIndex), count);
558556
}
559557

560558
@Override
@@ -919,7 +917,8 @@ public int getPassCount() {
919917

920918
@Override
921919
public void pass1(Record record, long recordOffset, WindowSPI spi) {
922-
throw new UnsupportedOperationException();
920+
computeNext(record);
921+
Unsafe.getUnsafe().putLong(spi.getAddress(recordOffset, columnIndex), count);
923922
}
924923

925924
@Override

core/src/main/java/io/questdb/griffin/engine/functions/window/FirstValueDoubleWindowFunctionFactory.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ else if (rowsLo == Long.MIN_VALUE && rowsHi == 0) {
191191
args.get(0)
192192
);
193193
} // between current row and current row
194-
else if (rowsLo == 0 && rowsLo == rowsHi) {
194+
else if (rowsLo == 0 && rowsHi == 0) {
195195
return new FirstValueOverCurrentRowFunction(args.get(0), true);
196196
} // whole partition
197197
else if (rowsLo == Long.MIN_VALUE && rowsHi == Long.MAX_VALUE) {
@@ -272,7 +272,7 @@ else if (rowsLo == Long.MIN_VALUE && rowsHi == 0) {
272272
if (rowsLo == Long.MIN_VALUE && (rowsHi == 0 || rowsHi == Long.MAX_VALUE)) {
273273
return new FirstNotNullValueOverWholeResultSetFunction(args.get(0));
274274
} // between current row and current row
275-
else if (rowsLo == 0 && rowsLo == rowsHi) {
275+
else if (rowsLo == 0 && rowsHi == 0) {
276276
return new FirstValueDoubleWindowFunctionFactory.FirstValueOverCurrentRowFunction(args.get(0), true);
277277
} // between [unbounded | x] preceding and [y preceding | current row]
278278
else {
@@ -396,7 +396,7 @@ else if (rowsLo == Long.MIN_VALUE && rowsHi == 0) {
396396
args.get(0)
397397
);
398398
} // between current row and current row
399-
else if (rowsLo == 0 && rowsLo == rowsHi) {
399+
else if (rowsLo == 0 && rowsHi == 0) {
400400
return new FirstValueOverCurrentRowFunction(args.get(0), false);
401401
} // whole partition
402402
else if (rowsLo == Long.MIN_VALUE && rowsHi == Long.MAX_VALUE) {
@@ -476,7 +476,7 @@ else if (rowsLo == Long.MIN_VALUE && rowsHi == 0) {
476476
if (rowsLo == Long.MIN_VALUE && (rowsHi == 0 || rowsHi == Long.MAX_VALUE)) {
477477
return new FirstValueOverWholeResultSetFunction(args.get(0));
478478
} // between current row and current row
479-
else if (rowsLo == 0 && rowsLo == rowsHi) {
479+
else if (rowsLo == 0 && rowsHi == 0) {
480480
return new FirstValueOverCurrentRowFunction(args.get(0), false);
481481
} // between [unbounded | x] preceding and [y preceding | current row]
482482
else {
@@ -1334,7 +1334,8 @@ public int getPassCount() {
13341334

13351335
@Override
13361336
public void pass1(Record record, long recordOffset, WindowSPI spi) {
1337-
throw new UnsupportedOperationException();
1337+
computeNext(record);
1338+
Unsafe.getUnsafe().putDouble(spi.getAddress(recordOffset, columnIndex), firstValue);
13381339
}
13391340

13401341
@Override
@@ -1691,7 +1692,8 @@ public int getPassCount() {
16911692

16921693
@Override
16931694
public void pass1(Record record, long recordOffset, WindowSPI spi) {
1694-
throw new UnsupportedOperationException();
1695+
computeNext(record);
1696+
Unsafe.getUnsafe().putDouble(spi.getAddress(recordOffset, columnIndex), firstValue);
16951697
}
16961698

16971699
@Override

0 commit comments

Comments
 (0)