|
# --- Chunking & batching configuration ---------------------------------
# Drawers are CHUNK_SIZE-character windows over a file, overlapping by
# CHUNK_OVERLAP; chunks shorter than MIN_CHUNK_SIZE are dropped.
CHUNK_SIZE = 800 # chars per drawer
CHUNK_OVERLAP = 100 # overlap between chunks
MIN_CHUNK_SIZE = 50 # skip tiny chunks
DRAWER_UPSERT_BATCH_SIZE = 1000 # canonical fork knob — used by add_drawers()
# Alias for upstream PR #1085's name; same semantic (ChromaDB hard cap is
# 5461). Fork keeps the more conservative 1000 default for embedding-pass
# memory headroom; raise it toward 5000 if mining throughput is a
# bottleneck.
CHROMA_BATCH_LIMIT = DRAWER_UPSERT_BATCH_SIZE
MAX_FILE_SIZE = 500 * 1024 * 1024 # 500 MB — skip files larger than this.
# Long Claude Code sessions and large transcript exports routinely exceed
# 10 MB. The cap exists as a defensive rail against pathological binary
@@ -733,62 +738,96 @@ def _extract_entities_for_metadata(content: str) -> str: |
733 | 738 | return ";".join(capped) |
734 | 739 |
|
735 | 740 |
|
def _build_drawer(
    wing: str,
    room: str,
    source_file: str,
    chunk_index: int,
    agent: str,
    content: str,
    now: Optional[str] = None,
    source_mtime: Optional[float] = None,
) -> tuple:
    """Build the (id, document, metadata) triple for a single drawer.

    Shared by ``add_drawer`` (single insert) and ``add_drawers`` (batch
    insert) so metadata construction stays DRY. Accepts a pre-computed
    timestamp and mtime so batch callers can amortize them across all
    chunks of a file.

    Args:
        wing: palace wing key, stored in metadata and hashed into the ID.
        room: palace room key, stored in metadata and hashed into the ID.
        source_file: path of the mined file; hashed into the drawer ID.
        chunk_index: position of this chunk within the file; hashed into
            the drawer ID so a re-mine overwrites drawers in place.
        agent: recorded in metadata as ``added_by``.
        content: chunk text; returned unchanged as the document.
        now: pre-computed ``datetime.now().isoformat()``; computed here
            when omitted, so single-insert callers need not supply it.
        source_mtime: pre-computed ``os.path.getmtime(source_file)``;
            looked up here when omitted (silently skipped on OSError,
            e.g. the file vanished between chunking and filing).

    Returns:
        ``(drawer_id, document, metadata)`` ready for ``collection.upsert``.
    """
    # Deterministic ID: the same file+chunk always maps to the same
    # drawer, so upserts replace rather than duplicate.
    drawer_id = f"drawer_{wing}_{room}_{hashlib.sha256((source_file + str(chunk_index)).encode()).hexdigest()[:24]}"
    metadata = {
        "wing": wing,
        "room": room,
        "source_file": source_file,
        "chunk_index": chunk_index,
        "added_by": agent,
        "filed_at": now or datetime.now().isoformat(),
        "normalize_version": NORMALIZE_VERSION,
        "hall": detect_hall(content),
    }
    if source_mtime is not None:
        metadata["source_mtime"] = source_mtime
    else:
        # NOTE(review): when a batch caller passes source_mtime=None after
        # its own getmtime already failed (as add_drawers does), this
        # retries the stat once per chunk — harmless but redundant.
        try:
            metadata["source_mtime"] = os.path.getmtime(source_file)
        except OSError:
            pass
    entities = _extract_entities_for_metadata(content)
    if entities:
        metadata["entities"] = entities
    return drawer_id, content, metadata
767 | 771 |
|
768 | 772 |
|
def add_drawer(
    collection, wing: str, room: str, content: str, source_file: str, chunk_index: int, agent: str
):
    """Add one drawer to the palace.

    Thin single-item wrapper: builds the drawer via ``_build_drawer`` and
    upserts it in a one-element batch. Batch callers should prefer
    ``add_drawers``.
    """
    built = _build_drawer(wing, room, source_file, chunk_index, agent, content)
    drawer_id, document, meta = built
    collection.upsert(documents=[document], ids=[drawer_id], metadatas=[meta])
    return True
