Commit fe74a7d
feat(convo_miner): parallel file processing + batched upserts
Port PR MemPalace#416's parallelization pattern from miner.py to convo_miner.py.

PR MemPalace#416 only batched project mining (mempalace mine <dir>), leaving
conversation mining (mempalace mine <dir> --mode convos) on the slow per-chunk
collection.add() path. Since convo mining is the primary ingest path for large
corpora like ~/.claude/projects/ (~1.5 GB of Claude Code JSONL), this omission
negated most of the speedup potential from both PR MemPalace#416 (batching) and
PR MemPalace#442 (GPU embedding).

Changes:

1. Extract process_convo_file_cpu() — a pure-CPU worker that normalizes,
   chunks, detects the room, and builds drawer records. Thread-safe by
   construction: no ChromaDB calls, no shared state, all inputs passed
   explicitly. Returns (source_file, room, records, room_counts_delta), or
   None for skipped files.

2. Rewrite the mine_convos() ingest loop:
   - Pre-filter pending files with file_already_mined() (sequential, matching
     PR MemPalace#416's pattern).
   - Submit all pending files to a ThreadPoolExecutor with
     MAX_WORKERS = min(32, cpu_count() * 2) for parallel normalize/chunk work.
   - The main thread accumulates records into batch_ids/docs/metas lists and
     flushes via collection.upsert() every BATCH_SIZE (128) records.
   - try/finally around the executor guarantees the final flush_batch() runs
     even if a worker raises, preventing silent loss of up to BATCH_SIZE - 1
     pending drawers.
   - Per-worker exceptions are caught and logged instead of aborting the whole
     run (each file is independent).

3. Keep the dry_run path sequential — matches miner.py, preserves the original
   output formatting (per-file [DRY RUN] lines, room distribution), and uses
   the same extracted worker for consistency.

4. Switch collection.add() -> collection.upsert() — idempotent, removes the
   try/except "already exists" dance, matches miner.py.

Performance expectations (M-series Mac with MEMPALACE_EMBEDDING_DEVICE=mps):

- Before (single-file sequential loop + per-chunk .add()):
  502 drawers in 12.3 s = 40.7 drawers/s.
- After (parallel reads + batched upserts): expected ~3-5x improvement from
  batching alone (the GPU finally gets meaningful batch sizes), plus another
  ~2-3x from parallelizing the normalize() step on large JSONL files.
  Combined: ~5-15x.

All 556 tests still pass, including tests/test_convo_miner.py, which exercises
the real ChromaDB write path end to end. No changes to the public API;
callers (cli.py, mcp_server.py) are unaffected.
1 parent 66c2825 commit fe74a7d
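For orientation, here is the ported pattern reduced to its shape: worker threads do the CPU-bound per-file work, while the main thread remains the sole writer and flushes in fixed-size batches. This is a minimal sketch only; process_file and store are illustrative stand-ins for process_convo_file_cpu() and the ChromaDB collection, not the real MemPalace API.

    # Sketch of the parallel-read + batched-write pattern (illustrative names:
    # process_file and store stand in for the real worker and ChromaDB collection).
    import os
    from concurrent.futures import ThreadPoolExecutor, as_completed

    MAX_WORKERS = min(32, (os.cpu_count() or 4) * 2)
    BATCH_SIZE = 128

    def mine(paths, process_file, store):
        batch = []

        def flush():
            if batch:
                store.upsert(batch)  # one write per <= BATCH_SIZE records
                batch.clear()

        try:
            with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
                futures = {pool.submit(process_file, p): p for p in paths}
                for fut in as_completed(futures):
                    try:
                        records = fut.result()
                    except Exception as e:  # one bad file never aborts the run
                        print(f"! {futures[fut]}: {e}")
                        continue
                    for rec in records or []:  # None means "skip this file"
                        batch.append(rec)
                        if len(batch) >= BATCH_SIZE:
                            flush()
        finally:
            flush()  # drain the tail batch even if a worker raised

For scale: at the reported baseline of 40.7 drawers/s, the projected ~5-15x combined speedup works out to roughly 200-600 drawers/s.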

1 file changed: mempalace/convo_miner.py (161 additions, 80 deletions)
@@ -14,6 +14,7 @@
 from pathlib import Path
 from datetime import datetime
 from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 from .normalize import normalize
 from .palace import SKIP_DIRS, get_collection, file_already_mined
@@ -28,6 +29,8 @@
 }
 
 MIN_CHUNK_SIZE = 30
+BATCH_SIZE = 128  # chunks per upsert call (matches miner.py)
+MAX_WORKERS = min(32, (os.cpu_count() or 4) * 2)  # parallel file readers
 MAX_FILE_SIZE = 10 * 1024 * 1024  # 10 MB — skip files larger than this
 
 
@@ -229,6 +232,87 @@ def scan_convos(convo_dir: str) -> list:
 # =============================================================================
 
 
+def process_convo_file_cpu(
+    filepath: Path,
+    wing: str,
+    agent: str,
+    extract_mode: str,
+) -> tuple:
+    """
+    Pure CPU worker: normalize, chunk, detect room, build drawer records.
+    Thread-safe — no ChromaDB calls, no shared state.
+
+    Returns (source_file, room, records, room_counts_delta) or None if the file
+    should be skipped (empty, too small, failed to normalize).
+
+    - source_file: absolute path string
+    - room: detected room name (or None for extract_mode='general')
+    - records: list of (drawer_id, content, metadata) tuples ready for upsert
+    - room_counts_delta: dict of {room: count} contributed by this file
+      (for general mode this holds per-chunk memory_type counts; for exchange
+      mode it holds {room: 1})
+    """
+    source_file = str(filepath)
+
+    # Normalize format (may raise — caller catches)
+    try:
+        content = normalize(source_file)
+    except (OSError, ValueError):
+        return None
+
+    if not content or len(content.strip()) < MIN_CHUNK_SIZE:
+        return None
+
+    # Chunk — either exchange pairs or general extraction
+    if extract_mode == "general":
+        from .general_extractor import extract_memories
+
+        chunks = extract_memories(content)
+    else:
+        chunks = chunk_exchanges(content)
+
+    if not chunks:
+        return None
+
+    # Detect room from content (general mode uses memory_type instead)
+    if extract_mode != "general":
+        room = detect_convo_room(content)
+    else:
+        room = None  # set per-chunk below
+
+    now = datetime.now().isoformat()
+    records = []
+    room_counts_delta = defaultdict(int)
+
+    if extract_mode != "general":
+        room_counts_delta[room] = 1
+
+    for chunk in chunks:
+        chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room
+        if extract_mode == "general":
+            room_counts_delta[chunk_room] += 1
+
+        drawer_id = (
+            f"drawer_{wing}_{chunk_room}_"
+            + hashlib.sha256(
+                (source_file + str(chunk["chunk_index"])).encode()
+            ).hexdigest()[:24]
+        )
+        meta = {
+            "wing": wing,
+            "room": chunk_room,
+            "source_file": source_file,
+            "chunk_index": chunk["chunk_index"],
+            "added_by": agent,
+            "filed_at": now,
+            "ingest_mode": "convos",
+            "extract_mode": extract_mode,
+        }
+        records.append((drawer_id, chunk["content"], meta))
+
+    return source_file, room, records, dict(room_counts_delta)
+
+
 def mine_convos(
     convo_dir: str,
     palace_path: str,
@@ -270,93 +354,90 @@ def mine_convos(
     files_skipped = 0
     room_counts = defaultdict(int)
 
-    for i, filepath in enumerate(files, 1):
-        source_file = str(filepath)
-
-        # Skip if already filed
-        if not dry_run and file_already_mined(collection, source_file):
-            files_skipped += 1
-            continue
-
-        # Normalize format
-        try:
-            content = normalize(str(filepath))
-        except (OSError, ValueError):
-            continue
-
-        if not content or len(content.strip()) < MIN_CHUNK_SIZE:
-            continue
-
-        # Chunk — either exchange pairs or general extraction
-        if extract_mode == "general":
-            from .general_extractor import extract_memories
-
-            chunks = extract_memories(content)
-            # Each chunk already has memory_type; use it as the room name
-        else:
-            chunks = chunk_exchanges(content)
-
-        if not chunks:
-            continue
-
-        # Detect room from content (general mode uses memory_type instead)
-        if extract_mode != "general":
-            room = detect_convo_room(content)
-        else:
-            room = None  # set per-chunk below
-
-        if dry_run:
+    # --------------------------------------------------------------
+    # DRY RUN: sequential, no writes, preserves original output
+    # --------------------------------------------------------------
+    if dry_run:
+        for i, filepath in enumerate(files, 1):
+            result = process_convo_file_cpu(filepath, wing, agent, extract_mode)
+            if result is None:
+                continue
+            _, room, records, room_counts_delta = result
             if extract_mode == "general":
                 from collections import Counter
 
-                type_counts = Counter(c.get("memory_type", "general") for c in chunks)
+                type_counts = Counter(meta["room"] for (_, _, meta) in records)
                 types_str = ", ".join(f"{t}:{n}" for t, n in type_counts.most_common())
-                print(f" [DRY RUN] {filepath.name} → {len(chunks)} memories ({types_str})")
+                print(f" [DRY RUN] {filepath.name} → {len(records)} memories ({types_str})")
             else:
-                print(f" [DRY RUN] {filepath.name} → room:{room} ({len(chunks)} drawers)")
-            total_drawers += len(chunks)
-            # Track room counts
-            if extract_mode == "general":
-                for c in chunks:
-                    room_counts[c.get("memory_type", "general")] += 1
-            else:
-                room_counts[room] += 1
-            continue
-
-        if extract_mode != "general":
-            room_counts[room] += 1
-
-        # File each chunk
-        drawers_added = 0
-        for chunk in chunks:
-            chunk_room = chunk.get("memory_type", room) if extract_mode == "general" else room
-            if extract_mode == "general":
-                room_counts[chunk_room] += 1
-            drawer_id = f"drawer_{wing}_{chunk_room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}"
-            try:
-                collection.add(
-                    documents=[chunk["content"]],
-                    ids=[drawer_id],
-                    metadatas=[
-                        {
-                            "wing": wing,
-                            "room": chunk_room,
-                            "source_file": source_file,
-                            "chunk_index": chunk["chunk_index"],
-                            "added_by": agent,
-                            "filed_at": datetime.now().isoformat(),
-                            "ingest_mode": "convos",
-                            "extract_mode": extract_mode,
-                        }
-                    ],
+                print(f" [DRY RUN] {filepath.name} → room:{room} ({len(records)} drawers)")
+            total_drawers += len(records)
+            for r, c in room_counts_delta.items():
+                room_counts[r] += c
+
+    # --------------------------------------------------------------
+    # REAL MINE: parallel file processing + batched upserts
+    # --------------------------------------------------------------
+    else:
+        print(f" Checking {len(files)} files for changes...")
+        pending = [fp for fp in files if not file_already_mined(collection, str(fp))]
+        already_mined = len(files) - len(pending)
+
+        batch_ids, batch_docs, batch_metas = [], [], []
+        completed = 0
+        skipped_small = 0
+
+        def flush_batch():
+            if batch_ids:
+                collection.upsert(
+                    documents=batch_docs,
+                    ids=batch_ids,
+                    metadatas=batch_metas,
                 )
-                drawers_added += 1
-            except Exception as e:
-                if "already exists" not in str(e).lower():
-                    raise
+                batch_ids.clear()
+                batch_docs.clear()
+                batch_metas.clear()
 
-        total_drawers += drawers_added
-        print(f" ✓ [{i:4}/{len(files)}] {filepath.name[:50]:50} +{drawers_added}")
+        # try/finally guarantees flush_batch() runs even if a worker raises,
+        # preventing silent loss of up to BATCH_SIZE-1 pending drawers.
+        try:
+            with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+                futures = {
+                    executor.submit(
+                        process_convo_file_cpu, fp, wing, agent, extract_mode
+                    ): fp
+                    for fp in pending
+                }
+                for future in as_completed(futures):
+                    filepath = futures[future]
+                    try:
+                        result = future.result()
+                    except Exception as e:
+                        print(f" ! [ERROR] {filepath.name}: {e}")
+                        completed += 1
+                        continue
+                    completed += 1
+                    if result is None:
+                        skipped_small += 1
+                        continue
+                    source_file, room, records, room_counts_delta = result
+                    for drawer_id, chunk_content, meta in records:
+                        batch_ids.append(drawer_id)
+                        batch_docs.append(chunk_content)
+                        batch_metas.append(meta)
+                        if len(batch_ids) >= BATCH_SIZE:
+                            flush_batch()
+                    total_drawers += len(records)
+                    for r, c in room_counts_delta.items():
+                        room_counts[r] += c
+                    print(
+                        f" ✓ [{completed:4}/{len(pending)}] "
+                        f"{Path(source_file).name[:50]:50} +{len(records)}"
+                    )
+        finally:
+            flush_batch()
+
+        files_skipped = already_mined + skipped_small
 
     print(f"\n{'=' * 55}")
    print(" Done.")
