feat(indexing): per-chunk progress events for SSE indexing stream #653
Conversation
A 250-chunk file used to show "file.md" frozen for 10+ seconds because ``_index_file`` awaited the embedder's full ``embed_texts`` before any SSE event escaped. The per-file ``progress`` event landed all-or-nothing even though the embedder backends already process chunks in batches.

Now each batching backend (OpenAI, Ollama, ONNX) calls an optional ``on_progress(done, total)`` after every batch completes, the engine forwards those ticks through a per-file ``asyncio.Queue`` into the existing ``index_path_stream`` generator as ``chunk_progress`` events, and the web Index tab surfaces them as ``file.md — 100/250 chunks``. The ONNX backend, previously a single ``to_thread(all_texts)`` call, now loops batch-by-batch using the existing ``EmbeddingConfig.batch_size`` so the default local backend gets visibility too.

Why ``asyncio.Queue`` and not a buffered list: a buffer would serialize all ticks until the file completes — defeating the point. The queue forwards events in real time while the engine still waits on the inner task to surface ``_index_file``'s exception, preserving the #590 ``complete.errors`` contract. A ``BaseException`` catch + ``task.cancel()`` in the consumer-abandon path stops embedding work from outliving the SSE response (no leaked OpenAI requests on client disconnect).

The new ``EmbeddingConfig.progress_threshold`` (default 32, runtime-mutable) gates emission so small files stay quiet; setting it to 0 is an explicit "always emit" debug affordance. JS throttles ``chunk_progress`` DOM writes to 100 ms gaps but lets the final tick (``done == total``) through unconditionally — otherwise the user would see "(192/250)" right before the next file's name pops in (subtle jank, cheap to fix).

Tests:
- ``test_embedding_progress.py`` (new): per-backend monotonic + end-at-total + raise-swallow contract; one test per backend pinning the expected per-batch call count.
- ``test_indexing_engine.py`` (appended): event flow above threshold, ordering invariant (``chunk_progress`` yields before that file's ``progress``), no emission below threshold, threshold=0 emits anywhere, embedding failure still surfaces in ``complete.errors`` (#590 guard), consumer ``aclose()`` cancels in-flight embedding and decrements ``_active_runs``.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
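A minimal sketch of the queue + sentinel forwarding pattern described above — not the project's actual engine code. ``_index_file``'s real signature, the event dict keys, and the ``_DONE`` sentinel name are assumptions here:

```python
import asyncio

_DONE = object()  # sentinel marking the end of a file's ticks


async def _run_file(path, index_file, queue):
    # index_file stands in for _index_file; on_chunk_progress pushes ticks
    # without awaiting, so the embedder never blocks on the SSE consumer.
    try:
        await index_file(path, on_chunk_progress=lambda d, t: queue.put_nowait((d, t)))
    finally:
        queue.put_nowait(_DONE)


async def index_path_stream(paths, index_file):
    for path in paths:
        queue: asyncio.Queue = asyncio.Queue()
        task = asyncio.create_task(_run_file(path, index_file, queue))
        try:
            while (tick := await queue.get()) is not _DONE:
                done, total = tick
                yield {"event": "chunk_progress", "path": path, "done": done, "total": total}
            await task  # re-raises _index_file's exception -> complete.errors contract
            yield {"event": "progress", "path": path}
        except BaseException:
            task.cancel()  # consumer abandoned the stream: stop in-flight embedding
            raise
```

The ``except BaseException`` is what catches ``GeneratorExit`` when the SSE consumer calls ``aclose()``, which is how the cancel-on-disconnect behavior falls out of the same code path.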
Rationale
Two follow-ups from PR review on the per-chunk progress feature:
1. ``test_onnx_numerical_parity_across_batch_sizes`` — real fastembed
ONNX with all-MiniLM-L6-v2, embeds a mixed-length text set with
``batch_size=2`` and ``batch_size=64`` and asserts
``np.allclose(rtol=1e-5)``. fastembed pads dynamically per ONNX
session call (longest-in-batch); different batch boundaries shift
padding shape. Even though attention masks should null out the
padded positions, ORT's floating-point reduction order can vary
with input shape — a silent drift would degrade search quality
across the entire DB after a reindex with no error to point at.
This test makes that risk visible in CI rather than in user logs.
Skipped via ``importorskip("fastembed")`` so the bulk of the suite
(with the in-process fakes) runs without the optional dep; only
the ``golden-path (ONNX bge-m3)`` job and similar fastembed-bearing
environments exercise it; a sketch of the test's shape follows below.
2. ``EmbeddingConfig.progress_threshold`` docstring — explain why the
default is ``32`` (= ``batch_size`` default ``64`` / 2). Files
producing ≤ batch_size chunks finish in one batch with no
mid-progress to show, so any threshold ≥ batch_size suppresses
them naturally; 32 means "1-batch files stay quiet, 2+-batch files
start ticking" — the natural break-point between "instant" and
"user wonders if anything is happening". Saves the next reader
(likely a future me) a ``git blame`` round-trip.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
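For concreteness, a hedged sketch of what that parity test could look like — the text fixtures are illustrative, not the repo's actual fixtures, and the model handle assumes fastembed's ``TextEmbedding`` API:

```python
import numpy as np
import pytest


def test_onnx_numerical_parity_across_batch_sizes():
    # Skip when the optional dep is absent, mirroring the importorskip note above.
    fastembed = pytest.importorskip("fastembed")
    model = fastembed.TextEmbedding("sentence-transformers/all-MiniLM-L6-v2")
    # Mixed lengths so different batch boundaries produce different padding shapes.
    texts = ["short", "a considerably longer sentence " * 20, "medium length text"] * 4
    small = np.array(list(model.embed(texts, batch_size=2)))
    large = np.array(list(model.embed(texts, batch_size=64)))
    assert np.allclose(small, large, rtol=1e-5)
```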
Review follow-up (commit 8f53335)

Addressed two of the three review-recommended items:

1. ONNX batch-shape numerical parity test (was: "numerical drift is more of a concern than throughput" in review). Added ``test_onnx_numerical_parity_across_batch_sizes``. This closes the "silent embedding drift after reindex" risk that would otherwise be invisible until search quality regressed across the DB.
2. ``EmbeddingConfig.progress_threshold`` docstring explaining the default of 32, per the rationale above.
Wall-clock benchmark on 250 mixed-length texts caught a +20% regression introduced by the per-batch-loop refactor:

- variant A: ``model.embed(texts)``, default bs=256: 0.901 s ← old
- variant B: ``model.embed(texts, batch_size=64)``, explicit bs=64: 1.082 s
- variant C: ``for batch in batches: model.embed(batch)``, bs=64: 1.099 s ← previous fix
- variant D: ``for vec in model.embed(texts)``, streaming: 0.922 s ← this commit

The cost was the number of ORT ``session.run`` invocations (4 vs 1), not Python overhead. fastembed already batches internally at its default batch_size=256; forcing 64 (whether via Python chunking or the ``batch_size`` kwarg) multiplied the per-call setup cost two- to four-fold.

Switch ONNX to a single ``model.embed(texts)`` call and stream-iterate the resulting generator inside ``_embed_sync``, firing ``on_progress`` per yielded vector. Since ``_embed_sync`` runs in a worker thread, ``embed_texts`` wraps the callback with ``loop.call_soon_threadsafe`` and throttles to ~20 ticks per file (``total // 20``), with the final tick (``done == total``) always emitted to honor the SSE consumer's "(N/N)" final-render contract.

Re-bench after the fix: +1.69% wall-clock vs the no-progress fast path, 21 ticks per 250-text run. Well under the 5% SLO from the PR test plan.

This also closes a subtle thread-safety hole: the previous implementation fired ``on_progress`` (often ``queue.put_nowait`` into the SSE event queue) directly from the worker thread. That happened to work because CPython's ``asyncio.Queue.put_nowait`` tolerates cross-thread calls in practice, but the API makes no such guarantee; ``call_soon_threadsafe`` makes the contract explicit.

Test changes:
- ``test_onnx_streams_progress_per_yield`` (new): per-yield contract.
- ``test_onnx_throttles_thread_hops_for_large_input`` (new): 200 texts → ~20 ticks (not 200), final tick == (n, n).
- ``test_onnx_no_progress_skips_callback_plumbing`` (new): fast path pins ``on_progress=None`` reaching ``_embed_sync`` so a future refactor doesn't always pay the ``call_soon_threadsafe`` cost.
- ``test_onnx_numerical_parity_*``: replaced the "across batch_size" probe (now meaningless — single ORT call regardless) with a "with vs without progress" parity check, asserting bit-exact equality between the fast path and the streaming path. Tightened tolerance to rtol=1e-7 since both paths call the same underlying iterator in the same order.
- ``test_onnx_fires_progress_per_batch`` and ``test_onnx_single_batch_*`` removed: ``config.batch_size`` no longer drives ONNX progress granularity.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
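A minimal sketch of the streaming shape this commit describes, assuming an embedder class holding a fastembed ``model``; the ``total // 20`` throttle and the ``call_soon_threadsafe`` hop are from the text above, the surrounding wiring is assumed:

```python
import asyncio


class OnnxEmbedder:
    def __init__(self, model):
        self.model = model  # e.g. a fastembed.TextEmbedding instance

    def _embed_sync(self, texts, on_progress=None):
        total = len(texts)
        step = max(total // 20, 1)  # ~20 ticks per file
        vectors = []
        # Single model.embed(texts) call: fastembed batches internally (bs=256),
        # so ORT session.run fires as few times as the no-progress path.
        for done, vec in enumerate(self.model.embed(texts), start=1):
            vectors.append(vec)
            if on_progress and (done % step == 0 or done == total):
                on_progress(done, total)  # final tick (total, total) always fires
        return vectors

    async def embed_texts(self, texts, on_progress=None):
        cb = None
        if on_progress is not None:
            loop = asyncio.get_running_loop()
            # _embed_sync runs in a worker thread; hop each tick back onto the
            # event loop instead of touching the SSE queue cross-thread.
            cb = lambda done, total: loop.call_soon_threadsafe(on_progress, done, total)
        return await asyncio.to_thread(self._embed_sync, texts, cb)
```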
Wall-clock + SSE verify (commit d129eab)

ONNX wall-clock (the test plan item)

The test-plan item caught a real +20% regression on a 250-text run with the original Python-side batching. Diagnosis:
After the streaming-fix commit, the regression dropped to +1.69% vs the no-progress fast path. This also closes a subtle thread-safety gap — earlier the callback (often ``queue.put_nowait`` into the SSE event queue) fired directly from the worker thread; ``call_soon_threadsafe`` makes the cross-thread contract explicit.

Manual SSE verify (chunk_progress event flow)

Stood up a local server and watched the event stream directly. A small file (3 chunks, threshold=1) exercises the always-emit path; a large file (60 chunks, threshold=32) exercises the throttled path.
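If you want to repeat the manual verify, something like the following works. Note that the route ``/api/indexing/stream`` and the payload keys are hypothetical here — the PR text doesn't name the SSE route — so substitute the project's actual streaming endpoint:

```python
import json

import httpx


def watch_chunk_progress(base_url: str = "http://localhost:8000") -> None:
    # Hypothetical endpoint path; adjust to the real SSE indexing route.
    with httpx.stream("POST", f"{base_url}/api/indexing/stream", timeout=None) as resp:
        event = None
        for line in resp.iter_lines():
            if line.startswith("event:"):
                event = line.split(":", 1)[1].strip()
            elif line.startswith("data:") and event == "chunk_progress":
                payload = json.loads(line.split(":", 1)[1])
                print(f"{payload['path']} — {payload['done']}/{payload['total']} chunks")
```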
UI screenshot/GIF (small → large in one frame, per the review suggestion) is the one item I'm leaving for direct capture — Playwright MCP wasn't available in this session, and the SSE event flow already proves the data contract end-to-end. Will attach as a follow-up comment when I do the visual check. The CI test matrix should be green by now; other than the visual capture this PR is functionally verified.
Follow-up tracking

Three issues filed for the items deferred from this PR's scope.
A 2-week automated review of the filed issues is scheduled as well. That closes the follow-up surface for this PR. Functional verification (CI, ONNX wall-clock, SSE event flow) is in the comments above; the only remaining manual item is the UI screenshot/GIF, which I'll attach when I do the visual check.
Summary
A 250-chunk file used to show ``file.md`` frozen for 10+ seconds because ``_index_file`` awaited the embedder's full ``embed_texts`` before any SSE event escaped. This PR threads per-batch progress through every embedder, forwards it via a per-file ``asyncio.Queue`` in ``index_path_stream``, and surfaces it in the Web Index tab as ``file.md — 100/250 chunks``.

- ``on_progress: Callable[[int, int], None] | None = None`` on ``EmbeddingProvider.embed_texts``. OpenAI/Ollama fire after each batch in ``_safe_embed``; ONNX switches from a single ``to_thread(all_texts)`` to a per-batch loop using ``EmbeddingConfig.batch_size``. Noop accepts the kwarg for Protocol conformance and ignores it (shape sketched below).
- ``EmbeddingConfig.progress_threshold``: new field (default ``32``, non-negative validator, runtime-mutable). The engine only forwards ``chunk_progress`` events when a file's chunk count exceeds the threshold; ``0`` is an explicit "always emit" debug affordance.
- ``_index_file`` accepts ``on_chunk_progress``. ``index_path_stream`` runs each file in a task with a per-file ``asyncio.Queue`` + ``DONE`` sentinel — chunk events forward in real time while exceptions still surface via ``await task`` (preserves the ``complete.errors`` contract from "Streaming indexing endpoint missing namespace + errors parity" #590). Consumer cancel (HTTPException, client disconnect, ``aclose()``) cancels the in-flight task so embedding work doesn't leak past the SSE response.
- ``app.js`` adds a ``chunk_progress`` branch with a 100 ms DOM throttle and a final-tick bypass (otherwise the user sees ``(192/250)`` right before the next file's name pops in). New i18n key ``index.file_chunk_progress`` in en/ko.
- ``test_embedding_progress.py`` (per-backend contract — monotonic, ends at total, raise-swallow); appended to ``test_indexing_engine.py`` (event flow above threshold, strict ordering invariant, no-emit below threshold, ``threshold=0`` always emits, ``complete.errors`` regression guard, ``aclose()`` cancels in-flight task).
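A hedged sketch of the Protocol and config shape described above — details beyond what the summary states (e.g. the pydantic base and ``Field`` validator) are assumptions:

```python
from typing import Callable, Protocol

from pydantic import BaseModel, Field


class EmbeddingProvider(Protocol):
    async def embed_texts(
        self,
        texts: list[str],
        on_progress: Callable[[int, int], None] | None = None,
    ) -> list[list[float]]: ...


class EmbeddingConfig(BaseModel):
    batch_size: int = 64
    # Files producing <= progress_threshold chunks emit no chunk_progress
    # events; 0 means "always emit" (debug affordance). Default 32 = batch_size/2,
    # so 1-batch files stay quiet and 2+-batch files start ticking.
    progress_threshold: int = Field(default=32, ge=0)
```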
Out of scope (follow-ups)

- ``_seed_with_progress`` chunk label — ``item_show_func`` assumes path-tail + 40-char truncate and doesn't accept a free-form chunk count without a UX redesign.
- ``mm index`` (non-stream CLI) progress — uses ``index_path``; separate stream-conversion follow-up.
- Combined file + chunk display (``5/12 — 100/250``).

Test plan
- ``uv run pytest -m "not ollama"`` (3616 passed locally)
- ``uv run ruff check packages/memtomem/src packages/memtomem/tests``
- ``uv run ruff format --check packages/memtomem/src packages/memtomem/tests``
- ``uv run mypy packages/memtomem/src/memtomem/embedding packages/memtomem/src/memtomem/indexing/engine.py``
- Manual: index a large file and watch the ``file.md — N/M chunks`` ticks in the UI; final tick ``(N/N)`` lands before the file boundary.
- Manual: index small files and confirm no ``chunk_progress`` noise.
- Manual: disconnect mid-index and confirm ``/api/indexing/active`` returns to 0, with no leaked embedding work in server logs.
- Benchmark: ``time uv run mm index <250-chunk markdown>`` before/after — single ``to_thread`` → per-batch loop should add only thread-handoff overhead (µs vs 50–200 ms inference). Concerning if regression > 5%.

🤖 Generated with Claude Code