def add_drawers(
    collection,
    wing: str,
    room: str,
    chunks: list,
    source_file: str,
    agent: str,
) -> tuple:
    """Batch-insert multiple drawers in one ChromaDB call per sub-batch.

    Builds every chunk's (id, document, metadata) up front — sharing a
    single ``datetime.now()`` timestamp and a single
    ``os.path.getmtime`` stat across the whole file — then upserts in
    groups of ``DRAWER_UPSERT_BATCH_SIZE`` (alias of
    ``CHROMA_BATCH_LIMIT``, kept so existing fork tests that
    ``monkeypatch.setattr(miner, "DRAWER_UPSERT_BATCH_SIZE", N)`` still
    drive the sub-batch loop).

    Args:
        collection: ChromaDB collection receiving the upserts.
        wing: palace wing key for every drawer.
        room: palace room key for every drawer.
        chunks: sequence of dicts with ``"chunk_index"`` and ``"content"``.
        source_file: path of the mined file; hashed into drawer IDs.
        agent: recorded as ``added_by`` on every drawer.

    Returns:
        ``(drawers_added, batch_ids)`` — count of upserted drawers and the
        full list of drawer IDs in chunk order (used downstream for closet
        building). ``(0, [])`` when ``chunks`` is empty.
    """
    now = datetime.now().isoformat()
    try:
        source_mtime = os.path.getmtime(source_file)
    except OSError:
        # File vanished or is unreadable; _build_drawer degrades gracefully.
        source_mtime = None

    batch_docs: list = []
    batch_ids: list = []
    batch_metas: list = []
    for chunk in chunks:
        drawer_id, doc, metadata = _build_drawer(
            wing, room, source_file, chunk["chunk_index"], agent,
            chunk["content"], now=now, source_mtime=source_mtime,
        )
        batch_docs.append(doc)
        batch_ids.append(drawer_id)
        batch_metas.append(metadata)

    if not batch_docs:
        return 0, []

    # Sub-batch to stay under ChromaDB's max batch size (5461).
    # DRAWER_UPSERT_BATCH_SIZE is kept as the public knob (fork-only,
    # preserved across the upstream #1085 cherry-pick); CHROMA_BATCH_LIMIT
    # is upstream's name for the same constant.
    drawers_added = 0
    for start in range(0, len(batch_docs), DRAWER_UPSERT_BATCH_SIZE):
        end = start + DRAWER_UPSERT_BATCH_SIZE
        docs_slice = batch_docs[start:end]  # compute each slice once
        collection.upsert(
            documents=docs_slice,
            ids=batch_ids[start:end],
            metadatas=batch_metas[start:end],
        )
        drawers_added += len(docs_slice)

    return drawers_added, batch_ids
792 | 831 |
|
793 | 832 |
|
794 | 833 | # ============================================================================= |
@@ -847,51 +886,21 @@ def process_file( |
847 | 886 | except Exception: |
848 | 887 | pass |
849 | 888 |
|
850 | | - # Batch chunks into bounded upserts so the embedding model sees many |
851 | | - # chunks per forward pass without building one huge Chroma/SQLite |
852 | | - # request for pathological files. A bad chunk can fail its sub-batch; |
853 | | - # that is the deliberate trade-off for amortizing embedding overhead. |
854 | | - try: |
855 | | - source_mtime = os.path.getmtime(source_file) |
856 | | - except OSError: |
857 | | - source_mtime = None |
858 | | - |
859 | | - drawers_added = 0 |
860 | | - for batch_start in range(0, len(chunks), DRAWER_UPSERT_BATCH_SIZE): |
861 | | - batch_docs: list = [] |
862 | | - batch_ids: list = [] |
863 | | - batch_metas: list = [] |
864 | | - for chunk in chunks[batch_start : batch_start + DRAWER_UPSERT_BATCH_SIZE]: |
865 | | - drawer_id = f"drawer_{wing}_{room}_{hashlib.sha256((source_file + str(chunk['chunk_index'])).encode()).hexdigest()[:24]}" |
866 | | - batch_docs.append(chunk["content"]) |
867 | | - batch_ids.append(drawer_id) |
868 | | - batch_metas.append( |
869 | | - _build_drawer_metadata( |
870 | | - wing, |
871 | | - room, |
872 | | - source_file, |
873 | | - chunk["chunk_index"], |
874 | | - agent, |
875 | | - chunk["content"], |
876 | | - source_mtime, |
877 | | - ) |
878 | | - ) |
879 | | - collection.upsert( |
880 | | - documents=batch_docs, |
881 | | - ids=batch_ids, |
882 | | - metadatas=batch_metas, |
883 | | - ) |
884 | | - drawers_added += len(batch_docs) |
| 889 | + # Batch all chunks through a single add_drawers() call — sub- |
| 890 | + # batches at DRAWER_UPSERT_BATCH_SIZE so the embedding model sees |
| 891 | + # many chunks per forward pass without building one huge |
| 892 | + # Chroma/SQLite request for pathological files. A bad chunk fails |
| 893 | + # its sub-batch; that is the deliberate trade-off for amortizing |
| 894 | + # embedding overhead. (Upstream PR #1085, cherry-picked.) |
| 895 | + drawers_added, batch_ids = add_drawers( |
| 896 | + collection, wing, room, chunks, source_file, agent, |
| 897 | + ) |
885 | 898 |
|
886 | 899 | # Build closet — the searchable index pointing to these drawers. |
887 | 900 | # Purge first: a re-mine (mtime change or normalize_version bump) must |
888 | 901 | # fully replace the prior closets, not append to them. |
889 | 902 | if closets_col and drawers_added > 0: |
890 | | - drawer_ids = [ |
891 | | - f"drawer_{wing}_{room}_{hashlib.sha256((source_file + str(c['chunk_index'])).encode()).hexdigest()[:24]}" |
892 | | - for c in chunks |
893 | | - ] |
894 | | - closet_lines = build_closet_lines(source_file, drawer_ids, content, wing, room) |
| 903 | + closet_lines = build_closet_lines(source_file, batch_ids, content, wing, room) |
895 | 904 | closet_id_base = ( |
896 | 905 | f"closet_{wing}_{room}_{hashlib.sha256(source_file.encode()).hexdigest()[:24]}" |
897 | 906 | ) |
|
0 commit comments