Memory reservation & metrics for cross join #5339
Thank you @korowa -- I plan to review this carefully tomorrow
alamb
left a comment
Thank you @korowa -- this looks great
I think we should also add a SQL-level test for the memory limit code; it should be easy to add one to https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/memory_limit.rs. I am happy to add such a test in a follow-on PR if you prefer.
I also tried the metrics out in datafusion-cli, and they look good to me:
❯ create or replace table t as select column1 as value, column2 as time from (select * from (values
(1, timestamp '2022-01-01 00:00:30'),
(2, timestamp '2022-01-01 01:00:10'),
(3, timestamp '2022-01-02 00:00:20')
) as sq) as sq;
0 rows in set. Query took 0.006 seconds.
❯ explain analyze select * from t t1 CROSS JOIN (select * from t where value < 3) as t2;
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalescePartitionsExec, metrics=[output_rows=6, elapsed_compute=53.015µs, spill_count=0, spilled_bytes=0, mem_used=0] |
| | ProjectionExec: expr=[value@0 as value, time@1 as time, value@2 as value, time@3 as time], metrics=[output_rows=6, elapsed_compute=10.477µs, spill_count=0, spilled_bytes=0, mem_used=0] |
| | CrossJoinExec, metrics=[output_rows=6, input_rows=6, build_input_rows=3, input_batches=1, build_input_batches=1, output_batches=3, build_mem_used=496, build_time=107.605µs, join_time=64.668µs] |
| | MemoryExec: partitions=1, partition_sizes=[1], metrics=[] |
| | ProjectionExec: expr=[value@0 as value, time@1 as time], metrics=[output_rows=2, elapsed_compute=6.424µs, spill_count=0, spilled_bytes=0, mem_used=0] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2, elapsed_compute=69.635µs, spill_count=0, spilled_bytes=0, mem_used=0] |
| | FilterExec: value@0 < 3, metrics=[output_rows=2, elapsed_compute=147.686µs, spill_count=0, spilled_bytes=0, mem_used=0] |
| | RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1, metrics=[fetch_time=15.452µs, repart_time=1ns, send_time=9.12µs] |
| | MemoryExec: partitions=1, partition_sizes=[1], metrics=[] |
| | |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| let batch_size = batch.get_array_memory_size(); | ||
| // Reserve memory for incoming batch | ||
| acc.3.lock().try_grow(batch_size)?; | ||
| // Update metrics |
| Some(result) | ||
| } | ||
| other => { | ||
| debug!( |
I think it is a nice improvement that the metrics are now included in things like EXPLAIN ANALYZE
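To make the counters in the `CrossJoinExec` line of the output above more concrete, here is a minimal, self-contained sketch of how build-side and probe-side counters might be updated during a cross join. It is not the code from this PR: `ToyBuildProbeMetrics` and `cross_join` are hypothetical names, batches are modelled as plain vectors, and producing one output batch per build row and probe batch is an assumption made for illustration. Only the counter names are taken from the `EXPLAIN ANALYZE` output.

```rust
use std::time::{Duration, Instant};

/// Hypothetical counters named after the metrics printed by EXPLAIN ANALYZE.
/// This is a toy stand-in, not DataFusion's metrics type.
#[derive(Debug, Default)]
struct ToyBuildProbeMetrics {
    build_input_batches: usize,
    build_input_rows: usize,
    input_batches: usize,
    input_rows: usize,
    output_batches: usize,
    output_rows: usize,
    build_time: Duration,
    join_time: Duration,
}

/// Cross join over in-memory "batches" (plain vectors), updating the counters.
fn cross_join(
    build: &[Vec<i32>],
    probe: &[Vec<i32>],
    metrics: &mut ToyBuildProbeMetrics,
) -> Vec<Vec<(i32, i32)>> {
    // Build phase: collect the whole build side before probing.
    let start = Instant::now();
    let mut build_rows = Vec::new();
    for batch in build {
        metrics.build_input_batches += 1;
        metrics.build_input_rows += batch.len();
        build_rows.extend_from_slice(batch);
    }
    metrics.build_time += start.elapsed();

    // Probe phase: assumed here to emit one output batch per
    // (build row, probe batch) pair.
    let start = Instant::now();
    let mut output = Vec::new();
    for batch in probe {
        metrics.input_batches += 1;
        metrics.input_rows += batch.len();
        for &left in &build_rows {
            let out: Vec<(i32, i32)> =
                batch.iter().map(|&right| (left, right)).collect();
            metrics.output_batches += 1;
            metrics.output_rows += out.len();
            output.push(out);
        }
    }
    metrics.join_time += start.elapsed();
    output
}
```

Keeping the counters in one struct that is separate from the join logic is what would let other build-and-probe style operators reuse the same set of metrics.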
Thank you for the review @alamb!
Seems like new UPD:
Benchmark runs are scheduled for baseline = 20d08ab and contender = ea3b965. ea3b965 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
Closes #5162.
Part of #5220.
Rationale for this change
Memory management for one of the four join operators (CrossJoinExec).
What changes are included in this PR?
- `CrossJoinExec.reservation` -- the actual reservation, used in the `load_left_input` closure to reserve memory for build-side data, and in the stream polling function to free memory after the probe-side stream has been exhausted. Memory also gets freed in the reservation's destructor (i.e. when a parent operator has a `LIMIT`), so it's not required to store the reservation as a stream attribute, but explicitly calling `free` seems like the more correct way.
- `BuildProbeJoinMetrics` -- at the moment CrossJoinExec lacks metrics that could be exposed in `explain analyze`, so the counters from `CrossJoinStream` have been moved to a `BuildProbeJoinMetrics` structure. It could also be used in follow-ups for both `NestedLoopJoinExec` and `HashJoinExec`, as they all share almost the same "build & probe" idea, so their metrics could be similar.

Are these changes tested?
Test cases have been added for normal cross join execution and for asserting an error on an overallocation attempt.
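The reservation lifecycle described in this PR (grow while loading the build side, fail on over-allocation, free explicitly, and free again as a fallback on drop) can be sketched roughly as follows. This is a standalone toy model, not DataFusion's API: `ToyPool`, `ToyReservation` and `load_build_side` are hypothetical names, and the error is a plain `String` rather than the real error type.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical fixed-size pool; a stand-in for a real memory pool.
struct ToyPool {
    limit: usize,
    used: AtomicUsize,
}

/// Hypothetical reservation that returns its bytes to the pool when dropped.
struct ToyReservation {
    pool: Arc<ToyPool>,
    size: usize,
}

impl ToyReservation {
    fn new(pool: Arc<ToyPool>) -> Self {
        Self { pool, size: 0 }
    }

    /// Try to reserve `bytes` more; on failure nothing is reserved.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        let limit = self.pool.limit;
        self.pool
            .used
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |used| {
                used.checked_add(bytes).filter(|total| *total <= limit)
            })
            .map_err(|used| {
                format!(
                    "Resources exhausted: requested {bytes} bytes, \
                     {used} of {limit} already in use"
                )
            })?;
        self.size += bytes;
        Ok(())
    }

    /// Release everything held by this reservation; returns the bytes released.
    fn free(&mut self) -> usize {
        let released = std::mem::take(&mut self.size);
        self.pool.used.fetch_sub(released, Ordering::SeqCst);
        released
    }
}

impl Drop for ToyReservation {
    /// Fallback release, e.g. when a consumer stops polling early.
    fn drop(&mut self) {
        self.free();
    }
}

/// Reserve memory for each build-side batch before keeping it,
/// stopping at the first batch that does not fit.
fn load_build_side(
    batch_sizes: &[usize],
    reservation: &mut ToyReservation,
) -> Result<usize, String> {
    let mut total = 0;
    for &size in batch_sizes {
        reservation.try_grow(size)?;
        total += size;
    }
    Ok(total)
}
```

In this sketch, calling `free` once the probe side is exhausted returns memory as early as possible, while the `Drop` implementation covers the case where the stream is dropped before reaching that point.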
Are there any user-facing changes?
Not really, but this should help to avoid OOM and fail with a `Resources exhausted` error instead.