Queue proxy shard transfer uses pipelining #8656
// is guaranteed to return reached_end = true.
let _update_lock = self.update_lock.lock().await;
let transfer_from = self.transfer_from.load(Ordering::Relaxed);
let wal_batch = self.read_wal_batch(transfer_from).await?;
This does not guarantee we read everything.
If we've accumulated a lot more operations before taking the update lock, the batch we get here may be missing the tail.
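One way to address this is to keep reading under the lock until a read reports the end of the WAL, rather than reading once. The following is only a minimal, synchronous sketch of that idea: `std::sync::Mutex` stands in for `update_lock`, a `Vec<u64>` stands in for the WAL, and `WalBatch`, `read_wal_batch`, and `drain_under_lock` are simplified, hypothetical shapes rather than the signatures used in this PR.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for a WAL batch; only the fields needed here.
struct WalBatch {
    ops: Vec<u64>,
    reached_end: bool,
}

/// Reads at most `batch_size` operations starting at index `from`.
/// `reached_end` is true only when the read stopped at the end of the WAL.
fn read_wal_batch(wal: &[u64], from: usize, batch_size: usize) -> WalBatch {
    let start = from.min(wal.len());
    let end = (start + batch_size).min(wal.len());
    WalBatch {
        ops: wal[start..end].to_vec(),
        reached_end: end == wal.len(),
    }
}

/// Holds the lock for the whole drain, so the WAL cannot grow in between,
/// and loops until a read reports `reached_end`. A single read could stop
/// at `batch_size` operations and leave the tail behind.
fn drain_under_lock(wal: &Mutex<Vec<u64>>, mut from: usize, batch_size: usize) -> Vec<u64> {
    assert!(batch_size > 0, "a zero batch size would never make progress");
    let guard = wal.lock().unwrap();
    let mut sent = Vec::new();
    loop {
        let batch = read_wal_batch(&guard, from, batch_size);
        from += batch.ops.len();
        sent.extend(batch.ops);
        if batch.reached_end {
            break;
        }
    }
    sent
}
```

With ten queued operations, a starting index of 3, and a batch size of 4, a single `read_wal_batch` call returns only operations 3 to 6, while `drain_under_lock` returns 3 to 9.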
if is_last {
    // For the last batch, acquire update_lock to prevent new writes from accumulating
    // on the WAL, then re-read to ensure we capture everything.
    // Since update_lock blocks all update() calls, the WAL is frozen and the re-read
    // is guaranteed to return reached_end = true.
    let _update_lock = self.update_lock.lock().await;
    let transfer_from = self.transfer_from.load(Ordering::Relaxed);
    let wal_batch = self.read_wal_batch(transfer_from).await?;
    self.send_wal_batch(&wal_batch, true).await?;
    break;
}
I find this separate send branch a bit confusing.
I have the following proposal to simplify it: #8675
Please feel free to pick the one you like best.
qdrant-cloud-bot left a comment
Good optimization — the pipelining approach is clean and the second commit properly addresses Tim's concern about the re-read under lock needing to loop to completion.
A few observations:
1. Spurious WaitUntil::Segment on a non-final batch
When a batch read without the lock has reached_end = true, the code acquires update_lock and sends it with is_last = true → WaitUntil::Segment. But if writes arrived between that unlocked read and lock acquisition, this isn't actually the last batch. The subsequent batches (under lock) will use WaitUntil::Wal until the true last one gets Segment again.
This is harmless (one extra segment-level wait, rarely triggered) but worth a comment to avoid confusion on future reads. Consider deriving is_last as batch.reached_end && update_lock.is_some() instead — that way only batches read under the lock can be treated as truly final, and you avoid the unnecessary Segment wait.
2. Prefetch on the truly-last batch is a no-op read
When update_lock is held and reached_end = true, the tokio::join! still fires read_wal_batch(next_from) which will return an empty batch that's immediately used to exit the loop on the next iteration. This is cheap, so probably not worth optimizing away, but a short comment like // prefetch will return empty; handled on next iteration would help readability.
3. send_wal_batch takes is_last but also uses wal_batch.total and transfer_from
send_wal_batch reads self.transfer_from at the top, but this value could have been updated by the previous call's self.transfer_from.store(idx + 1, ...) inside the same function. This works correctly because each send updates it on success and the next send reads the new value, but the "Load → use for is_first" pattern is a bit non-obvious. The is_first check works because only the very first call has transfer_from == self.started_at. Fine as-is, just noting the implicit coupling.
4. Minor: WalBatch.total can go stale (CodeRabbit's nitpick)
Agree this is cosmetic only — progress may briefly wobble but can't cause a bug. Not worth fixing unless you want to.
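The suggestion in observation 1 can be sketched as a small pure function. This is an illustration only: `WaitUntil` is reduced to the two variants named above, and `wait_mode` with its boolean parameters is a hypothetical helper, not code from the PR.

```rust
/// Reduced stand-in for the wait modes mentioned in observation 1.
#[derive(Debug, Clone, Copy, PartialEq)]
enum WaitUntil {
    Wal,
    Segment,
}

/// A batch counts as final only if it reached the end of the WAL *and*
/// was read while `update_lock` was held. A batch read without the lock
/// may already be outdated by writes that arrived afterwards.
fn wait_mode(reached_end: bool, lock_held: bool) -> WaitUntil {
    let is_last = reached_end && lock_held;
    if is_last {
        WaitUntil::Segment
    } else {
        WaitUntil::Wal
    }
}
```

Under this rule, a batch that reached the end without the lock (`wait_mode(true, false)`) gets `WaitUntil::Wal`, so the segment-level wait happens once, on the batch that is final under the lock.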
Overall LGTM — the lock management is correct and the pipelining design is sound. The key invariant — "once update_lock is held, no new writes accumulate, and we drain all remaining batches under the lock" — holds.
* Queue proxy shard transfer uses pipelining
* make sure last WAL read is made under update_lock
* simplify
Summary
During shard snapshot (and WAL delta) transfer, the backfill phase drains queued WAL operations to the remote via transfer_all_missed_updates. The current implementation is fully sequential: each batch is read from the WAL and sent to the remote before the next batch is read.
This PR overlaps WAL reads with network sends using tokio::join!: while the current batch is being sent to the remote, the next batch is prefetched from the WAL concurrently. This eliminates the WAL read latency between network round-trips.
Changes
* Split transfer_wal_batch into read_wal_batch and send_wal_batch
* Change transfer_all_missed_updates to pipeline non-last batches
* The last batch acquires update_lock and re-reads WAL for correctness
Design
Before: READ₁ → SEND₁ → READ₂ → SEND₂ → ... → READ_N → SEND_N(last)
After: READ₁ → [SEND₁ + READ₂] → [SEND₂ + READ₃] → ... → SEND_N(last)
The last batch is always handled under update_lock with a fresh WAL read to guarantee no writes are missed. Prefetched batches that appear to be "last" are discarded and re-read under the lock.
Peak memory usage increases from ~1 to ~2 batch buffers (~64 MiB) during pipelined transfers.
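The Before/After ordering can be illustrated with a standard-library sketch. It is not the PR's code: scoped threads from `std::thread::scope` are swapped in for `tokio::join!`, the WAL and the remote are plain vectors, `read_batch`, `send_batch`, and `transfer_pipelined` are hypothetical names, and the final re-read under `update_lock` is left out.

```rust
use std::thread;

/// Hypothetical WAL read: up to `batch_size` ops starting at `from`.
fn read_batch(wal: &[u64], from: usize, batch_size: usize) -> Vec<u64> {
    let start = from.min(wal.len());
    let end = (start + batch_size).min(wal.len());
    wal[start..end].to_vec()
}

/// Hypothetical network send: appends the batch to the remote's log.
fn send_batch(remote: &mut Vec<u64>, batch: &[u64]) {
    remote.extend_from_slice(batch);
}

/// READ1 -> [SEND1 + READ2] -> [SEND2 + READ3] -> ...
/// At most two batches are alive at once: the one being sent and the
/// one being prefetched.
fn transfer_pipelined(wal: &[u64], batch_size: usize) -> Vec<u64> {
    assert!(batch_size > 0, "a zero batch size would never make progress");
    let mut remote = Vec::new();
    let mut from = 0;
    let mut current = read_batch(wal, from, batch_size);
    while !current.is_empty() {
        from += current.len();
        // Send the current batch on this thread while a scoped thread
        // prefetches the next one; the scope joins both before returning.
        let next = thread::scope(|s| {
            let prefetch = s.spawn(|| read_batch(wal, from, batch_size));
            send_batch(&mut remote, &current);
            prefetch.join().expect("prefetch thread panicked")
        });
        // The final prefetch comes back empty and ends the loop.
        current = next;
    }
    remote
}
```

Calling `transfer_pipelined(&wal, 4)` on a ten-element `wal` delivers all ten operations in order, in three sends, with the second and third reads overlapping the first and second sends.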
Test plan
* cargo build -p collection
* cargo test -p collection transfer
* test_shard_snapshot_transfer.py