Skip to content

feat(sql): horizon join for markout analysis#6635

Merged
bluestreak01 merged 162 commits intomasterfrom
mt_parallel-markout
Feb 23, 2026
Merged

feat(sql): horizon join for markout analysis#6635
bluestreak01 merged 162 commits intomasterfrom
mt_parallel-markout

Conversation

@puzpuzpuz
Copy link
Copy Markdown
Contributor

@puzpuzpuz puzpuzpuz commented Jan 13, 2026

Fixes #6770 by introducing per-worker sinks

HORIZON JOIN is a specialized time-series join in QuestDB designed for markout analysis — a common financial analytics pattern where you need to analyze how prices or metrics evolve at specific time offsets relative to events (e.g., trades, orders).

Syntax

SELECT [<keys>,] <aggregations>
FROM <left_table> AS <left_alias>
HORIZON JOIN <right_table> AS <right_alias> [ON (<join_keys>)]
RANGE FROM <from_expr> TO <to_expr> STEP <step_expr> AS <horizon_alias>
[GROUP BY <keys>]
[ORDER BY ...]

or, with explicit offset list:

SELECT [<keys>,] <aggregations>
FROM <left_table> AS <left_alias>
HORIZON JOIN <right_table> AS <right_alias> [ON (<join_keys>)]
LIST (<offset_expr>, ...) AS <horizon_alias>
[GROUP BY <keys>]
[ORDER BY ...]

How it works

For each row in the left-hand table and each offset in the horizon, the join computes left_timestamp + offset and performs an ASOF match against the right-hand table. When join keys are provided (via ON), only right-hand rows matching the keys are considered. Results are implicitly grouped by the non-aggregate SELECT columns (horizon offset, left-hand table keys, etc.), and aggregate functions are applied across all matched rows.

The horizon pseudo-table (RANGE / LIST)

The RANGE or LIST clause defines a virtual table of time offsets, aliased by the AS clause. This pseudo-table exposes two columns:

Column Type Description
<alias>.offset LONG The offset value in left-hand side table's designated timestamp resolution. Can be used in expressions, e.g. h.offset / 1000000 to get seconds.
<alias>.timestamp TIMESTAMP The computed horizon timestamp (left_timestamp + offset). Available for grouping or expressions.

RANGE generates offsets from FROM to TO (inclusive) with the given STEP. For example, RANGE FROM 0s TO 5m STEP 1m generates offsets at 0s, 1m, 2m, 3m, 4m, 5m.

LIST specifies explicit offsets as interval literals using the same SAMPLE BY-like expression syntax as RANGE. Unitless 0 is allowed as a shorthand for zero offset. Offsets must be monotonically increasing. For example, LIST (0, 1s, 5s, 1m) generates offsets at 0s, 1s, 5s, and 1m.

Both RANGE and LIST use the same syntax as SAMPLE BY: . Supported units: U (microseconds), T (milliseconds), s (seconds), m (minutes), h (hours), d (days).

Query examples

Given sample tables:

-- Executed trades
CREATE TABLE trades (ts TIMESTAMP, sym SYMBOL, side SYMBOL, price DOUBLE, qty DOUBLE) TIMESTAMP(ts);
-- Market mid-prices
CREATE TABLE mid_prices (ts TIMESTAMP, sym SYMBOL, mid DOUBLE) TIMESTAMP(ts);

Post-trade markout at uniform horizons

Measure average mid-price at 1s, 5s, 30s, and 60s after each trade, per symbol — a classic way to evaluate execution quality and price impact:

SELECT h.offset / 1000000 AS horizon_sec, t.sym, avg(m.mid) AS avg_mid
FROM trades AS t
HORIZON JOIN mid_prices AS m ON (t.sym = m.sym)
RANGE FROM 1s TO 60s STEP 1s AS h
ORDER BY t.sym, horizon_sec

Markout P&L at non-uniform horizons

Compute the average post-trade markout (future mid minus trade price) at specific horizons using LIST:

SELECT h.offset / 1000000 AS horizon_sec, t.sym,
       avg(m.mid - t.price) AS avg_markout
FROM trades AS t
HORIZON JOIN mid_prices AS m ON (t.sym = m.sym)
LIST (1s, 5s, 30s, 1m) AS h
ORDER BY t.sym, horizon_sec

Pre- and post-trade price movement

Use negative offsets to see price levels before and after trades, useful for detecting information leakage or adverse selection:

