Skip to content

Commit 6be6fff

Browse files
midwestEjphein
authored and committed
perf: batch ChromaDB inserts in miner — 10-30x faster mining
- Extract _build_drawer() helper for DRY metadata construction - Add add_drawers() for batched upserts with sub-batching (CHROMA_BATCH_LIMIT=5000) - Hoist datetime.now() and os.path.getmtime() syscalls out of per-chunk loop - Simplify process_file() to delegate to add_drawers() - add_drawer() contract unchanged (used by MCP tools)
1 parent 5aad994 commit 6be6fff

1 file changed

Lines changed: 81 additions & 72 deletions

File tree

mempalace/miner.py

Lines changed: 81 additions & 72 deletions
Original file line numberDiff line numberDiff line change
# --- Chunking knobs -------------------------------------------------------
CHUNK_SIZE = 800  # chars per drawer
CHUNK_OVERLAP = 100  # overlap between chunks
MIN_CHUNK_SIZE = 50  # skip tiny chunks

# --- Upsert batching ------------------------------------------------------
DRAWER_UPSERT_BATCH_SIZE = 1000  # canonical fork knob — used by add_drawers()
# Alias for upstream PR #1085's name; same semantic (ChromaDB hard cap is
# 5461). Fork keeps the more conservative 1000 default for embedding-pass
# memory headroom; raise it toward 5000 if mining throughput is a
# bottleneck.
CHROMA_BATCH_LIMIT = DRAWER_UPSERT_BATCH_SIZE

# --- File-size guard ------------------------------------------------------
MAX_FILE_SIZE = 500 * 1024 * 1024  # 500 MB — skip files larger than this.
# Long Claude Code sessions and large transcript exports routinely exceed
# 10 MB. The cap exists as a defensive rail against pathological binary
@@ -733,62 +738,96 @@ def _extract_entities_for_metadata(content: str) -> str:
733738
return ";".join(capped)
734739

735740

736-
def _build_drawer(wing, room, source_file, chunk_index, agent, content, now=None, source_mtime=None):
    """Build the ID, document, and metadata for a single drawer.

    Shared by ``add_drawer`` (single insert) and ``add_drawers`` (batch
    insert) so metadata construction stays DRY. The ``now`` and
    ``source_mtime`` parameters let batch callers hoist ``datetime.now()``
    and ``os.path.getmtime()`` out of the per-chunk loop.

    Args:
        wing: palace wing; baked into the drawer ID and metadata.
        room: palace room; baked into the drawer ID and metadata.
        source_file: origin path; hashed with ``chunk_index`` to derive a
            stable, deterministic drawer ID (re-mines overwrite in place).
        chunk_index: position of this chunk within the source file.
        agent: recorded in metadata as ``added_by``.
        content: chunk text; also feeds hall detection and entity extraction.
        now: pre-computed ``datetime.now().isoformat()`` string so batch
            callers can stamp every chunk identically; computed here when
            ``None``.
        source_mtime: pre-fetched ``os.path.getmtime(source_file)``; fetched
            here when ``None``, and silently omitted if the file is gone.

    Returns:
        ``(drawer_id, document, metadata)`` ready for ``collection.upsert``.
    """
    digest = hashlib.sha256((source_file + str(chunk_index)).encode()).hexdigest()[:24]
    drawer_id = f"drawer_{wing}_{room}_{digest}"
    metadata = {
        "wing": wing,
        "room": room,
        "source_file": source_file,
        "chunk_index": chunk_index,
        "added_by": agent,
        # `is None`, not truthiness: an explicit caller-supplied timestamp is
        # always honored, even a falsy one.
        "filed_at": now if now is not None else datetime.now().isoformat(),
        "normalize_version": NORMALIZE_VERSION,
        "hall": detect_hall(content),
    }
    if source_mtime is not None:
        metadata["source_mtime"] = source_mtime
    else:
        try:
            metadata["source_mtime"] = os.path.getmtime(source_file)
        except OSError:
            pass  # best-effort: the drawer is still filed without an mtime
    entities = _extract_entities_for_metadata(content)
    if entities:
        metadata["entities"] = entities
    return drawer_id, content, metadata
767771

768772

769773
def add_drawer(
    collection, wing: str, room: str, content: str, source_file: str, chunk_index: int, agent: str
):
    """Add one drawer to the palace."""
    # Single-chunk path: delegate construction to the shared helper, then
    # upsert the one-element batch.
    built = _build_drawer(wing, room, source_file, chunk_index, agent, content)
    drawer_id, document, meta = built
    collection.upsert(documents=[document], ids=[drawer_id], metadatas=[meta])
    return True
773784

