Skip to content

feat(sql): optimized ASOF JOIN on single symbol key where RHS symbol is low-frequency#6208

Merged
bluestreak01 merged 140 commits intomasterfrom
mt_asof-join-fast
Oct 28, 2025
Merged

feat(sql): optimized ASOF JOIN on single symbol key where RHS symbol is low-frequency#6208
bluestreak01 merged 140 commits intomasterfrom
mt_asof-join-fast

Conversation

@mtopolnik
Copy link
Copy Markdown
Contributor

@mtopolnik mtopolnik commented Sep 30, 2025

Keyed ASOF JOIN on a non-indexed symbol column uses linear scan to find the matching RHS row. For each LHS row, it repeats the RHS search from scratch. This works fine as long as the RHS table has plenty occurrences of all symbols.

However, if some symbol occurs in the RHS table only rarely, a great many rows will have to be scanned to find it. Then, when we move on after finding it, and encounter the same symbol in a future LHS row, we'll have to repeat the entire scan, only to find the exact same RHS symbol.

This PR saves time by remembering the location of each symbol it encounters in its search for the matching RHS row, and then reusing that knowledge when scanning again. It also remembers where it didn't find a symbol, allowing it to skip over the already-scanned region of the RHS table.

Benchmark

The benchmark uses two tables, prices as the RHS in the join, and orders as LHS.

We set up the data using the Zipf (long-tail) distribution of 5,000 symbols in prices, but hand-pick only the 500 rarest symbols for orders. Each of these symbols occurs just 5-9 times among the table's 320 million rows.

We set up the timestamps such that orders in orders happen later than almost all the data in prices, resulting in a large search space for the matching ASOF JOIN row.

  1. Currency price table:
CREATE TABLE prices (
      ts TIMESTAMP,
      sym SYMBOL,
      price DOUBLE
  ) timestamp(ts) PARTITION BY DAY;
INSERT INTO prices
  SELECT
      dateadd('s', x::int, '2010-01-01T00:00:00.000000Z') as ts,
      rnd_symbol_zipf(5_000, 2.0),
      rnd_double() * 10.0 + 5.0
      FROM long_sequence(320_000_000);
  1. Trade orders table:
CREATE TABLE orders (
    id LONG,
    order_ts TIMESTAMP,
    sym1 SYMBOL CAPACITY 1024,
    sym2 SYMBOL CAPACITY 1024,
    unit_price DOUBLE,
    volume DOUBLE
) TIMESTAMP(order_ts) PARTITION BY DAY;
WITH order_basics AS (SELECT * FROM (
  SELECT
    x AS id,
    dateadd('s', (x * 10)::int, '2020-01-01T00:00:00.000000Z') AS order_ts,
    rnd_symbol('sym4893','sym4774','sym4535', ... 500 rarest symbols in prices) AS sym1,
    rnd_symbol('sym4893','sym4774','sym4535', ... 500 rarest symbols in prices) AS sym2,
    rnd_double() * 20 + 10 AS volume
  FROM long_sequence(1000))
  TIMESTAMP(order_ts)
  ),
  join1 AS (
    SELECT ob.*, p.price price1 FROM order_basics ob ASOF JOIN prices p ON (ob.sym1 = p.sym)
  ),
  join2 AS (
    SELECT j1.*, p.price price2 FROM join1 j1 ASOF JOIN prices p ON (j1.sym2 = p.sym)
  )
INSERT INTO orders
SELECT id, order_ts, sym1, sym2, price1/price2 AS unit_price, volume FROM join2;

The query:

WITH
  offsets AS (
    SELECT sec_offs, 1_000_000 * sec_offs usec_offs 
    FROM (SELECT x-601 AS sec_offs FROM long_sequence(1201))
  ),
  points AS (SELECT * FROM (
    SELECT id, order_ts, sym1, sym2, unit_price, volume, sec_offs, order_ts + usec_offs AS ts
    FROM orders CROSS JOIN offsets
    ORDER BY order_ts + usec_offs
  ) TIMESTAMP(ts)),
  join1 AS (
    SELECT t.*, p.price AS sym1_price
    FROM points as t
    ASOF JOIN prices as p
    ON (t.sym1 = p.sym)
  ),
  join2 AS (
    SELECT t.*, p.price AS sym2_price
    FROM join1 as t
    ASOF JOIN prices as p
    ON (t.sym2 = p.sym)
  ),
  markouts AS (
  SELECT
    sec_offs, 
    volume,
    volume * (unit_price - (sym1_price/sym2_price)) AS weighted_markout
  FROM join2
  )