SELECT h.offset / 1000000 AS horizon_sec, t.sym,
       avg(m.mid) AS avg_mid, count(*) AS sample_size
FROM trades AS t
HORIZON JOIN mid_prices AS m ON (t.sym = m.sym)
RANGE FROM -5s TO 5s STEP 1s AS h
ORDER BY t.sym, horizon_sec

Markout by trade side

Break down markouts by buy/sell side to analyze execution asymmetry:

SELECT h.offset / 1000000 AS horizon_sec, t.sym, t.side,
       avg(m.mid - t.price) AS avg_markout
FROM trades AS t
HORIZON JOIN mid_prices AS m ON (t.sym = m.sym)
RANGE FROM 0s TO 5s STEP 1s AS h
ORDER BY t.sym, t.side, horizon_sec

Volume-weighted markout (non-keyed aggregation)

Compute an overall markout across all symbols without grouping keys:

SELECT h.offset / 1000000 AS horizon_sec,
       sum((m.mid - t.price) * t.qty) / sum(t.qty) AS vwap_markout
FROM trades AS t
HORIZON JOIN mid_prices AS m ON (t.sym = m.sym)
RANGE FROM 1s TO 60s STEP 1s AS h
ORDER BY horizon_sec

Use cases

  • Markout Analysis: Calculate P&L at various time horizons after trades
  • Event Impact Studies: Measure metric changes before/after events
  • Time-Series Correlation: Analyze relationships between series at different lags

Current limitations

  • No other joins: HORIZON JOIN cannot be combined with other joins in the same level of the query (but joins can be done on an outer level of the query).
  • No right-hand side filter: WHERE clause filters apply to the left-hand table only; right-hand table filters are not yet supported.
  • Both tables must have a designated timestamp: The left-hand and right-hand tables must each have a designated timestamp column.
  • RANGE constraints: STEP must be positive; FROM must be less than or equal to TO.
  • LIST constraints: Offsets must be interval literals (e.g. 1s, -2m, 0) and monotonically increasing.

TODOs

  • Benchmark prototype
  • Fix group by functions metadata in parallel factory's explain plan and cover EXPLAIN with tests
  • Fix broken symbol table source
  • Fix filter stealing - currently master filter is lost
  • Fix wrong column names and metadata used to build column source and index lists to be used in the factory
  • Special SQL syntax for markouts
  • Support nano timestamps
  • Add no join condition support
  • Separate factory for non-keyed aggregation
  • Optimize pre-sort step - try the same algo as in MarkoutHorizonRecordCursor
  • Optimize join - ASOF JOIN map should use int values instead of string when joining symbol columns
  • Support explicit GROUP BY clause
  • Single-threaded factories for markouts
  • LIST accepts same interval expression syntax as RANGE
  • More single-threaded tests
  • Fuzz test similar to WindowJoinFuzzTest
  • Parallel tests
  • [nice-to-have] Add slave filter support
  • [nice-to-have] Vectorized fast path for single offset key case
  • [nice-to-have] Add TOLERANCE clause support

mtopolnik
mtopolnik previously approved these changes Feb 18, 2026
puzpuzpuz and others added 10 commits February 18, 2026 18:50
Reduce unnecessary backward scanning in keyed ASOF JOIN within HORIZON JOIN queries
when master symbols don't exist in the slave table.

- Add hasNonExistentKey() to SymbolTranslatingRecord to detect master symbols absent
  from the slave's symbol table via VALUE_NOT_FOUND
- Add a fast path in backwardScanForKeyMatch that skips the full backward scan when
  a non-existent key is detected (checked after the watermark/cache lookup to avoid
  overhead on cache hits)
- Add findAsOfRow fast path: when the bookmarked row's timestamp <= target and the
  next row's timestamp > target, return immediately without a full linear scan
- Reduce default ASOF JOIN lookahead from 100 to 64 rows
@bluestreak01
Copy link
Copy Markdown
Member

@puzpuzpuz — Code review findings (verified). Each item was double-checked against the code; false positives from the initial pass were eliminated.


Confirmed Bug

GROUP BY validation is overly permissive

SqlOptimiser.java:10106-10174validateHorizonJoinGroupBy loop 2

expressionContainsLiteral checks whether a GROUP BY column appears anywhere in a SELECT expression tree, rather than verifying that all leaf literals are covered by GROUP BY. This is weaker than the regular GROUP BY validation (rewriteGroupBySelectExpression / replaceIfGroupByExpressionOrAggregate), which traverses to leaf nodes and rejects any bare literal not in the GROUP BY list.

