feat(mine): queue requests during repair-rebuild + drain after #4
Merged
Conversation
Mirrors the existing /silent-save queue pattern for /mine. Closes a
gap JP noticed: when the daemon is in repair-mode rebuild, hook-fired
POST /mine requests fail outright (the rebuild replaces the collection
mid-flight; running a concurrent mine subprocess would race the
swap). The /silent-save queue covered diary writes; /mine had no
equivalent, so transcript-ingest requests during a rebuild window
were lost.
Adds three pieces, all parallel to the silent-save infrastructure:
* `_pending_mines_path()` — separate jsonl queue file (next to the
silent-save pending file).
* `_enqueue_pending_mine(payload)` — appends a /mine request body to
the queue, off-loop via asyncio.to_thread.
* `_drain_pending_mines()` — replays queued mines after rebuild via
the same subprocess pattern the live /mine endpoint uses, gated by
`_mine_sem`. Same rename-then-read pattern as
`_drain_pending_writes` so concurrent /mine POSTs landing during
the drain go to a fresh queue file. Dedup by (dir, wing, mode)
before replay — a storm of hook fires queues the same target many
times, but one mine catches up all of them via convo_miner's
mtime-based dedup, so we don't need to run it N times.
The /mine endpoint checks `_repair_state` and queues if in rebuild
mode, returning `{"queued": true, "reason": "repair-in-progress"}`
to signal the caller. After the rebuild completes,
`_drain_pending_mines()` runs alongside `_drain_pending_writes()`.
Also extends `/repair/status` to surface `pending_mines` count
alongside `pending_writes`.
Tests: 5 new unittest cases — pending-mines path is distinct from
writes, enqueue→drain replays each target, drain dedups repeated
targets (hook-storm scenario), failed replays quarantine to
.failed-* files instead of getting lost, empty queue returns 0.
NOT a fix for the HNSW corruption that prompted the current incident
— that came from concurrent update_drawer calls hitting chromadb's
HNSW concurrency hazards (CLAUDE.md row 15). The corruption-side fix
is `PALACE_MAX_WRITE_CONCURRENCY=1` in the daemon env. This PR
covers a separate failure mode: hook writes during repair windows.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
Pull request overview
This PR adds a “pending mines” queue to prevent /mine requests from failing (or racing a collection swap) while /repair mode=rebuild is in progress, and replays the queued mines after rebuild completes—mirroring the existing pending silent-save write queue behavior.
Changes:
- Add `_pending_mines_path()`, `_enqueue_pending_mine()`, and `_drain_pending_mines()` to queue and replay `/mine` requests across rebuild windows (including dedup + quarantine-on-failure behavior).
- Update `/mine` to enqueue requests during rebuild instead of spawning the miner subprocess immediately, and update `/repair` to drain pending mines after rebuild.
- Extend `/repair/status` to surface pending-mines queue depth; add unit tests covering enqueue/drain, dedup, quarantine, and empty-queue behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `main.py` | Implements pending-mines queueing/draining, `/mine` queuing during rebuild, `/repair` draining, and `/repair/status` queue depth reporting. |
| `tests/test_mine_queue.py` | Adds unit tests for the new pending-mines queue behavior (enqueue/drain, dedup, quarantine, empty queue). |
```python
wing = payload.get("wing", "general")
mode = payload.get("mode", "convos")
mempalace_bin = os.path.join(os.path.dirname(sys.executable), "mempalace")
cmd = [mempalace_bin, "mine", directory, "--mode", mode, "--wing", wing]
```
Comment on lines +391 to +394:
```python
directory = _translate_client_path(payload["dir"])
if not Path(directory).is_dir():
    _log.warning("drain-mine: skipping %s — not a directory", directory)
    continue
```
Comment on lines +68 to +86:
```python
async def test_enqueue_then_drain_replays_each_target(self):
    await main._enqueue_pending_mine({"dir": "/a", "wing": "wa", "mode": "convos"})
    await main._enqueue_pending_mine({"dir": "/b", "wing": "wb", "mode": "convos"})
    self.assertTrue(os.path.isfile(self._queue_path))

    # Stub the subprocess: each call returns rc=0
    async def _fake_subprocess(*args, **kwargs):
        proc = MagicMock()
        proc.communicate = AsyncMock(return_value=(b"", b""))
        proc.returncode = 0
        return proc

    with patch("asyncio.create_subprocess_exec", side_effect=_fake_subprocess) as spawn:
        count = await main._drain_pending_mines()
    self.assertEqual(count, 2)
    # Queue file is gone after a clean drain
    self.assertFalse(os.path.isfile(self._queue_path))
    # Subprocess was invoked twice (once per unique target)
    self.assertEqual(spawn.call_count, 2)
```
jphein added a commit that referenced this pull request on May 6, 2026:
Seven findings, batched into one cleanup PR.

PR #3 (file-watcher) findings:
* watcher.py:30 — drop unused ``import time``.
* watcher.py:336 — type ``_log_future_exception``'s parameter as ``concurrent.futures.Future`` (NOT ``asyncio.Future``); that's what ``asyncio.run_coroutine_threadsafe`` returns. Catch its cancellation/state errors plus the asyncio variants so the callback can't itself crash on a concurrent cancellation.
* main.py:560 — surface stderr/stdout when a watcher mine returns non-zero. The rc alone hides 'No mempalace.yaml found' / Python tracebacks operators need to diagnose.
* main.py:1202 — ``GET /watch`` now double-checks ``watcher.is_running`` so a thread crash that flips is_running to False between startup and now is reflected in the endpoint response. Lifespan still gates publication; this adds defense in depth.

PR #4 (mine-queue) findings:
* main.py drain replay — re-apply the optional ``extract`` and ``limit`` fields. The prior drain dropped them silently, so a queue entry with those fields got replayed without them. Live /mine accepts them; replay must too.
* main.py drain replay — enforce the same safety guards as the live /mine endpoint: ``isinstance(dir, str)``, absolute path, no ``..`` traversal, ``mode in {convos, projects}``, ``extract in {exchange, general}`` if set, ``limit`` int-coercible if set. Without these a queue entry could smuggle through values the live endpoint would 400.

Tests: +2 — replay-extract-and-limit (asserts the new fields make it onto the subprocess argv) and skips-invalid-payload-fields (5 invalid-payload variants, each correctly skipped pre-spawn). 35 daemon tests pass.

Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>