perf: Reduce native shuffle memory overhead by 50%#1452
perf: Reduce native shuffle memory overhead by 50%#1452andygrove merged 7 commits intoapache:mainfrom
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #1452 +/- ##
============================================
+ Coverage 56.12% 58.56% +2.43%
- Complexity 976 1015 +39
============================================
Files 119 122 +3
Lines 11743 12223 +480
Branches 2251 2295 +44
============================================
+ Hits 6591 7158 +567
+ Misses 4012 3913 -99
- Partials 1140 1152 +12 ☔ View full report in Codecov by Sentry. |
47491a9 to
4cfe993
Compare
| num_output_partitions: usize, | ||
| runtime: Arc<RuntimeEnv>, | ||
| metrics: ShuffleRepartitionerMetrics, | ||
| reservation: MemoryReservation, |
There was a problem hiding this comment.
This is the main change; removing the memory tracking in ShuffleRepartitioner because we already track the memory in each instance of PartitionedBuffer.
| let mut mem_diff = self | ||
| .append_rows_to_partition( | ||
| input.columns(), | ||
| &shuffled_partition_ids[start..end], | ||
| partition_id, | ||
| ) | ||
| .await?; | ||
|
|
||
| if mem_diff > 0 { | ||
| let mem_increase = mem_diff as usize; | ||
|
|
||
| let try_grow = { | ||
| let mut mempool_timer = self.metrics.mempool_time.timer(); | ||
| let result = self.reservation.try_grow(mem_increase); | ||
| mempool_timer.stop(); | ||
| result | ||
| }; | ||
|
|
||
| if try_grow.is_err() { | ||
| self.spill().await?; | ||
| let mut mempool_timer = self.metrics.mempool_time.timer(); | ||
| self.reservation.free(); | ||
| self.reservation.try_grow(mem_increase)?; | ||
| mempool_timer.stop(); | ||
| mem_diff = 0; | ||
| } | ||
| } | ||
|
|
||
| if mem_diff < 0 { | ||
| let mem_used = self.reservation.size(); | ||
| let mem_decrease = mem_used.min(-mem_diff as usize); | ||
| let mut mempool_timer = self.metrics.mempool_time.timer(); | ||
| self.reservation.shrink(mem_decrease); | ||
| mempool_timer.stop(); | ||
| } |
There was a problem hiding this comment.
We don't need any of this memory accounting because it is already handled within append_rows_to_partition
|
|
||
| mem_diff += self.active_slots_mem_size as isize; | ||
| } | ||
| Ok(mem_diff) |
There was a problem hiding this comment.
no need to return memory size here because we already reserved the memory in this method
| } | ||
| start = end; | ||
| } | ||
| AppendRowStatus::MemDiff(Ok(mem_diff)) |
There was a problem hiding this comment.
No need to return memory size here because all accounting already took place in the calls to allocate_active_builders and flush in this method
| mempool_timer.stop(); | ||
|
|
||
| mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize; | ||
| Ok(mem_diff) |
There was a problem hiding this comment.
No need to return memory size because memory accounting already happened in this method.
| // TODO reservation should not be zero because there are active builders again | ||
| assert_eq!(0, buffer.reservation.size()); | ||
| assert_eq!(106496, buffer.reservation.size()); |
There was a problem hiding this comment.
This demonstrates that the memory accounting is now more accurate
|
@mbutrovich This PR is ready for review now |
There was a problem hiding this comment.
I find "active" vague and the mixing of "active rows" and "active slots" a bit confusing in PartitionBuffer, but that shouldn't stop this PR. Nicely done, @andygrove!
|
Thanks for the reviews @viirya and @mbutrovich. I agree that we could update some of the naming. |
Which issue does this PR close?
Closes #1448
Rationale for this change
We were reserving memory twice in native shuffle, resulting in excessive shuffling.
Here are results for TPC-H q9 with 3GB off-heap memory allocated:
Before (main branch)
After
What changes are included in this PR?
Stop double reserving memory.
How are these changes tested?