774-
Kept for backward compatibility with external callers. In-tree the
775-
miner uses ``_build_drawer_metadata`` + a batched ``collection.upsert``
776-
to amortize the embedding model's forward-pass cost across chunks.
785+
786+
def add_drawers(collection, wing, room, chunks, source_file, agent):
    """Batch-insert multiple drawers in one ChromaDB call per sub-batch.

    Collects all chunks into batch lists and upserts them in groups of
    ``DRAWER_UPSERT_BATCH_SIZE`` (``CHROMA_BATCH_LIMIT`` is upstream PR
    #1085's alias for it; existing fork tests that
    ``monkeypatch.setattr(miner, "DRAWER_UPSERT_BATCH_SIZE", N)`` still
    drive the sub-batch loop).

    Args:
        collection: ChromaDB collection to upsert into.
        wing: palace wing recorded on every drawer.
        room: palace room recorded on every drawer.
        chunks: iterable of dicts with ``"chunk_index"`` and ``"content"``.
        source_file: path the chunks were mined from.
        agent: recorded as ``added_by`` on every drawer.

    Returns:
        ``(drawers_added, batch_ids)``.
    """
    # Hoist the timestamp and mtime syscalls out of the per-chunk loop.
    now = datetime.now().isoformat()
    try:
        source_mtime = os.path.getmtime(source_file)
    except OSError:
        # NOTE(review): passing None makes _build_drawer retry getmtime once
        # per chunk (swallowing the OSError each time) — harmless, but the
        # hoist is defeated for unreadable paths; confirm acceptable.
        source_mtime = None

    batch_docs = []
    batch_ids = []
    batch_metas = []
    for chunk in chunks:
        drawer_id, doc, metadata = _build_drawer(
            wing, room, source_file, chunk["chunk_index"], agent,
            chunk["content"], now=now, source_mtime=source_mtime,
        )
        batch_docs.append(doc)
        batch_ids.append(drawer_id)
        batch_metas.append(metadata)

    if not batch_docs:
        return 0, []

    # Sub-batch to stay under ChromaDB's max batch size (5461).
    # DRAWER_UPSERT_BATCH_SIZE is the public knob (fork-only, preserved
    # across the upstream #1085 cherry-pick); CHROMA_BATCH_LIMIT is
    # upstream's alias for the same constant. Slice each sub-batch once and
    # reuse it rather than re-slicing per keyword argument.
    drawers_added = 0
    for start in range(0, len(batch_docs), DRAWER_UPSERT_BATCH_SIZE):
        stop = start + DRAWER_UPSERT_BATCH_SIZE
        docs = batch_docs[start:stop]
        collection.upsert(
            documents=docs,
            ids=batch_ids[start:stop],
            metadatas=batch_metas[start:stop],
        )
        drawers_added += len(docs)

    return drawers_added, batch_ids
792831

793832

794833
# =============================================================================
@@ -847,51 +886,21 @@ def process_file(
847886
except Exception:
848887
pass
849888

850-
# Batch chunks into bounded upserts so the embedding model sees many
851-
# chunks per forward pass without building one huge Chroma/SQLite
852-
# request for pathological files. A bad chunk can fail its sub-batch;
853-
# that is the deliberate trade-off for amortizing embedding overhead.
854-
try:
855-
source_mtime = os.path.getmtime(source_file)
856-
except OSError:
857-
source_mtime = None
858-
859-
drawers_added = 0
860-
for batch_start in range(0, len(chunks), DRAWER_UPSERT_BATCH_SIZE):
861-
batch_docs: list = []
862-
batch_ids: list = []
863-
batch_metas: list = []
864-
for chunk in chunks[batch_start : batch_start + DRAWER_UPSERT_BATCH_SIZE]:
865-
drawer_id = f"drawer_{wing}_{room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}"
866-
batch_docs.append(chunk["content"])
867-
batch_ids.append(drawer_id)
868-
batch_metas.append(
869-
_build_drawer_metadata(
870-
wing,
871-
room,
872-
source_file,
873-
chunk["chunk_index"],
874-
agent,
875-
chunk["content"],
876-
source_mtime,
877-
)
878-
)
879-
collection.upsert(
880-
documents=batch_docs,
881-
ids=batch_ids,
882-
metadatas=batch_metas,
883-
)
884-
drawers_added += len(batch_docs)
889+
# Batch all chunks through a single add_drawers() call — sub-
890+
# batches at DRAWER_UPSERT_BATCH_SIZE so the embedding model sees
891+
# many chunks per forward pass without building one huge
892+
# Chroma/SQLite request for pathological files. A bad chunk fails
893+
# its sub-batch; that is the deliberate trade-off for amortizing
894+
# embedding overhead. (Upstream PR #1085, cherry-picked.)
895+
drawers_added, batch_ids = add_drawers(
896+
collection, wing, room, chunks, source_file, agent,
897+
)
885898

886899
# Build closet — the searchable index pointing to these drawers.
887900
# Purge first: a re-mine (mtime change or normalize_version bump) must
888901
# fully replace the prior closets, not append to them.
889902
if closets_col and drawers_added > 0:
890-
drawer_ids = [
891-
f"drawer_{wing}_{room}_{hashlib.sha256((source_file + str(c['chunk_index'])).encode()).hexdigest()[:24]}"
892-
for c in chunks
893-
]
894-
closet_lines = build_closet_lines(source_file, drawer_ids, content, wing, room)
903+
closet_lines = build_closet_lines(source_file, batch_ids, content, wing, room)
895904
closet_id_base = (
896905
f"closet_{wing}_{room}_{hashlib.sha256(source_file.encode()).hexdigest()[:24]}"
897906
)

0 commit comments

Comments
 (0)