Example that incorrectly passes:

SELECT a + b, sum(c) FROM t HORIZON JOIN ... GROUP BY a

Passes because a appears in a + b, even though b is not in GROUP BY. The regular GROUP BY path correctly rejects the equivalent SELECT x+y FROM t GROUP BY x with "column must appear in GROUP BY clause or aggregate function".

The fix would be to replace the expressionContainsLiteral fallback (line 10172) with leaf-level literal coverage checking, matching the strictness of the regular GROUP BY path.


UX Issue

Error positions hardcoded to 0 in buildHorizonColumnMappings

SqlCodeGenerator.java:813, 826, 833, 839, 865

Five SqlException.$(0, ...) calls. Position 0 points at the start of the query, not the offending column. The method receives no position information from its call site. Threading position through (e.g., from column expression nodes in the QueryModel) would fix this.


Defensive / Code Quality

Missing closeSlaveOnFailure = false after HORIZON JOIN factory creation

SqlCodeGenerator.java:~4940

After generateHorizonJoinFactory takes ownership of both master and slave, closeSlaveOnFailure is not set to false. Not a live bug today (no throwing code between the factory call and the break), but it breaks the convention established by ASOF JOIN (line ~4448) and SPLICE JOIN (line ~4467). Should be added for consistency and to prevent future regressions if code is added between the factory call and the break.

"horizon" keyword is a backward compatibility break

SqlParser.java:5176, 5205

Adding "horizon" to tableAliasStop and joinStartSet means it can no longer be used as a table alias or unqualified table name in implicit cross joins. Consistent with how other join keywords work, but worth documenting as a breaking change.

Missing explicit SELECT_MODEL_HORIZON_JOIN case in generateSelect

SqlCodeGenerator.java:~6240

The generateSelect switch falls through to default, which works correctly but is implicit. Compare with SELECT_MODEL_WINDOW_JOIN which has its own explicit case. Adding case SELECT_MODEL_HORIZON_JOIN -> generateSubQuery(model, executionContext); would improve clarity.


Positive Observations

  • Concurrency architecture is sound — per-worker isolation via PerWorkerLocks, proper memory fences through dispatch mechanism, correct happens-before relationships.
  • The PR fixes a pre-existing bug: missing recordAt() calls after frame transitions in AsyncWindowJoinFastRecordCursorFactory (4 locations).
  • No allocations on hot paths — records, maps, iterators are all pre-allocated and reused.
  • Clean extraction of GroupByShardingContext and GroupByMapFragment faithfully reproduces the original logic.
  • recordAt(int, long) optimization to avoid Rows.toRowID encoding/decoding is correct.

@puzpuzpuz
Copy link
Copy Markdown
Contributor Author

puzpuzpuz commented Feb 21, 2026

@puzpuzpuz — Code review findings (verified). Each item was double-checked against the code; false positives from the initial pass were eliminated.

@bluestreak01 thanks for the thorough review! Here's what was addressed in daa7827:

Confirmed Bug: GROUP BY validation is overly permissive

Fixed. Removed expressionContainsLiteral from both validation loops in validateHorizonJoinGroupBy. HORIZON JOIN GROUP BY now requires exact expression match (via compareExpressionsWithTablePrefixes) or alias/index match. Since HORIZON JOIN doesn't extract a virtual model, GROUP BY t.qty cannot cover SELECT t.qty + p.price — the user must write GROUP BY t.qty + p.price or use a column alias.

The expressionContainsLiteral method had no other callers and was removed entirely.

Existing tests that relied on the loose matching (e.g., GROUP BY h.offset matching SELECT h.offset / 1000000 AS sec_offs) were updated to use either the full expression or the alias. Two new tests were added for the partial expression coverage case.

UX Issue: Error positions hardcoded to 0

Fixed. Added a queryPosition parameter to buildHorizonColumnMappings, sourced from slaveModel.getJoinKeywordPosition() at the call site. All five SqlException.$(0, ...) calls now use this position. Notice that these exception are defensive checks hit only when the metadata was generated incorrectly.

Defensive: Missing closeSlaveOnFailure = false

Fixed. Added after the generateHorizonJoinFactory call in the JOIN_HORIZON case, consistent with ASOF/SPLICE JOIN convention.

Defensive: Missing explicit SELECT_MODEL_HORIZON_JOIN case in generateSelect

