feat: parallel file processing and batched upserts in mine() #416

Open
tbaldur wants to merge 8 commits into MemPalace:develop from tbaldur:feat/parallel-mine

Conversation


@tbaldur tbaldur commented Apr 9, 2026

Problem

mine() processes files sequentially and calls collection.upsert() once per chunk. On large codebases this is slow — file I/O stalls the CPU, and the ONNX embedder runs one vector at a time instead of batching.

Solution

Two changes that compound:

1. Thread pool for file reading + chunking

Extracted the ChromaDB-free part of process_file() into a new process_file_cpu() function (read → chunk → detect room → build metadata records). This has no shared state and is safe to run concurrently. mine() now dispatches these via ThreadPoolExecutor, overlapping file I/O across files while the main thread handles upserts.
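In sketch form (simplified — counters and error handling omitted):

# Simplified sketch of the new dispatch loop in mine().
# Workers run the ChromaDB-free process_file_cpu(); the main thread
# consumes results as they complete and feeds the batched upsert path.
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = {
        executor.submit(process_file_cpu, fp, project_path, wing, rooms, agent): fp
        for fp in pending
    }
    for future in as_completed(futures):
        result = future.result()
        if result is None:
            continue  # empty or too-small file
        source_file, room, records = result
        # records are accumulated and flushed in batches (see below)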

2. Batched upserts

Instead of collection.upsert([1 chunk]) N times, records from completed futures are accumulated and flushed every BATCH_SIZE (128) chunks. ChromaDB's ONNX backend processes a batch as a single matrix operation, which is significantly faster than N sequential single-vector calls.
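Roughly, the accumulate-and-flush loop (simplified):

# Accumulate records from completed futures; flush every BATCH_SIZE chunks.
batch_ids, batch_docs, batch_metas = [], [], []

for drawer_id, content, meta in records:
    batch_ids.append(drawer_id)
    batch_docs.append(content)
    batch_metas.append(meta)
    if len(batch_ids) >= BATCH_SIZE:
        # 128 chunks -> one batched ONNX embedding matrix op
        collection.upsert(documents=batch_docs, ids=batch_ids, metadatas=batch_metas)
        batch_ids.clear()
        batch_docs.clear()
        batch_metas.clear()

# After all futures complete: flush the final partial batch.
if batch_ids:
    collection.upsert(documents=batch_docs, ids=batch_ids, metadatas=batch_metas)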

Constants added

BATCH_SIZE = 128          # chunks per upsert call
MAX_WORKERS = min(32, (os.cpu_count() or 4) * 2)

Compatibility

  • dry_run path is unchanged (still uses original process_file()).
  • Already-mined files are pre-filtered before the thread pool to avoid redundant ChromaDB reads from worker threads.
  • No new dependencies.

@tbaldur tbaldur force-pushed the feat/parallel-mine branch from b13d708 to bbcc55c on April 9, 2026 at 18:28

bgauryy commented Apr 9, 2026

PR #416 Review: perf: parallel file processing and batched upserts in mine()

URL: #416
Author: tbaldur | Status: Open | Branch: feat/parallel-mine → main
Scope: 1 file (mempalace/miner.py) | +106 / -17


Executive Summary

Goal: Speed up mine() by parallelizing file I/O + chunking and batching ChromaDB upserts.
Risk: HIGH — concurrency in the core data ingestion pipeline.
Recommendation: REQUEST CHANGES — Contains copy-paste duplicates, a data-loss path on exceptions, and logic duplication that creates maintenance risk.

Ratings

| Dimension | Score | Notes |
| --- | --- | --- |
| Correctness | 4/10 | Duplicate imports/constants, data loss on exception |
| Security | 8/10 | No new attack surface; thread-safe reads |
| Performance | 7/10 | Good batching strategy; thread pool sizing is reasonable |
| Maintainability | 4/10 | process_file / process_file_cpu duplication |

Issues

1. CRITICAL — Duplicate import line (copy-paste error)

Confidence: HIGH
Location: mempalace/miner.py, diff +17-18

The same import is added twice:

+from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import ThreadPoolExecutor, as_completed

Python silently ignores duplicate imports, but this signals sloppy copy-paste and violates the project's code quality bar.

Fix: Remove one of the two lines.


2. CRITICAL — Duplicate constant definitions (copy-paste error)

Confidence: HIGH
Location: mempalace/miner.py, diff +60-63

Both constants are defined twice:

+BATCH_SIZE = 128  # chunks per upsert call
+MAX_WORKERS = min(32, (os.cpu_count() or 4) * 2)
+BATCH_SIZE = 128  # chunks per upsert call
+MAX_WORKERS = min(32, (os.cpu_count() or 4) * 2)

The second definitions silently overwrite the first. No functional harm today, but if someone later edits "the" definition and it's the wrong one, the change will be silently lost.

Fix: Remove the duplicate pair.


3. HIGH — Data loss on exception: unflushed batch is silently dropped

Confidence: HIGH
Location: mine() function — the for future in as_completed(futures) loop + final flush_batch()

future.result() is called without a try/except. If any file causes an unexpected exception in process_file_cpu, the exception propagates out of the loop, exits the with ThreadPoolExecutor block, and the final flush_batch() is never reached:

        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = { ... }
            for future in as_completed(futures):
                result = future.result()       # <-- unhandled exception
                # ...
                    if len(batch_ids) >= BATCH_SIZE:
                        flush_batch()
                # ...

        flush_batch()  # <-- NEVER REACHED if exception above

All chunks accumulated in batch_ids / batch_docs / batch_metas from successfully processed files that haven't been flushed yet are permanently lost. On a large codebase with BATCH_SIZE=128, this could be up to 127 chunks worth of work silently discarded.

Fix:

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = { ... }
    for future in as_completed(futures):
        try:
            result = future.result()
        except Exception as exc:
            fp = futures[future]
            print(f"  ✗ {fp.name}: {exc}")
            continue
        # ... rest of loop ...

flush_batch()  # now always reached

Or at minimum, move flush_batch() into a finally block.


4. MEDIUM — Logic duplication between process_file and process_file_cpu

Confidence: HIGH

Both functions implement the same sequence: read file → strip → MIN_CHUNK_SIZE check → detect_room → chunk_text → build metadata with hashlib drawer IDs. The only difference is process_file calls add_drawer (single ChromaDB upserts) while process_file_cpu returns records for batching.

process_file is still used for the dry_run path. If someone later changes room detection, metadata fields, or chunking logic, they must update both functions or the dry_run output will diverge from real behavior.

Fix: Refactor process_file to delegate to process_file_cpu:

def process_file(filepath, project_path, collection, wing, rooms, agent, dry_run):
    if not dry_run and file_already_mined(collection, str(filepath), check_mtime=True):
        return 0, None

    result = process_file_cpu(filepath, project_path, wing, rooms, agent)
    if result is None:
        return 0, None

    source_file, room, records = result
    if dry_run:
        print(f"    [DRY RUN] {filepath.name} → room:{room} ({len(records)} drawers)")
        return len(records), room

    for drawer_id, content, meta in records:
        collection.upsert(documents=[content], ids=[drawer_id], metadatas=[meta])
    return len(records), room

5. MEDIUM — Behavioral change in files_skipped count

Confidence: HIGH

| | Original | New |
| --- | --- | --- |
| files_skipped counts | any file with drawers == 0 (already-mined + empty/tiny) | only file_already_mined matches |

The output message says "Files skipped (already filed)" so the new behavior is arguably more correct, but it's an undocumented change. Files that are too small (< MIN_CHUNK_SIZE) or produce no chunks will no longer appear in the skip count.

Suggestion: Document this behavioral change in the PR description, or add a separate counter for truly empty/tiny files.
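For example, inside the results loop (files_empty is a hypothetical new counter):

result = future.result()
if result is None:
    files_empty += 1  # too small (< MIN_CHUNK_SIZE) or produced no chunks
    continue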


6. LOW — No progress feedback during pre-filtering

Confidence: MEDIUM

pending = [fp for fp in files if not file_already_mined(collection, str(fp), check_mtime=True)]

For large file lists (thousands of files), this fires a ChromaDB query per file with no output. Users may think the tool is hung.

Suggestion: Add a brief progress line before the list comprehension.
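For example:

print(f"  Checking {len(files)} files for changes...")
pending = [fp for fp in files if not file_already_mined(collection, str(fp), check_mtime=True)]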


Thread Safety Analysis

| Component | Thread-safe? | Evidence |
| --- | --- | --- |
| process_file_cpu | Yes | Pure: reads input args, returns new tuple, no shared state |
| chunk_text | Yes | Pure function: string slicing on immutable inputs |
| detect_room | Yes | Reads from rooms (shared but immutable), local defaultdict |
| datetime.now() | Yes | Thread-safe in CPython |
| os.path.getmtime | Yes | OS-level read-only call |
| batch_ids/docs/metas | N/A | Only touched from main thread |
| collection.upsert | N/A | Only called from main thread (flush_batch) |

The concurrency model (threads do CPU work, main thread does all ChromaDB I/O) is sound.


Flow Impact Analysis

  • process_file: Still called in the dry_run path. No external callers found outside miner.py.
  • mine(): Public API — signature unchanged. Callers unaffected.
  • file_already_mined: Now called in a sequential pre-filter rather than inside the per-file loop. No functional impact.
  • Blast radius: Low — changes are internal to the mine() hot path.

Guidelines Compliance

| Guideline | Status |
| --- | --- |
| Conventional commits (perf: ...) | Pass |
| No new dependencies (ThreadPoolExecutor is stdlib) | Pass |
| Docstrings on new function | Pass |
| Tests updated | Missing — no test changes for the new parallel path |
| Ruff formatting (100-char lines) | Likely pass |

Summary

| Priority | Count | Items |
| --- | --- | --- |
| Critical | 2 | Duplicate imports (#1), duplicate constants (#2) |
| High | 1 | Data loss on exception (#3) |
| Medium | 2 | Logic duplication (#4), behavioral change (#5) |
| Low | 1 | No pre-filter progress (#6) |

Split process_file() into a thread-safe process_file_cpu() that handles
reading, chunking, and room detection without touching ChromaDB, and a
batched upsert path in mine() that flushes to ChromaDB in groups of
BATCH_SIZE chunks.

Changes:
- Add process_file_cpu(): reads/chunks/routes files, returns serialized
  records ready for upsert. No ChromaDB calls, safe for ThreadPoolExecutor.
- Add BATCH_SIZE (128) and MAX_WORKERS (cpu_count * 2, capped at 32).
- mine() now pre-filters already-mined files before the thread pool,
  dispatches process_file_cpu() via ThreadPoolExecutor, accumulates
  records into a batch, and flushes to ChromaDB every BATCH_SIZE chunks.
- dry_run path unchanged (still uses original process_file()).

The two speedups compound: threads overlap file I/O with chunking across
files, and batched upserts let the ONNX embedder process a full matrix
op per flush instead of one vector at a time.

Tests: added test_process_file_cpu_returns_records,
test_process_file_cpu_returns_none_for_empty_file,
test_mine_parallel_produces_same_result_as_serial,
test_mine_skips_already_mined_files_on_rerun.
@tbaldur tbaldur force-pushed the feat/parallel-mine branch from bbcc55c to 56a2824 on April 9, 2026 at 18:32
- Eliminate logic duplication: process_file now delegates to
  process_file_cpu; shared read/chunk/route logic lives in one place.
- Fix data loss risk: wrap futures loop in try/finally so flush_batch()
  always runs even if a future raises.
- Restore files_skipped accuracy: count both already-mined and
  too-small/unreadable files (previously only already-mined were counted).
- Add pre-filter progress output so large repos don't appear to hang
  during the already-mined check.
@tbaldur tbaldur changed the title from perf: parallel file processing and batched upserts in mine() to feat: parallel file processing and batched upserts in mine() on Apr 9, 2026

tbaldur commented Apr 9, 2026

  • Duplicate import/constants — fixed in this PR, won't appear in future changes
  • Data loss on exception — wrapped the futures loop in try/finally so flush_batch() always runs
  • Logic duplication — process_file now delegates to process_file_cpu; shared logic lives in one place
  • files_skipped accuracy — now counts both already-mined and too-small/unreadable files
  • Pre-filter progress — added "Checking N files for changes..." print before the mtime check loop

FabioLissi added a commit to FabioLissi/mempalace that referenced this pull request Apr 9, 2026
Port PR MemPalace#416's parallelization pattern from miner.py to convo_miner.py.
PR MemPalace#416 only batched project mining (mempalace mine <dir>), leaving
conversation mining (mempalace mine <dir> --mode convos) on the slow
per-chunk collection.add() path. Since convo mining is the primary
ingest path for large corpora like ~/.claude/projects/ (~1.5 GB of
Claude Code JSONL), this omission negated most of the speedup potential
from both PR MemPalace#416 (batching) and PR MemPalace#442 (GPU embedding).

Changes:

1. Extract process_convo_file_cpu() — pure CPU worker that normalizes,
   chunks, detects room, and builds drawer records. Thread-safe by
   construction: no ChromaDB calls, no shared state, all inputs
   passed explicitly. Returns (source_file, room, records, room_counts_delta)
   or None for skipped files.

2. Rewrite mine_convos() ingest loop:
   - Pre-filter pending files with file_already_mined() (sequential,
     matches PR MemPalace#416's pattern)
   - Submit all pending files to ThreadPoolExecutor with
     MAX_WORKERS = min(32, cpu_count()*2) for parallel normalize/chunk
   - Main thread accumulates records into batch_ids/docs/metas lists
     and flushes via collection.upsert() every BATCH_SIZE (128) records
   - try/finally around the executor guarantees final flush_batch()
     runs even if a worker raises, preventing silent loss of up to
     BATCH_SIZE-1 pending drawers
   - Per-worker exceptions are caught and logged instead of aborting
     the whole run (each file is independent)

3. Keep dry_run path sequential — matches miner.py, preserves original
   output formatting (per-file [DRY RUN] lines, room distribution),
   uses the same extracted worker for consistency.

4. Switch collection.add() -> collection.upsert() — idempotent, removes
   the try/except 'already exists' dance, matches miner.py.

Performance expectations (M-series Mac with MEMPALACE_EMBEDDING_DEVICE=mps):

Before (single-file sequential loop + per-chunk .add()):
  502 drawers in 12.3s = 40.7 drawers/s

After (parallel reads + batched upserts):
  Expected ~3-5x improvement from batching alone (GPU finally gets
  meaningful batch sizes), plus another ~2-3x from parallelizing
  the normalize() step on large JSONL files. Combined: ~5-15x.

All 556 tests still pass, including tests/test_convo_miner.py which
exercises the real ChromaDB write path end-to-end.

No changes required to the public API. Callers (cli.py, mcp_server.py)
are unaffected.
@FabioLissi

Great work on this PR, the parallelization pattern for miner.py is really clean and the batched upserts make a measurable difference.

One thing I noticed while testing this branch on a large Claude Code conversation archive (1.5 GB of JSONL in ~/.claude/projects/): convo_miner.py still uses the sequential per-chunk collection.add() path, so users mining conversation corpora via mempalace mine <dir> --mode convos don't benefit from the new batching.

I ported your pattern to convo_miner.py on my fork: FabioLissi/mempalace@fe74a7d. Same structure as your miner.py changes — process_convo_file_cpu worker function, ThreadPoolExecutor(max_workers=MAX_WORKERS) for parallel normalize/chunk, batched collection.upsert() with BATCH_SIZE=128, try/finally around the executor to guarantee final flush on exception.

Benchmark on a 10-file, 502-drawer sample from my Claude Code history (M-series Mac, MPS + BGE-base):
• Before any batching: 23.35s
• With your miner.py batching only: unchanged (convo path not covered)
• With the convo_miner.py port: 5.69s (4.1x speedup)

All 556 tests pass including tests/test_convo_miner.py::test_convo_mining which exercises the real ChromaDB write path end-to-end.

Totally happy to:
• Let you cherry-pick the commit onto this PR branch
• Open it as a separate follow-up PR after this one merges
• Or skip it entirely if you'd rather keep this PR scoped to miner.py

Whichever works best for you. Thanks again for the contribution!


@web3guru888 web3guru888 left a comment


Review: Parallel File Processing + Batched Upserts

Performance-focused PR. The two optimizations (threaded I/O + batched upserts) are independent and compound well. Let me dig into the thread safety and correctness.

Thread Safety Analysis

process_file_cpu() — Thread-safe? ✅

  • filepath.read_text() — file reads are inherently thread-safe (OS handles concurrent reads)
  • detect_room() — appears to be a pure function (reads file content and matches patterns)
  • chunk_text() — pure function (splits text into chunks)
  • hashlib.sha256() — thread-safe in Python (each call creates its own hasher)
  • datetime.now().isoformat() — thread-safe
  • os.path.getmtime() — thread-safe (stat call)

No shared mutable state. Good separation. The key design decision — extracting CPU work from ChromaDB access — is correct.

Batched Upserts

BATCH_SIZE = 128 — Reasonable. ChromaDB's ONNX backend computes embeddings as a batch matrix operation, so 128 chunks in one upsert() call is significantly faster than 128 individual calls. For reference, we use batch sizes of 100 in our integration with similar performance gains.

⚠️ flush_batch() as inner function with closure over mutable lists:

batch_ids, batch_docs, batch_metas = [], [], []
def flush_batch():
    if batch_ids:
        collection.upsert(documents=batch_docs, ids=batch_ids, metadatas=batch_metas)
        batch_ids.clear()
        batch_docs.clear()
        batch_metas.clear()

This works because list.clear() mutates in-place. But it's a subtle coupling — if someone later refactors to batch_ids = [] inside flush_batch(), it would silently break (rebinding the local variable without affecting the closure). Consider passing the lists as arguments, or using a small dataclass.
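For example, with explicit arguments (still clearing in place so the caller's lists are reused):

def flush_batch(ids, docs, metas):
    # Explicit parameters: no closure over outer mutable lists, so a later
    # refactor can't accidentally rebind a local and silently stop flushing.
    if ids:
        collection.upsert(documents=docs, ids=ids, metadatas=metas)
        ids.clear()
        docs.clear()
        metas.clear()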

try/finally on flush — Good. If a future raises an exception, the accumulated batch still gets flushed:

try:
    with ThreadPoolExecutor(...) as executor:
        ...
finally:
    flush_batch()

This prevents data loss on errors.

Pre-filtering Optimization

pending = [fp for fp in files if not file_already_mined(collection, str(fp), check_mtime=True)]

This pre-filters before the thread pool. Smart — it avoids sending already-mined files to worker threads and prevents ChromaDB reads from concurrent threads (which could cause contention on the ChromaDB lock).

⚠️ However: This scans all files sequentially with file_already_mined() before any parallel work starts. For a large project with mostly-mined files (incremental re-mine scenario), this sequential pre-scan could dominate the wall time. Consider batching the file_already_mined checks too, or using the content hash approach from #424 as a faster pre-check.
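A sketch of a bulk pre-check — one fetch instead of one query per file. The source_file/mtime metadata field names here are assumptions, not the project's confirmed schema:

import os

# Hypothetical bulk variant: fetch all drawer metadata once, then do
# dict lookups per file instead of a ChromaDB query per file.
existing = collection.get(include=["metadatas"])
mined_mtimes = {m["source_file"]: m["mtime"] for m in existing["metadatas"]}

pending = [
    fp for fp in files
    if mined_mtimes.get(str(fp)) != os.path.getmtime(fp)
]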

MAX_WORKERS

MAX_WORKERS = min(32, (os.cpu_count() or 4) * 2)

Standard formula from Python docs for I/O-bound thread pools. Since process_file_cpu is mostly file I/O + CPU-light text processing, this is appropriate. The min(32, ...) cap prevents oversubscription.

Dry-run path

Good that dry_run still uses the original serial process_file() path. The dry-run path doesn't need performance and keeping it simple avoids introducing bugs in the diagnostic path.

Test coverage

test_process_file_cpu_returns_records — Good unit test for the new extracted function.
test_mine_parallel_produces_same_result_as_serial — The critical correctness test. Verifies that parallel mining indexes the same files. However, it doesn't compare exact drawer content between serial and parallel — just that files are present and the count > 0. Consider asserting that the drawer count matches between serial and parallel runs on identical input; see the sketch below.
test_mine_skips_already_mined_files_on_rerun — Incremental behavior preservation test. Important.
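A sketch of that stronger assertion (make_sample_project, fresh_collection, and mine_serial are hypothetical stand-ins for the project's actual fixtures):

def test_parallel_drawer_count_matches_serial(tmp_path):
    # Hypothetical fixtures: a small sample project and empty collections.
    project = make_sample_project(tmp_path)

    serial_col = fresh_collection()
    mine_serial(project, serial_col)    # baseline: one upsert per chunk

    parallel_col = fresh_collection()
    mine(project, parallel_col)         # new parallel + batched path

    assert serial_col.count() == parallel_col.count()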

Overall: well-engineered performance improvement with correct thread safety boundaries. The flush_batch closure pattern and the sequential pre-scan are the main areas for potential refinement.

🔭 Reviewed as part of the MemPalace-AGI integration project — autonomous research with perfect memory. Community interaction updates are posted regularly on the dashboard.


tbaldur commented Apr 10, 2026

Thank you for the analysis — changes made:

  • flush_batch now takes explicit (ids, docs, metas) args

  • Added idempotency assertion to the parallel test (re-mine same data, verify count unchanged)

  • Pre-scan optimization — replaced the sequential file_already_mined() loop with build_mined_cache(): one bulk ChromaDB fetch plus dict lookups. A big improvement on my 32k+-file project.

  • @FabioLissi — cherry-picked your convo_miner.py port onto this branch; you're credited as author on the commit. Applied the same flush_batch hardening (explicit args) for consistency with the miner.py changes. Thank you very much for the contribution and the benchmark numbers!

@bensig bensig changed the base branch from main to develop April 11, 2026 22:22
@igorls igorls added the area/mining (File and conversation mining) and enhancement (New feature or request) labels on Apr 14, 2026