
Support rewrite to optimize order by limit#82478

Closed
acking-you wants to merge 11 commits into ClickHouse:master from acking-you:support_order_by_limit_rewrite

Conversation

@acking-you
Contributor

@acking-you acking-you commented Jun 24, 2025

Summary

Relevant issue: #79645
This PR optimizes `ORDER BY ... LIMIT` by rewriting the query into a `WHERE (...) IN (subquery)` form, described as follows:

For SQL statements with a large LIMIT value, such as order by limit 100000, significant performance improvements can be achieved. Below are the test results on the hits dataset, showing a performance increase of nearly 100 times compared to ColumnLazy.

The test version is 25.7.1.1, and all results below are hot queries (each query was executed three times and the best result taken):

-- original query cannot proceed normally (memory usage exceeds the limit)
SELECT * FROM hits ORDER BY EventTime LIMIT 100000;

-- enable ColumnLazy
set query_plan_max_limit_for_lazy_materialization=200000;
SELECT * FROM hits ORDER BY EventTime LIMIT 100000;

100000 rows in set. Elapsed: 71.017 sec. Processed 78.00 million rows, 935.97 MB (1.10 million rows/s., 13.18 MB/s.)
Peak memory usage: 855.88 MiB.


-- rewrite order by limit
SELECT * FROM hits WHERE (_part_starting_offset + _part_offset) IN (SELECT _part_starting_offset + _part_offset FROM hits ORDER BY EventTime LIMIT 100000) ORDER BY EventTime;

100000 rows in set. Elapsed: 0.784 sec. Processed 81.78 million rows, 3.23 GB (104.32 million rows/s., 4.12 GB/s.)
Peak memory usage: 776.61 MiB.

The performance comparison is as follows:

| Method | Description | Execution Time | Speedup Factor |
| --- | --- | --- | --- |
| Original query (fails) | `SELECT * FROM hits ORDER BY EventTime LIMIT 100000` | ❌ Fails (memory limit exceeded) | - |
| Lazy materialization enabled | `SET query_plan_max_limit...` + original query | 71.017 seconds | 1.0x (baseline) |
| Optimized rewritten query | `SELECT * FROM hits WHERE ... IN (subquery)` | 0.784 seconds | 90.6x faster (71.017 ÷ 0.784) |

Changelog category (leave one):

  • Performance Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

  • Added RewriteOrderByLimitPass in Analyzer for pattern recognition and rewriting
  • Added configuration parameters:
    • query_plan_rewrite_order_by_limit
    • query_plan_max_limit_for_rewrite_order_by_limit
    • query_plan_min_columns_to_use_rewrite_order_by_limit
  • Disabled by default (incompatible with distributed tables)
  • Default limit: 1 million rows (matching DuckDB's implementation)
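A hedged sketch of how the new settings listed above might be enabled in a session (the setting names come from this changelog entry; the values and exact usage are illustrative, not verified documentation):

```sql
-- Enable the ORDER BY ... LIMIT rewrite (disabled by default):
SET query_plan_rewrite_order_by_limit = 1;
-- Only apply the rewrite when LIMIT does not exceed this threshold
-- (default per the changelog: 1 million rows):
SET query_plan_max_limit_for_rewrite_order_by_limit = 1000000;

SELECT * FROM hits ORDER BY EventTime LIMIT 100000;
```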

Documentation entry for user-facing changes

  • Documentation is written (mandatory for new features)

Add the capability to rewrite "order by limit" into a subquery based on rowid.

Known Issues

This optimization is only applicable to non-distributed tables for the following reasons:

  • Using (_part_starting_offset + _part_offset) or (_part, _part_offset) to identify a row is only valid for non-distributed tables.
  • A stable global snapshot is required (possibly already resolved by PR).

Possible solutions are:

  1. When querying distributed tables, manually rewrite the local plan during createLocalPlan.
  2. Add some virtual columns (e.g., ip, shard_name) to identify a row in the case of distributed tables.
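As an illustration of the single-node row identity that the first point relies on (a sketch; the `hits` table and the check itself are assumptions for illustration, not from the PR):

```sql
-- On a single MergeTree table, _part_starting_offset + _part_offset
-- forms a unique global row id, so this check returns 1:
SELECT count() = uniqExact(_part_starting_offset + _part_offset)
FROM hits;

-- On a Distributed table the same expression can collide: two different
-- rows on two shards may share the same local offset sum, which is why
-- the rewrite is restricted to non-distributed tables.
```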

@acking-you acking-you force-pushed the support_order_by_limit_rewrite branch from 7c4a818 to db759d5 Compare June 24, 2025 08:09
@novikd novikd self-assigned this Jun 24, 2025
@novikd
Member

novikd commented Jun 24, 2025

Could you please provide a test for it?

@acking-you
Contributor Author

Could you please provide a test for it?

Okay, I will do it later.

@UnamedRus
Contributor

Another alternative is to use _part_granule_offset #82341.
In any case you need to read most of the rows from a specific granule, but this would reduce the size of the IN statement for big LIMIT values and reduce the cost of the index lookup.

Also, it makes sense to wrap it in the indexHint function, so it is only used for the index lookup.

@acking-you
Contributor Author

Another alternative is to use _part_granule_offset #82341. In any case you need to read most of the rows from a specific granule, but this would reduce the size of the IN statement for big LIMIT values and reduce the cost of the index lookup.

Also, it makes sense to wrap it in the indexHint function, so it is only used for the index lookup.

Thank you very much for your suggestion!

In fact, this optimization is not only suitable for large data volumes but also performs well with small data volumes (e.g., limit 10).

  1. I tried wrapping the WHERE condition with indexHint, which resulted in a two- to threefold performance drop (tested with LIMIT 10/1000/100000).
  2. The introduction of either _part_granule_offset or indexHint causes the main query to scan more data, which might be the root cause of the performance degradation.

@acking-you
Contributor Author

acking-you commented Jun 24, 2025

Could you please provide a test for it?

done @novikd

@amosbird
Collaborator

Another alternative is to use _part_granule_offset

@UnamedRus I don't think it's feasible. We still need row offsets to retrieve the top-K rows, and index analysis for _part_granule_offset hasn't been implemented, mainly because I believe there are no valid use cases that depend on it, though I might be wrong about this assumption...

@novikd novikd added the "can be tested" (Allows running workflows for external contributors) label Jun 24, 2025
@clickhouse-gh
Contributor

clickhouse-gh bot commented Jun 24, 2025

Workflow [PR], commit [eb995ef]

Summary:

  • Stateless tests (amd_binary, ParallelReplicas, s3 storage): failure
    • 02233_HTTP_ranged: FAIL
  • Integration tests (asan, old analyzer, 4/6): failure
    • Job Timeout Expired: FAIL
  • Stress test (amd_ubsan): failure
    • Server died: FAIL
    • Hung check failed, possible deadlock found (see hung_check.log): FAIL
    • Killed by signal (in clickhouse-server.log): FAIL
    • Fatal message in clickhouse-server.log (see fatal_messages.txt): FAIL
    • Killed by signal (output files): FAIL
    • Found signal in gdb.log: FAIL

@EmeraldShift
Contributor

index analysis for _part_granule_offset hasn't been implemented — mainly because I believe there are no valid use cases that depend on it

Could it be useful in an inverted projection index to efficiently store granules to check? Maybe there are cases where there would be too much data if you store the individual offsets, and you're willing to spend more time re-checking the indexed predicate upon reading the data rather than getting it from the index? Something like

PROJECTION prj (SELECT groupBitmap(_part_granule_offset) GROUP BY field ORDER BY field)

where field correlates strongly with granules?

@UnamedRus
Contributor

UnamedRus commented Jun 24, 2025

We still need row offsets to retrieve the Top K rows,

It's more like: if we are going to read most of a granule anyway, wouldn't it be simpler to read it as a whole and just sort more rows?
Plus, data is usually time-sorted, so all the interesting timestamps and rows will probably reside in the same granules, so the actual overhead of sorting (all the data from those granules) wouldn't be that big.

It's a bit of a stretch, but here is an example where this happens (plus, the second subquery is more "expensive" to run):

SELECT *
FROM hits
WHERE (CounterID = 105857) AND ((_part, _part_offset) IN (
    SELECT
        _part,
        _part_offset
    FROM hits
    WHERE CounterID = 105857
    ORDER BY EventTime DESC
    LIMIT 20000
))
ORDER BY EventTime DESC
LIMIT 20000
FORMAT `Null`

Ok.

0 rows in set. Elapsed: 0.080 sec. Processed 5.82 million rows, 114.52 MB (72.61 million rows/s., 1.43 GB/s.)
Peak memory usage: 41.61 MiB.

SELECT *
FROM hits
WHERE (CounterID = 105857) AND indexHint((_part, _part_offset) IN (
    SELECT *
    FROM
    (
        SELECT
            _part,
            intDiv(_part_offset, 8192) * 8192 AS offset
        FROM hits
        WHERE CounterID = 105857
        ORDER BY EventTime DESC
        LIMIT 20000
    )
    LIMIT 1 BY
        _part,
        offset
))
ORDER BY EventTime DESC
LIMIT 20000
FORMAT `Null`

Ok.

0 rows in set. Elapsed: 0.072 sec. Processed 5.82 million rows, 114.26 MB (80.63 million rows/s., 1.58 GB/s.)
Peak memory usage: 52.32 MiB.

@acking-you acking-you force-pushed the support_order_by_limit_rewrite branch from 75d4f55 to d257ad7 Compare June 25, 2025 03:40
@acking-you
Contributor Author

Now, by manually invoking the rewrite in buildQueryTreeForShard and adding logic to the rewrite that identifies and optimizes only StorageMergeTree, we've successfully enabled this optimization for distributed tables!

@amosbird
Collaborator

It's more like: if we are going to read most of a granule anyway, wouldn't it be simpler to read it as a whole and just sort more rows?

But you'll still need to apply filters again. While there might be edge cases that benefit from this approach, ClickHouse already handles IN sets with around a million elements quite efficiently. I’m skeptical there's a real use case for doing top-N over billions :)

@clickhouse-gh clickhouse-gh bot added the "pr-performance" (Pull request with some performance improvements) label Jun 25, 2025
@amosbird
Collaborator

Could it be useful in an inverted projection index to efficiently store granules to check?

The main benefits of the projection index (row-level index) are twofold:

  • Predicate Filtering – It allows computing a bitmap (row-level filter) directly from the projection index and applying filters at the very beginning of the PREWHERE chain. This has been implemented in PR #81021.

  • Expression Materialization – It enables serving precomputed values for certain expressions directly from the projection index, which avoids both expression evaluation and reading the underlying source columns.

If we only track granules, we can only support the first benefit — and its actual impact can vary significantly depending on workload and data distribution in real-world scenarios.

@acking-you acking-you force-pushed the support_order_by_limit_rewrite branch from c0e9198 to eb995ef Compare June 26, 2025 02:45
@JiaQiTang98
Contributor

An additional question, does it support pushing optimize_read_in_order into projection? Or will it?

@acking-you
Contributor Author

acking-you commented Jul 1, 2025

An additional question, does it support pushing optimize_read_in_order into projection? Or will it?

Great point!

I think that whether to support the pushdown of optimize_read_in_order to projection is unrelated to this PR. I noticed that the current implementation of optimizeUseNormalProjections does not seem to account for this. It appears to only support utilizing simple WHERE conditions in projections.

Here is my testing process:

CREATE TABLE mydata
(
    `A` Int64,
    `B` Int64,
    `C` String,
    PROJECTION p (SELECT A, B, _part_offset ORDER BY B)
)
ENGINE = MergeTree
ORDER BY A
AS SELECT number AS A, 999999 - number AS B, if(number BETWEEN 1000 AND 2000, 'x', toString(number)) AS C
FROM numbers(1000000);

INSERT INTO mydata SELECT * FROM mydata WHERE B > 14 LIMIT 20;
INSERT INTO mydata VALUES (1000001, -100, 'a');

-- the WHERE can be optimized by the projection
SELECT * FROM mydata WHERE _part_offset + _part_starting_offset IN (SELECT _part_offset + _part_starting_offset FROM mydata WHERE B < 10 ORDER BY B LIMIT 10);
10 rows in set. Elapsed: 0.038 sec. Processed 8.77 thousand rows, 88.13 KB (231.96 thousand rows/s., 2.33 MB/s.)
Peak memory usage: 472.89 KiB.

-- optimize_read_in_order can be applied to non-projection reads
SELECT * FROM mydata WHERE _part_offset + _part_starting_offset IN (SELECT _part_offset + _part_starting_offset FROM mydata ORDER BY A LIMIT 10) SETTINGS optimize_read_in_order = 1;
10 rows in set. Elapsed: 0.027 sec. Processed 16.41 thousand rows, 430.40 KB (607.41 thousand rows/s., 15.94 MB/s.)
Peak memory usage: 448.11 KiB.
-- cannot be applied to the projection
SELECT * FROM mydata WHERE _part_offset + _part_starting_offset IN (SELECT _part_offset + _part_starting_offset FROM mydata ORDER BY B LIMIT 10) SETTINGS optimize_read_in_order = 1;

I understand that as long as the original read process in optimizeUseNormalProjections supports pushing down optimize_read_in_order to the projection, this optimization will also benefit.

The potential concern is: will the read process of optimize_read_in_order cause an incorrect representation of row_id (_part_starting_offset + _part_offset)?

@clickhouse-gh
Contributor

clickhouse-gh bot commented Aug 5, 2025

Dear @novikd, this PR hasn't been updated for a while. You will be unassigned. Will you continue working on it? If so, please feel free to reassign yourself.

@EmeraldShift
Contributor

I manually tried this rewrite for a use case where the right-hand side of IN is a very large ~90 million row set. It spent >9 seconds in CreatingSets. Is this an expected performance bottleneck of this approach? I separately commented on it in more detail here, but I don't know if that PR is the right fix.

@acking-you
Contributor Author

It spent >9 seconds in CreatingSets. Is this an expected performance bottleneck of this approach?

Yes, this is expected. But I understand that doing so is similar to splitting a single SQL query into two executions. The second execution already knows exactly which data needs to be scanned, which is very effective in scenarios where a full table scan would otherwise be required, such as the "order by limit" issue resolved in this PR. If you need to avoid the CreatingSets operator, the previous ColumnLazy was a good solution. However, it also requires a secondary table-rewriting operation when the data is actually read, and it is difficult for it to make full use of the existing parallel reading pipeline. I haven't thought of a better solution for now.

@amosbird amosbird mentioned this pull request Sep 8, 2025
@EmeraldShift
Contributor

Does anyone know why this is so much faster than lazy materialization? I'm looking into a logging use case with search queries like this:

SELECT *
FROM distributed_logs_table
WHERE ...
ORDER BY time DESC
LIMIT 500

At a limit of 500, should I expect this feature to still be ~100x faster (!!!) than ColumnLazy? What are the trade-offs?

Also, in the meantime, without this PR merged yet, is there any way to get the correct behavior with a Distributed table by manually rewriting the query, or is it impossible?

@acking-you
Contributor Author

Does anyone know why this is so much faster than lazy materialization?

When I tested before, this gap was mainly because ColumnLazy was unable to read in parallel when performing the final materialized read. I don't know if the latest version of ClickHouse has solved this problem.

What are the trade-offs?

SELECT *
FROM distributed_logs_table
WHERE ...
ORDER BY time DESC
LIMIT 500

I think if the data itself is ordered by time, there should be no advantage to using lazy materialization, nor to rewriting the query. You could try turning off lazy materialization.

@EmeraldShift
Contributor

Now, by modifying the manual invocation of rewrite in buildQueryTreeForShard and implementing logic in rewrite to identify and optimize only for StorageMergeTree, we've successfully enabled this optimization in distributed tables!

I'm curious, can you say more about how this optimization works for Distributed tables? Does it perform a local LIMIT n on each shard, then merge? Or does it do some two-phase thing where it collects a global n-size set of offsets, then fetch the rows? If it's the first one, does it depend on distributed_push_down_limit? (for some reason this pushdown doesn't seem to work for me by default)

@acking-you
Contributor Author

I'm curious, can you say more about how this optimization works for Distributed tables? Does it perform a local LIMIT n on each shard, then merge? does it depend on distributed_push_down_limit? (for some reason this pushdown doesn't seem to work for me by default)

The current implementation of distributed tables is very simple—essentially just a rewrite over each local table. Your guess is correct: it performs a local LIMIT n on each shard, then merges the results. This is because providing global support across distributed tables involves many challenges at the moment. Whether or not distributed_push_down_limit is used depends on whether the rewritten QueryTree on each shard contains a LIMIT clause. I believe this setting is enabled by default.
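A sketch of what the per-shard rewrite described above might produce (illustrative only; the actual rewrite happens on the QueryTree, and `local_hits` is a stand-in name for the shard-local MergeTree table):

```sql
-- On each shard, the ORDER BY ... LIMIT is rewritten against the
-- shard-local table; the initiator then merges the sorted per-shard results:
SELECT *
FROM local_hits
WHERE (_part_starting_offset + _part_offset) IN
(
    SELECT _part_starting_offset + _part_offset
    FROM local_hits
    ORDER BY EventTime
    LIMIT 100000  -- local limit per shard, not a global limit
)
ORDER BY EventTime
LIMIT 100000;
```

Because each shard applies its own LIMIT before the merge, the result is correct for top-N queries but each shard may read up to N rows, which is what the two-phase approach below is meant to improve on.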

Or does it do some two-phase thing where it collects a global n-size set of offsets, then fetch the rows?

We're implementing a two-phase approach to achieve proper global ORDER BY with LIMIT:

Phase 1: Collect globally-sorted RowIDs

-- Rewritten internally
INSERT INTO _rowid_collector()
SELECT _rowid FROM distributed_table 
WHERE condition 
ORDER BY sort_col 
LIMIT 100  -- This is GLOBAL limit

Here, _rowid_collector() is a table function that creates a temporary in-memory storage table.

Phase 2: Fetch exact rows

-- Using collected RowIDs
SELECT cols FROM distributed_table 
WHERE _rowid IN (...100 exact IDs...)

Key difference from per-shard LIMIT: Phase 1 performs global sorting across all shards (not just local per shard), ensuring correct top-100 results.

Bonus: Once RowIDs are stored in memory, we can support asynchronous fetching for simple queries that can use these RowIDs directly (though not for aggregations that would require re-scanning data).

@KochetovNicolai
Member

@acking-you

The improved implementation of lazy materialization in 25.12 shows me about the same speedup

:) SELECT * from test.hits where (_part_starting_offset + _part_offset) IN (SELECT _part_starting_offset+_part_offset FROM test.hits ORDER BY EventTime LIMIT 100000) ORDER BY EventTime format Null settings use_query_condition_cache=0;

SELECT *
FROM test.hits
WHERE (_part_starting_offset + _part_offset) IN (
    SELECT _part_starting_offset + _part_offset
    FROM test.hits
    ORDER BY EventTime ASC
    LIMIT 100000
)
ORDER BY EventTime ASC
FORMAT `Null`
SETTINGS use_query_condition_cache = 0

Query id: a63f5e43-9fd5-4d7e-91f9-09eaedcf61f3

Ok.

0 rows in set. Elapsed: 0.332 sec. Processed 7.76 million rows, 769.19 MB (23.37 million rows/s., 2.32 GB/s.)
Peak memory usage: 446.44 MiB.

:) SELECT * from test.hits ORDER BY EventTime LIMIT 100000 settings query_plan_max_limit_for_lazy_materialization=200000, use_query_condition_cache = 0 format Null

SELECT *
FROM test.hits
ORDER BY EventTime ASC
LIMIT 100000
SETTINGS query_plan_max_limit_for_lazy_materialization = 200000, use_query_condition_cache = 0
FORMAT `Null`

Query id: 43f78e0e-5b70-46be-97ee-02784f4366c6

Ok.

0 rows in set. Elapsed: 0.277 sec. Processed 7.76 million rows, 744.69 MB (28.02 million rows/s., 2.69 GB/s.)
Peak memory usage: 267.48 MiB.

@acking-you
Contributor Author

The improved implementation of lazy materialization in 25.12 shows me about the same speedup

great job!❤️

@acking-you acking-you closed this Dec 18, 2025
Labels

  • can be tested (Allows running workflows for external contributors)
  • pr-performance (Pull request with some performance improvements)