Not applied. The default branch correctly routes HORIZON JOIN models to generateJoins(model, executionContext), which is where JOIN_HORIZON processing happens. An explicit case SELECT_MODEL_HORIZON_JOIN -> generateSubQuery(...) would bypass generateJoins entirely, breaking HORIZON JOIN.

Defensive: "horizon" keyword backward compatibility

No code change needed — consistent with how other join keywords (asof, splice, lt, window) work. Will mention in the PR description.

@glasstiger
Copy link
Copy Markdown
Contributor

[PR Coverage check]

😍 pass : 3102 / 3496 (88.73%)

file detail

path covered line new line coverage
🔵 io/questdb/griffin/engine/table/SelectedRecordCursorFactory.java 0 3 00.00%
🔵 io/questdb/griffin/engine/groupby/DistinctTimeSeriesRecordCursorFactory.java 0 1 00.00%
🔵 io/questdb/griffin/engine/table/ExtraNullColumnCursorFactory.java 0 3 00.00%
🔵 io/questdb/griffin/engine/join/AsOfJoinFastRecordCursorFactory.java 0 2 00.00%
🔵 io/questdb/griffin/engine/join/AsyncWindowJoinFastRecordCursorFactory.java 2 5 40.00%
🔵 io/questdb/griffin/engine/join/AsyncWindowJoinRecordCursorFactory.java 2 5 40.00%
🔵 io/questdb/griffin/engine/table/HorizonJoinRecord.java 54 110 49.09%
🔵 io/questdb/griffin/engine/functions/bind/BindVariableServiceImpl.java 1 2 50.00%
🔵 io/questdb/griffin/engine/table/AsyncHorizonJoinNotKeyedRecordCursorFactory.java 92 179 51.40%
🔵 io/questdb/cairo/DefaultCairoConfiguration.java 2 3 66.67%
🔵 io/questdb/griffin/engine/table/AsyncGroupByRecordCursor.java 7 9 77.78%
🔵 io/questdb/griffin/engine/table/AsyncHorizonJoinAtom.java 50 64 78.12%
🔵 io/questdb/griffin/engine/table/AsyncHorizonJoinNotKeyedAtom.java 24 30 80.00%
🔵 io/questdb/griffin/engine/table/AsyncHorizonJoinRecordCursor.java 99 116 85.34%
🔵 io/questdb/griffin/engine/table/AsyncHorizonTimestampIterator.java 94 107 87.85%
🔵 io/questdb/griffin/SqlCodeGenerator.java 342 387 88.37%
🔵 io/questdb/griffin/SqlOptimiser.java 218 245 88.98%
🔵 io/questdb/griffin/engine/table/HorizonJoinTimeFrameHelper.java 250 284 88.03%
🔵 io/questdb/griffin/engine/join/AsyncWindowJoinRecordCursor.java 22 25 88.00%
🔵 io/questdb/griffin/engine/table/HorizonJoinSymbolTableSource.java 17 19 89.47%
🔵 io/questdb/griffin/engine/table/AsyncHorizonJoinNotKeyedRecordCursor.java 89 100 89.00%
🔵 io/questdb/griffin/engine/table/HorizonTimestampIterator.java 140 154 90.91%
🔵 io/questdb/griffin/engine/table/ConcurrentTimeFrameCursor.java 45 50 90.00%
🔵 io/questdb/griffin/engine/table/TimeFrameCursorImpl.java 43 46 93.48%
🔵 io/questdb/PropServerConfiguration.java 18 19 94.74%
🔵 io/questdb/griffin/engine/table/HorizonJoinRecordCursorFactory.java 179 190 94.21%
🔵 io/questdb/griffin/engine/functions/conditional/SwitchFunctionFactory.java 30 31 96.77%
🔵 io/questdb/griffin/engine/table/AsyncHorizonJoinRecordCursorFactory.java 199 207 96.14%
🔵 io/questdb/griffin/model/QueryModel.java 30 31 96.77%
🔵 io/questdb/griffin/engine/table/HorizonJoinNotKeyedRecordCursorFactory.java 164 169 97.04%
🔵 io/questdb/griffin/engine/table/BaseAsyncHorizonJoinAtom.java 190 195 97.44%
🔵 io/questdb/griffin/SqlParser.java 71 72 98.61%
🔵 io/questdb/griffin/engine/table/SymbolTranslatingRecord.java 54 55 98.18%
🔵 io/questdb/griffin/engine/table/GroupByShardingContext.java 200 204 98.04%
🔵 io/questdb/griffin/engine/table/AsyncFilterContext.java 5 5 100.00%
🔵 io/questdb/griffin/engine/groupby/SampleByFillNoneRecordCursorFactory.java 1 1 100.00%
🔵 io/questdb/std/Misc.java 8 8 100.00%
🔵 io/questdb/griffin/engine/table/AsyncTopKRecordCursorFactory.java 15 15 100.00%
🔵 io/questdb/cairo/sql/PageFrameMemoryRecord.java 2 2 100.00%
🔵 io/questdb/griffin/engine/join/WindowJoinTimeFrameHelper.java 11 11 100.00%
🔵 io/questdb/griffin/engine/groupby/TimestampSamplerFactory.java 35 35 100.00%
🔵 io/questdb/griffin/engine/table/AsyncGroupByNotKeyedRecordCursor.java 9 9 100.00%
🔵 io/questdb/griffin/engine/table/GroupByMapStats.java 7 7 100.00%
🔵 io/questdb/griffin/SqlExecutionContextImpl.java 4 4 100.00%
🔵 io/questdb/tasks/GroupByMergeShardTask.java 3 3 100.00%
🔵 io/questdb/PropertyKey.java 2 2 100.00%
🔵 io/questdb/cairo/map/OrderedMap.java 26 26 100.00%
🔵 io/questdb/griffin/engine/table/GroupByMapFragment.java 72 72 100.00%
🔵 io/questdb/griffin/model/HorizonJoinContext.java 52 52 100.00%
🔵 io/questdb/cairo/ColumnTypeConverter.java 2 2 100.00%
🔵 io/questdb/cairo/CairoConfigurationWrapper.java 2 2 100.00%
🔵 io/questdb/griffin/engine/groupby/AbstractSampleByFillRecordCursorFactory.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/VarcharFunction.java 1 1 100.00%
🔵 io/questdb/griffin/SqlKeywords.java 8 8 100.00%
🔵 io/questdb/cairo/MillisTimestampDriver.java 8 8 100.00%
🔵 io/questdb/std/IntList.java 3 3 100.00%
🔵 io/questdb/griffin/engine/table/AsyncGroupByNotKeyedRecordCursorFactory.java 17 17 100.00%
🔵 io/questdb/griffin/engine/functions/window/RankFunctionFactory.java 1 1 100.00%
🔵 io/questdb/griffin/engine/groupby/SampleByInterpolateRecordCursorFactory.java 2 2 100.00%
🔵 io/questdb/griffin/engine/groupby/GroupByMergeShardJob.java 5 5 100.00%
🔵 io/questdb/griffin/engine/functions/cast/CastStrToDateFunctionFactory.java 1 1 100.00%
🔵 io/questdb/griffin/engine/groupby/DistinctRecordCursorFactory.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/cast/CastVarcharToDateFunctionFactory.java 1 1 100.00%
🔵 io/questdb/griffin/engine/table/AsyncGroupByAtom.java 25 25 100.00%
🔵 io/questdb/griffin/engine/groupby/GroupByRecordCursorFactory.java 1 1 100.00%
🔵 io/questdb/griffin/FunctionParser.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/StrFunction.java 1 1 100.00%
🔵 io/questdb/griffin/engine/join/WindowJoinPrevailingCache.java 1 1 100.00%
🔵 io/questdb/griffin/engine/table/AsyncGroupByRecordCursorFactory.java 22 22 100.00%
🔵 io/questdb/griffin/engine/ops/CreateMatViewOperationImpl.java 2 2 100.00%
🔵 io/questdb/cairo/sql/PageFrameMemoryPool.java 2 2 100.00%
🔵 io/questdb/std/Numbers.java 1 1 100.00%
🔵 io/questdb/cairo/sql/PageFrameFilteredMemoryRecord.java 9 9 100.00%
🔵 io/questdb/griffin/engine/groupby/GroupByLongTopKJob.java 1 1 100.00%
🔵 io/questdb/griffin/engine/functions/json/JsonExtractFunction.java 1 1 100.00%
🔵 io/questdb/griffin/SqlUtil.java 2 2 100.00%

@bluestreak01 bluestreak01 merged commit 3c12324 into master Feb 23, 2026
44 checks passed
@bluestreak01 bluestreak01 deleted the mt_parallel-markout branch February 23, 2026 14:11
@puzpuzpuz
Copy link
Copy Markdown
Contributor Author

@bluestreak01 @mtopolnik thanks for your reviews!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Performance Performance improvements SQL Issues or changes relating to SQL execution

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Data races in parallel GROUP BY with DECIMAL128/256 keys

4 participants