SELECT sec_offs, sum(weighted_markout) / sum(volume) AS avg_weigthed_markout 
FROM markouts;

This query results in 1,201,000 rows.

  • With the SQL hint that disables the new algo, SELECT /*+ asof_fast_search(t p)*/, the query times out.
  • Using Indexed Scan with SELECT /*+ asof_index_search(t p)*/ ...., the query times out again. Now the search is able to skip over entire partitions where the symbol doesn't exist, but it must still go through the partitions one by one, repeatedly.
  • Using the new default Memoized Scan, the query takes around 15 seconds on a cold disk cache, and 8.5 seconds when repeated.

@puzpuzpuz puzpuzpuz self-requested a review October 22, 2025 14:32
puzpuzpuz
puzpuzpuz previously approved these changes Oct 22, 2025
Copy link
Copy Markdown
Member

@bluestreak01 bluestreak01 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactoring in this PR introduced previously non-existent possibility of NPE in SQLOptimiser. Intellij produces warnings that should be investigated:

Image

@mtopolnik
Copy link
Copy Markdown
Contributor Author

refactoring in this PR introduced previously non-existent possibility of NPE in SQLOptimiser. Intellij produces warnings that should be investigated:

The changes I see in SqlOptimiser are unrelated to the warning.

I think the reason why this is showing up now isn't any change in this PR, but an improvement to the static analysis in IDEA. It can see that a few lines above we have a check if (innerVirtualModel != null), but don't have it later on, where this warning is raised. The reason why I think it's safe without the check is that the parameter addColumnToInnerVirtualModel is only true when innerVirtualModel is non-null. But I haven't 100% confirmed this.

@glasstiger
Copy link
Copy Markdown
Contributor

[PR Coverage check]

😍 pass : 280 / 296 (94.59%)

file detail

path covered line new line coverage
🔵 io/questdb/griffin/engine/join/ChainedSymbolColumnAccessHelper.java 0 4 00.00%
🔵 io/questdb/griffin/engine/join/NoopColumnAccessHelper.java 2 5 40.00%
🔵 io/questdb/griffin/engine/join/SingleVarcharColumnAccessHelper.java 6 7 85.71%
🔵 io/questdb/griffin/engine/join/SingleStringColumnAccessHelper.java 7 8 87.50%
🔵 io/questdb/griffin/engine/join/AsOfJoinMemoizedRecordCursorFactory.java 165 171 96.49%
🔵 io/questdb/griffin/SqlCodeGenerator.java 54 55 98.18%
🔵 io/questdb/std/CharSequenceLongHashMap.java 1 1 100.00%
🔵 io/questdb/std/LowerCaseCharSequenceIntHashMap.java 1 1 100.00%
🔵 io/questdb/std/IntLongHashMap.java 1 1 100.00%
🔵 io/questdb/griffin/engine/join/AsOfJoinFastRecordCursorFactory.java 12 12 100.00%
🔵 io/questdb/griffin/engine/join/AsofJoinColumnAccessHelper.java 1 1 100.00%
🔵 io/questdb/std/Utf8SequenceLongHashMap.java 1 1 100.00%
🔵 io/questdb/griffin/SqlHints.java 3 3 100.00%
🔵 io/questdb/griffin/engine/join/AsOfJoinIndexedRecordCursorFactory.java 5 5 100.00%
🔵 io/questdb/griffin/SqlOptimiser.java 3 3 100.00%
🔵 io/questdb/griffin/engine/join/SingleSymbolColumnAccessHelper.java 16 16 100.00%
🔵 io/questdb/std/CharSequenceIntHashMap.java 2 2 100.00%

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Enhancement Enhance existing functionality Performance Performance improvements SQL Issues or changes relating to SQL execution

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants