feat: parallel file processing and batched upserts in mine()#416
tbaldur wants to merge 8 commits into MemPalace:develop from
Conversation
Force-pushed from b13d708 to bbcc55c
PR #416 Review:
| Dimension | Score | Notes |
|---|---|---|
| Correctness | 4/10 | Duplicate imports/constants, data loss on exception |
| Security | 8/10 | No new attack surface; thread-safe reads |
| Performance | 7/10 | Good batching strategy; thread pool sizing is reasonable |
| Maintainability | 4/10 | process_file / process_file_cpu duplication |
Issues
1. CRITICAL — Duplicate import line (copy-paste error)
Confidence: HIGH
Location: mempalace/miner.py, diff +17-18
The same import is added twice:
```diff
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import ThreadPoolExecutor, as_completed
```

Python silently ignores duplicate imports, but this signals sloppy copy-paste and falls short of the project's code quality bar.
Fix: Remove one of the two lines.
2. CRITICAL — Duplicate constant definitions (copy-paste error)
Confidence: HIGH
Location: mempalace/miner.py, diff +60-63
Both constants are defined twice:
```diff
+BATCH_SIZE = 128  # chunks per upsert call
+MAX_WORKERS = min(32, (os.cpu_count() or 4) * 2)
+BATCH_SIZE = 128  # chunks per upsert call
+MAX_WORKERS = min(32, (os.cpu_count() or 4) * 2)
```

The second definitions silently overwrite the first. No functional harm today, but if someone later edits "the" definition and it happens to be the wrong one, the change will be silently lost.
Fix: Remove the duplicate pair.
3. HIGH — Data loss on exception: unflushed batch is silently dropped
Confidence: HIGH
Location: mine() function — the for future in as_completed(futures) loop + final flush_batch()
future.result() is called without a try/except. If any file causes an unexpected exception in process_file_cpu, the exception propagates out of the loop, exits the with ThreadPoolExecutor block, and the final flush_batch() is never reached:
```python
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = { ... }
    for future in as_completed(futures):
        result = future.result()  # <-- unhandled exception
        # ...
        if len(batch_ids) >= BATCH_SIZE:
            flush_batch()
# ...
flush_batch()  # <-- NEVER REACHED if an exception escapes the loop
```

All chunks accumulated in batch_ids / batch_docs / batch_metas from successfully processed files that haven't been flushed yet are permanently lost. On a large codebase with BATCH_SIZE=128, this could be up to 127 chunks' worth of work silently discarded.
Fix:
```python
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = { ... }
    for future in as_completed(futures):
        try:
            result = future.result()
        except Exception as exc:
            fp = futures[future]
            print(f"  ✗ {fp.name}: {exc}")
            continue
        # ... rest of loop ...
flush_batch()  # now always reached
```

Or, at minimum, move `flush_batch()` into a `finally` block.
4. MEDIUM — Logic duplication between process_file and process_file_cpu
Confidence: HIGH
Both functions implement the same sequence: read file → strip → MIN_CHUNK_SIZE check → detect_room → chunk_text → build metadata with hashlib drawer IDs. The only difference is process_file calls add_drawer (single ChromaDB upserts) while process_file_cpu returns records for batching.
process_file is still used for the dry_run path. If someone later changes room detection, metadata fields, or chunking logic, they must update both functions or the dry_run output will diverge from real behavior.
Fix: Refactor process_file to delegate to process_file_cpu:
```python
def process_file(filepath, project_path, collection, wing, rooms, agent, dry_run):
    if not dry_run and file_already_mined(collection, str(filepath), check_mtime=True):
        return 0, None
    result = process_file_cpu(filepath, project_path, wing, rooms, agent)
    if result is None:
        return 0, None
    source_file, room, records = result
    if dry_run:
        print(f"  [DRY RUN] {filepath.name} → room:{room} ({len(records)} drawers)")
        return len(records), room
    for drawer_id, content, meta in records:
        collection.upsert(documents=[content], ids=[drawer_id], metadatas=[meta])
    return len(records), room
```

5. MEDIUM — Behavioral change in files_skipped count
Confidence: HIGH
| | Original | New |
|---|---|---|
| files_skipped counts | Any file with drawers == 0 (already-mined + empty/tiny) | Only file_already_mined matches |
The output message says "Files skipped (already filed)" so the new behavior is arguably more correct, but it's an undocumented change. Files that are too small (< MIN_CHUNK_SIZE) or produce no chunks will no longer appear in the skip count.
Suggestion: Document this behavioral change in the PR description, or add a separate counter for truly empty/tiny files.
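A minimal sketch of the suggested separate counter; all names here (`classify_files`, the reader callbacks) and the `MIN_CHUNK_SIZE` value are illustrative assumptions, not code from the PR:

```python
MIN_CHUNK_SIZE = 50  # assumed threshold, not taken from the PR


def classify_files(files, already_mined, read_text):
    """Split skips into already-mined vs. empty/too-small buckets."""
    skipped_mined, skipped_tiny, to_process = 0, 0, []
    for fp in files:
        if already_mined(fp):
            skipped_mined += 1  # previously the only case counted as "skipped"
        elif len(read_text(fp).strip()) < MIN_CHUNK_SIZE:
            skipped_tiny += 1  # now tracked separately instead of silently dropped
        else:
            to_process.append(fp)
    return skipped_mined, skipped_tiny, to_process
```

This keeps the "Files skipped (already filed)" message accurate while still surfacing how many files were ignored for being empty or tiny.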
6. LOW — No progress feedback during pre-filtering
Confidence: MEDIUM
```python
pending = [fp for fp in files if not file_already_mined(collection, str(fp), check_mtime=True)]
```

For large file lists (thousands of files), this fires one ChromaDB query per file with no output. Users may think the tool has hung.
Suggestion: Add a brief progress line before the list comprehension.
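One possible shape for that feedback, sketched with hypothetical names (`filter_pending` and the `already_mined` callback are illustrative, not the PR's identifiers):

```python
def filter_pending(files, already_mined, every=500):
    """Return files not yet mined, printing coarse progress as we go."""
    print(f"Checking {len(files)} files against the index...")
    pending = []
    for i, fp in enumerate(files, 1):
        if not already_mined(fp):
            pending.append(fp)
        if i % every == 0:
            print(f"  checked {i}/{len(files)}")  # heartbeat so large repos don't look hung
    return pending
```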
Thread Safety Analysis
| Component | Thread-safe? | Evidence |
|---|---|---|
| process_file_cpu | Yes | Pure: reads input args, returns a new tuple, no shared state |
| chunk_text | Yes | Pure function: string slicing on immutable inputs |
| detect_room | Yes | Reads from rooms (shared but immutable), local defaultdict |
| datetime.now() | Yes | Thread-safe in CPython |
| os.path.getmtime | Yes | OS-level read-only call |
| batch_ids/docs/metas | N/A | Only touched from the main thread |
| collection.upsert | N/A | Only called from the main thread (flush_batch) |
The concurrency model (threads do CPU work, main thread does all ChromaDB I/O) is sound.
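The model described above can be sketched in miniature: worker threads do pure CPU work, and the main thread owns all writes and batches them. Every name here (`fake_upsert`, `cpu_work`, the batch size) is an illustrative stand-in, not the PR's code:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

BATCH_SIZE = 4
flushed = []


def fake_upsert(batch):
    # Stands in for collection.upsert; only ever called from the main thread.
    if batch:
        flushed.append(list(batch))


def cpu_work(n):
    return n * n  # pure: no shared state, safe to run across threads


batch = []
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(cpu_work, n) for n in range(10)]
    for future in as_completed(futures):
        batch.append(future.result())  # accumulation happens on the main thread
        if len(batch) >= BATCH_SIZE:
            fake_upsert(batch)
            batch.clear()
fake_upsert(batch)  # final flush for the partial batch
```

Because only the main thread touches `batch` and `fake_upsert`, no locking is needed even though the workers run concurrently.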
Flow Impact Analysis
- `process_file`: still called in the `dry_run` path. No external callers found outside `miner.py`.
- `mine()`: public API — signature unchanged. Callers unaffected.
- `file_already_mined`: now called in a sequential pre-filter rather than inside the per-file loop. No functional impact.
- Blast radius: low — changes are internal to the `mine()` hot path.
Guidelines Compliance
| Guideline | Status |
|---|---|
| Conventional commits (perf: ...) | Pass |
| No new dependencies (ThreadPoolExecutor is stdlib) | Pass |
| Docstrings on new function | Pass |
| Tests updated | Missing — no test changes for the new parallel path |
| Ruff formatting (100-char lines) | Likely pass |
Summary
| Priority | Count | Items |
|---|---|---|
| Critical | 2 | Duplicate imports (#1), duplicate constants (#2) |
| High | 1 | Data loss on exception (#3) |
| Medium | 2 | Logic duplication (#4), behavioral change (#5) |
| Low | 1 | No pre-filter progress (#6) |
Split `process_file()` into a thread-safe `process_file_cpu()` that handles reading, chunking, and room detection without touching ChromaDB, and a batched upsert path in `mine()` that flushes to ChromaDB in groups of BATCH_SIZE chunks.

Changes:
- Add `process_file_cpu()`: reads/chunks/routes files, returns serialized records ready for upsert. No ChromaDB calls, safe for ThreadPoolExecutor.
- Add BATCH_SIZE (128) and MAX_WORKERS (cpu_count * 2, capped at 32).
- `mine()` now pre-filters already-mined files before the thread pool, dispatches `process_file_cpu()` via ThreadPoolExecutor, accumulates records into a batch, and flushes to ChromaDB every BATCH_SIZE chunks.
- dry_run path unchanged (still uses the original `process_file()`).

The two speedups compound: threads overlap file I/O with chunking across files, and batched upserts let the ONNX embedder process a full matrix op per flush instead of one vector at a time.

Tests: added test_process_file_cpu_returns_records, test_process_file_cpu_returns_none_for_empty_file, test_mine_parallel_produces_same_result_as_serial, test_mine_skips_already_mined_files_on_rerun.
Force-pushed from bbcc55c to 56a2824
- Eliminate logic duplication: `process_file` now delegates to `process_file_cpu`; shared read/chunk/route logic lives in one place.
- Fix data loss risk: wrap the futures loop in try/finally so `flush_batch()` always runs even if a future raises.
- Restore files_skipped accuracy: count both already-mined and too-small/unreadable files (previously only already-mined were counted).
- Add pre-filter progress output so large repos don't appear to hang during the already-mined check.
Port PR MemPalace#416's parallelization pattern from miner.py to convo_miner.py.

PR MemPalace#416 only batched project mining (`mempalace mine <dir>`), leaving conversation mining (`mempalace mine <dir> --mode convos`) on the slow per-chunk `collection.add()` path. Since convo mining is the primary ingest path for large corpora like ~/.claude/projects/ (~1.5 GB of Claude Code JSONL), this omission negated most of the speedup potential from both PR MemPalace#416 (batching) and PR MemPalace#442 (GPU embedding).

Changes:
1. Extract `process_convo_file_cpu()` — a pure CPU worker that normalizes, chunks, detects room, and builds drawer records. Thread-safe by construction: no ChromaDB calls, no shared state, all inputs passed explicitly. Returns (source_file, room, records, room_counts_delta) or None for skipped files.
2. Rewrite the `mine_convos()` ingest loop:
   - Pre-filter pending files with `file_already_mined()` (sequential, matches PR MemPalace#416's pattern)
   - Submit all pending files to ThreadPoolExecutor with MAX_WORKERS = min(32, cpu_count() * 2) for parallel normalize/chunk
   - Main thread accumulates records into batch_ids/docs/metas lists and flushes via `collection.upsert()` every BATCH_SIZE (128) records
   - try/finally around the executor guarantees the final `flush_batch()` runs even if a worker raises, preventing silent loss of up to BATCH_SIZE - 1 pending drawers
   - Per-worker exceptions are caught and logged instead of aborting the whole run (each file is independent)
3. Keep the dry_run path sequential — matches miner.py, preserves the original output formatting (per-file [DRY RUN] lines, room distribution), and uses the same extracted worker for consistency.
4. Switch `collection.add()` -> `collection.upsert()` — idempotent, removes the try/except "already exists" dance, matches miner.py.
Performance expectations (M-series Mac with MEMPALACE_EMBEDDING_DEVICE=mps):
- Before (single-file sequential loop + per-chunk .add()): 502 drawers in 12.3s = 40.7 drawers/s
- After (parallel reads + batched upserts): expected ~3-5x improvement from batching alone (the GPU finally gets meaningful batch sizes), plus another ~2-3x from parallelizing the normalize() step on large JSONL files. Combined: ~5-15x.

All 556 tests still pass, including tests/test_convo_miner.py, which exercises the real ChromaDB write path end-to-end. No changes required to the public API. Callers (cli.py, mcp_server.py) are unaffected.
Great work on this PR! The parallelization pattern for miner.py is really clean, and the batched upserts make a measurable difference.

One thing I noticed while testing this branch on a large Claude Code conversation archive (1.5 GB of JSONL in ~/.claude/projects/): convo_miner.py still uses the sequential per-chunk `collection.add()` path, so users mining conversation corpora via `mempalace mine --mode convos` don't benefit from the new batching.

I ported your pattern to convo_miner.py on my fork: FabioLissi/mempalace@fe74a7d. Same structure as your miner.py changes — `process_convo_file_cpu` worker function, ThreadPoolExecutor(max_workers=MAX_WORKERS) for parallel normalize/chunk, batched `collection.upsert()` with BATCH_SIZE=128, try/finally around the executor to guarantee a final flush on exception.

I benchmarked on a 10-file, 502-drawer sample from my Claude Code history (M-series Mac, MPS + BGE-base). All 556 tests pass, including tests/test_convo_miner.py::test_convo_mining, which exercises the real ChromaDB write path end-to-end.

Totally happy to proceed however works best for you. Thanks again for the contribution!
web3guru888
left a comment
Review: Parallel File Processing + Batched Upserts
Performance-focused PR. The two optimizations (threaded I/O + batched upserts) are independent and compound well. Let me dig into the thread safety and correctness.
Thread Safety Analysis
process_file_cpu() — Thread-safe? ✅
- `filepath.read_text()` — file reads are inherently thread-safe (the OS handles concurrent reads)
- `detect_room()` — appears to be a pure function (reads file content and matches patterns)
- `chunk_text()` — pure function (splits text into chunks)
- `hashlib.sha256()` — thread-safe in Python (each call creates its own hasher)
- `datetime.now().isoformat()` — thread-safe
- `os.path.getmtime()` — thread-safe (stat call)
No shared mutable state. Good separation. The key design decision — extracting CPU work from ChromaDB access — is correct.
Batched Upserts
BATCH_SIZE = 128 — Reasonable. ChromaDB's ONNX backend computes embeddings as a batch matrix operation, so 128 chunks in one upsert() call is significantly faster than 128 individual calls. For reference, we use batch sizes of 100 in our integration with similar performance gains.
flush_batch() as inner function with closure over mutable lists:
```python
batch_ids, batch_docs, batch_metas = [], [], []

def flush_batch():
    if batch_ids:
        collection.upsert(documents=batch_docs, ids=batch_ids, metadatas=batch_metas)
        batch_ids.clear()
        batch_docs.clear()
        batch_metas.clear()
```

This works because `list.clear()` mutates in place. But it's a subtle coupling — if someone later refactors to `batch_ids = []` inside `flush_batch()`, it would silently break (rebinding a local variable without affecting the closure). Consider passing the lists as arguments, or using a small dataclass.
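A sketch of the suggested alternative: hold the batch state in a small class so there is no closure over bare lists and rebinding becomes safe. `Batch` and its `upsert` callback are illustrative, not code from the PR:

```python
class Batch:
    """Accumulates records and flushes via an injected upsert callable."""

    def __init__(self, upsert, size=128):
        self.upsert, self.size = upsert, size
        self.ids, self.docs, self.metas = [], [], []

    def add(self, drawer_id, doc, meta):
        self.ids.append(drawer_id)
        self.docs.append(doc)
        self.metas.append(meta)
        if len(self.ids) >= self.size:
            self.flush()

    def flush(self):
        if self.ids:
            self.upsert(documents=self.docs, ids=self.ids, metadatas=self.metas)
            # Rebinding is fine here: the state lives on self, not in a closure.
            self.ids, self.docs, self.metas = [], [], []
```

The main loop would then call `batch.add(...)` per record and `batch.flush()` in the `finally` block, with no reliance on in-place list mutation.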
✅ try/finally on flush — Good. If a future raises an exception, the accumulated batch still gets flushed:
```python
try:
    with ThreadPoolExecutor(...) as executor:
        ...
finally:
    flush_batch()
```

This prevents data loss on errors.
Pre-filtering Optimization
```python
pending = [fp for fp in files if not file_already_mined(collection, str(fp), check_mtime=True)]
```

This pre-filters before the thread pool. Smart — it avoids sending already-mined files to worker threads and prevents ChromaDB reads from concurrent threads (which could cause contention on the ChromaDB lock).
One caveat: the pre-filter calls file_already_mined() once per file before any parallel work starts. For a large project with mostly-mined files (the incremental re-mine scenario), this sequential pre-scan could dominate wall time. Consider batching the file_already_mined checks too, or using the content hash approach from #424 as a faster pre-check.
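A hedged sketch of what a batched existence check could look like. `find_unmined` and the `existing_ids` callback are stand-ins; in practice the callback would wrap one ChromaDB read per chunk (e.g. `collection.get(ids=...)`, which returns only the ids that exist) instead of one query per file:

```python
def find_unmined(candidate_ids, existing_ids, chunk=500):
    """Return candidate ids not present in the store, querying in chunks."""
    unmined = []
    for i in range(0, len(candidate_ids), chunk):
        block = candidate_ids[i : i + chunk]
        found = existing_ids(block)  # one round-trip per chunk, not per file
        unmined.extend(cid for cid in block if cid not in found)
    return unmined
```

This reduces the pre-scan from N queries to ceil(N / chunk) queries, which matters most in the incremental re-mine case where nearly everything is already indexed.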
MAX_WORKERS
```python
MAX_WORKERS = min(32, (os.cpu_count() or 4) * 2)
```

This is the standard formula from the Python docs for I/O-bound thread pools. Since process_file_cpu is mostly file I/O plus CPU-light text processing, it's appropriate. The min(32, ...) cap prevents oversubscription.
Dry-run path
Good that dry_run still uses the original serial process_file() path. The dry-run path doesn't need performance and keeping it simple avoids introducing bugs in the diagnostic path.
Test coverage
test_process_file_cpu_returns_records — Good unit test for the new extracted function.
test_mine_parallel_produces_same_result_as_serial — The critical correctness test. Verifies that parallel mining indexes the same files. However, it doesn't compare exact drawer content between serial and parallel — just that files are present and count > 0. Consider asserting that the drawer count matches between serial and parallel runs on identical input.
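A sketch of the stronger assertion suggested above: run the same pure per-file worker serially and through a thread pool, then compare the exact record sets rather than just counts. `chunk_file` is a toy stand-in for process_file_cpu, and the file contents are made up for illustration:

```python
from concurrent.futures import ThreadPoolExecutor


def chunk_file(name, text, size=4):
    # Deterministic per-file worker: same input always yields the same records.
    return [(f"{name}:{i}", text[i : i + size]) for i in range(0, len(text), size)]


files = {"a.py": "alpha beta", "b.py": "gamma delta"}

# Serial reference run.
serial = [rec for name, text in files.items() for rec in chunk_file(name, text)]

# Parallel run over the same inputs.
with ThreadPoolExecutor(max_workers=4) as ex:
    parallel = [rec for recs in ex.map(lambda kv: chunk_file(*kv), files.items()) for rec in recs]

# Completion order may differ, so compare as sorted sets of records.
assert sorted(serial) == sorted(parallel)
```

Because completion order is nondeterministic, the comparison sorts both sides; equality of the full (id, content) pairs is what rules out silent divergence between the two paths.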
test_mine_skips_already_mined_files_on_rerun — Incremental behavior preservation test. Important.
Overall: well-engineered performance improvement with correct thread safety boundaries. The flush_batch closure pattern and the sequential pre-scan are the main areas for potential refinement.
🔭 Reviewed as part of the MemPalace-AGI integration project — autonomous research with perfect memory. Community interaction updates are posted regularly on the dashboard.
Thank you for the analysis, changes made:
Problem
`mine()` processes files sequentially and calls `collection.upsert()` once per chunk. On large codebases this is slow — file I/O stalls the CPU, and the ONNX embedder runs one vector at a time instead of batching.

Solution
Two changes that compound:
1. Thread pool for file reading + chunking
Extracted the ChromaDB-free part of `process_file()` into a new `process_file_cpu()` function (read → chunk → detect room → build metadata records). This has no shared state and is safe to run concurrently. `mine()` now dispatches these via ThreadPoolExecutor, overlapping file I/O across files while the main thread handles upserts.

2. Batched upserts
Instead of calling `collection.upsert([1 chunk])` N times, records from completed futures are accumulated and flushed every BATCH_SIZE (128) chunks. ChromaDB's ONNX backend processes a batch as a single matrix operation, which is significantly faster than N sequential single-vector calls.

Constants added
Compatibility
The `dry_run` path is unchanged (still uses the original `process_file()`).