feat: implement paralleled vectorizing and thread-safe processing in KnowledgeStorePresenter #1111
Conversation
Walkthrough

Adds per-file task queues to serialize vector operations for the same file, and refactors task processing into a bounded concurrent runner with a configurable `maxConcurrency`; replaces single-task tracking with a `runningTasks` map and adds lifecycle/cleanup changes.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Q as Global Queue
    participant R as Runner Pool
    participant RT as runningTasks
    participant T as Task Worker
    Q->>R: spawn up to maxConcurrency runners
    Note right of R: maxConcurrency configurable (default 16)
    par Concurrent runners
        R->>Q: dequeue task -> add to RT
        Q-->>T: task executed (async)
    end
    T->>RT: on finally remove from runningTasks
    RT->>R: trigger processQueue() to continue
```
```mermaid
sequenceDiagram
    participant App as Application
    participant Store as KnowledgeStorePresenter
    participant FQ as Per-File Queue
    participant Vec as VectorP
    App->>Store: addFile(fileId)
    Store->>FQ: enqueueFileTask(fileId, updateFile)
    App->>Store: processFileAsync(fileId)
    Store->>FQ: enqueueFileTask(fileId, insertChunks)
    FQ->>Vec: execute updateFile()
    Vec-->>FQ: complete
    Note over FQ: subsequent tasks for same file run sequentially
    FQ->>Vec: execute insertChunks()
    Vec-->>FQ: complete
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
I haven't tested it sufficiently yet, so I converted this PR into a draft.
Actionable comments posted: 2
🧹 Nitpick comments (4)
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts (2)
12-20: Concurrent tracking via `runningTasks` looks good; consider small robustness tweaks

The switch from a single "current task" to `runningTasks` and `maxConcurrency` is consistent across the status helpers and should give correct aggregate stats and activity checks. Two minor follow-ups:

- `maxConcurrency` is accepted as-is (including 0/negative values), which would stall processing. Consider clamping it to `>= 1` in the constructor or throwing on invalid values.
- The header comment still says "sequential execution"; it is worth updating it to reflect bounded parallelism to avoid confusing future readers.

Also, CI is flagging Prettier; please run the formatter once you settle the implementation.

Also applies to: 61-64, 77-83, 89-91, 94-101, 103-109, 111-117
29-47: Termination semantics rely on cooperative tasks; document the contract

`removeTasks` now correctly:

- Drops matching tasks from the queue.
- Aborts and cleans up any matching entries in `runningTasks` via `terminateTask`.

`destroy` then calls `removeTasks(() => true)`, clears the queue, aborts remaining controllers, and clears `runningTasks`, which is consistent.

One behavioral nuance: `terminateTask` removes a running task from `runningTasks` immediately after aborting its controller, so the concurrency limiter may start new work even if the aborted task's `run` implementation doesn't actually respect `AbortSignal` and keeps running. This is fine if `KnowledgeChunkTask.run` is expected to be cooperative with respect to `signal`, but it's worth documenting that assumption so future callers don't accidentally violate the contract.

Also applies to: 119-130, 132-139
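The cooperative contract flagged above can be sketched as a `run` implementation that checks the signal between units of work. This is a hypothetical illustration, not the project's actual `KnowledgeChunkTask.run`; `embedChunk` is a made-up stand-in for the real vector work:

```typescript
// A cooperative task body: it checks signal.aborted between chunks, so
// aborting the controller stops work at the next chunk boundary instead
// of letting the task run to completion after terminateTask fires.
async function runChunks(chunks: string[], signal: AbortSignal): Promise<number> {
  let processed = 0
  for (const chunk of chunks) {
    if (signal.aborted) {
      return processed // stop early; caller treats this as a termination
    }
    await embedChunk(chunk) // hypothetical stand-in for the real vector work
    processed++
  }
  return processed
}

async function embedChunk(_chunk: string): Promise<void> {
  await new Promise((resolve) => setTimeout(resolve, 1))
}
```

A task written this way honors the limiter's assumption; one that ignores `signal` keeps consuming resources even after it has been removed from `runningTasks`.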
src/main/presenter/knowledgePresenter/knowledgeStorePresenter.ts (2)
89-93: Be aware of changed failure semantics around `vectorP.*` operations

All of these call sites now go through `enqueueFileTask`, so until its error handling is adjusted:

- Failures in `updateFile`/`insertFile` during `addFile` will not reject `addFile`; callers may think a file was registered when it wasn't.
- Failures in `insertChunks`, `updateChunkStatus`, `insertVector`, `deleteFile`, `deleteChunksByFile`, `deleteVectorsByFile`, and the file status updates in `onFileFinish`/`handleFileProcessingError` will be logged but treated as success by their callers.

Once `enqueueFileTask` is updated to propagate errors (as suggested earlier), these sites will again behave like the previous direct `await`-based implementation, while still benefiting from per-file serialization.

Also applies to: 127-128, 140-143, 156-158, 213-221, 271-273, 299-300, 317-318, 334-335, 370-371
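For illustration, here is a minimal self-contained demo (hypothetical names, not the presenter's code) of how a `.catch()` placed inside the chain hides a rejection from the awaiting caller:

```typescript
// The awaited promise includes a .catch() that logs and resolves, so the
// caller proceeds as if the operation succeeded even though the inner
// task rejected. This mirrors the swallowed-error hazard described above.
async function swallowedDemo(): Promise<string> {
  const failing = async () => {
    throw new Error('vector insert failed')
  }
  const chained = Promise.resolve()
    .then(failing)
    .catch(() => {
      // logged-and-resolved: the caller below never sees the rejection
    })
  await chained
  return 'looks like success'
}
```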
325-342: Deletion/re-add/destroy flows mostly align; consider a couple of lifecycle refinements

- `deleteFile` now cancels queued/running chunk tasks and deletes the file via the per-file queue, which is consistent with the new model.
- `reAddFile` cleans out chunks and vectors via the per-file queue before calling `addFile`, which is sensible.

Two optional improvements:

- `reAddFile` task cancellation. If `reAddFile` can be called while there are still active tasks for that file, you may want to invoke `this.taskP.cancelTasksByFile(fileId)` here as well, mirroring `deleteFile`.
- `destroy`/`close` and `fileQueueMap`. Both `destroy` and `close` cancel tasks and clear `fileProgressMap` but don't touch `fileQueueMap`. Once you adjust `enqueueFileTask` to clean up entries, that may be sufficient, but if there's a chance of in-flight queued operations at teardown time, explicitly clearing `fileQueueMap` (after ensuring you're OK with abandoning pending DB work) would make the lifecycle semantics clearer.

These are not correctness blockers but are worth considering for long-running processes.

Also applies to: 365-373, 442-465
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- src/main/presenter/knowledgePresenter/knowledgeStorePresenter.ts (12 hunks)
- src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts (1)
src/shared/types/presenters/legacy.presenters.d.ts (3)
`KnowledgeChunkTask` (1645-1656), `TaskStatusSummary` (1659-1663), `TaskQueueStatus` (1666-1670)
🪛 GitHub Actions: PR Check
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
[warning] 1-1: Code style issues found by Prettier. Run 'prettier --write' to fix.
🔇 Additional comments (1)
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts (1)
141-170: Queue scheduling and cleanup logic is sound

The reworked `processQueue` loop:

- Correctly pulls from `queue` while `runningTasks.size < maxConcurrency`.
- Tracks each started task in `runningTasks`.
- Uses an async IIFE per task with a `finally` block that always cleans up `controllers` and `runningTasks`, logs completion, and then triggers another `processQueue()` pass.

The split between cooperative cancellation (success path + `controller.signal.aborted` ⇒ `onTerminate`) and a rejected `AbortError` (`DOMException`) in the catch gives clear semantics for termination vs. "real" failure.

Overall this looks correct and should respect the concurrency cap under the assumption that tasks honor the abort signal.
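As a rough sketch of the pattern this comment describes, assuming a simplified task shape (the real `KnowledgeChunkTask` and presenter wiring differ), a bounded pump might look like:

```typescript
// Hypothetical sketch only: names (queue, runningTasks, maxConcurrency,
// processQueue) mirror the review, but the Task type is an assumption,
// not the project's actual interface.
type Task = { id: string; run: (signal: AbortSignal) => Promise<void> }

class BoundedRunner {
  private queue: Task[] = []
  private runningTasks = new Map<string, Task>()
  private controllers = new Map<string, AbortController>()

  constructor(private maxConcurrency = 16) {
    // clamp invalid values, as the review suggests
    if (maxConcurrency < 1) this.maxConcurrency = 1
  }

  enqueue(task: Task): void {
    this.queue.push(task)
    this.processQueue()
  }

  get running(): number {
    return this.runningTasks.size
  }

  private processQueue(): void {
    // start tasks until the concurrency cap is reached
    while (this.queue.length > 0 && this.runningTasks.size < this.maxConcurrency) {
      const task = this.queue.shift()!
      const controller = new AbortController()
      this.runningTasks.set(task.id, task)
      this.controllers.set(task.id, controller)
      // async IIFE per task; finally always cleans up and re-pumps the queue
      void (async () => {
        try {
          await task.run(controller.signal)
        } catch (err) {
          console.error(`task ${task.id} failed:`, err)
        } finally {
          this.controllers.delete(task.id)
          this.runningTasks.delete(task.id)
          this.processQueue()
        }
      })()
    }
  }
}
```

Because the `finally` block both removes the finished task and re-invokes `processQueue`, the pool refills itself whether a task succeeds, fails, or is aborted.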
```ts
// --- New: per-file queue to keep vectorP access thread-safe ---
private fileQueueMap = new Map<string, Promise<void>>()

private async enqueueFileTask(fileId: string, task: () => Promise<void>): Promise<void> {
  const last = this.fileQueueMap.get(fileId) ?? Promise.resolve()
  const next = last.then(task).catch((err) => {
    console.error(`[RAG] Error in queued task for file ${fileId}:`, err)
  })
  this.fileQueueMap.set(fileId, next)
  await next
}
```
`enqueueFileTask` currently swallows errors and can leak queue entries

The per-file queue idea is solid, but the current implementation has two important side effects:

- Errors are swallowed for all callers. `next = last.then(task).catch(...)` logs and then resolves, so any `await this.enqueueFileTask(...)` (e.g., in `addFile`, `deleteFile`, `reAddFile`, `onFileFinish`, etc.) will never see a rejection, even if an underlying `vectorP.*` call fails. This changes behavior versus the previous direct `await this.vectorP.*` and can leave callers thinking an operation succeeded when it did not.
- `fileQueueMap` entries are never cleared. The map only ever grows; once a file (or any ID key) has queued work, its resolved `Promise` stays in `fileQueueMap` indefinitely. Over many files/chunks this becomes a leak.

A simple pattern that keeps per-file sequencing, lets callers observe errors, and still prevents a failed task from blocking subsequent ones is:
```ts
private async enqueueFileTask(fileId: string, task: () => Promise<void>): Promise<void> {
  const prev = this.fileQueueMap.get(fileId) ?? Promise.resolve();
  const next = prev.then(task);
  const wrapped = next
    .catch((err) => {
      console.error(`[RAG] Error in queued task for file ${fileId}:`, err);
    })
    .finally(() => {
      if (this.fileQueueMap.get(fileId) === wrapped) {
        this.fileQueueMap.delete(fileId);
      }
    });
  this.fileQueueMap.set(fileId, wrapped);
  return next; // callers still see rejection if task fails
}
```

This preserves serialization, avoids unbounded map growth, and restores proper error propagation to callers.
🤖 Prompt for AI Agents
In src/main/presenter/knowledgePresenter/knowledgeStorePresenter.ts around lines
27 to 37, the current enqueueFileTask swallows errors and never clears
fileQueueMap entries; change it to get the previous promise with
Promise.resolve(), set next = prev.then(task) (so callers receive the real
rejection), create a wrapped promise that catches/logs errors and in finally
deletes the map entry only if the stored value equals the wrapped promise, store
wrapped in fileQueueMap, and return next (not wrapped) so callers observe errors
while later tasks still run and the map does not leak.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts (1)
132-169: Replace the DOMException check with signal.aborted and use the structured logger instead of console

The code at lines 132-169 uses `error instanceof DOMException` to detect abort errors, which is fragile in Node/Electron environments and may throw a ReferenceError if `DOMException` is undefined. Additionally, it violates the coding guidelines by using `console.log` and `console.error` instead of structured logging.

Changes required:

Line 155: Replace the `instanceof DOMException` check with `controller.signal.aborted` to detect abort errors reliably:

```ts
const isAbortError = controller.signal.aborted || (error as any)?.name === 'AbortError'
if (isAbortError) {
```

Lines 156, 158, 163: Replace all console calls with the structured logger. Import the logger at the top:

```ts
import logger from '../../../shared/logger'
```

Then use:

```ts
logger.info('[RAG TASK] Task %s aborted during execution.', task.id)
logger.error('[RAG TASK] Task %s failed with error:', task.id, { error })
logger.info('[RAG TASK] Task %s finished.', task.id)
```
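A minimal standalone form of the suggested check (logger wiring omitted; this helper is illustrative, not the project's API) could be:

```typescript
// Detect aborts via the controller's own signal plus the error name,
// avoiding `instanceof DOMException`, which may be undefined in some
// Node/Electron contexts.
function isAbortError(error: unknown, signal: AbortSignal): boolean {
  return signal.aborted || (error as { name?: string } | null | undefined)?.name === 'AbortError'
}
```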
🧹 Nitpick comments (3)
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts (3)
1-20: Align class documentation with the new concurrent execution model

The class-level comment still says "sequential execution", but with `runningTasks` and `maxConcurrency` you now allow up to `maxConcurrency` tasks to run in parallel. It would be clearer to update the doc to something like "bounded concurrent execution (default 16 tasks)" so callers understand the behavior and the meaning of `maxConcurrency`.

You might also consider mentioning the concurrency behavior in any higher-level presenter docs that reference this class, so the global scheduler's semantics are explicit.
29-47: Cancellation now covers running tasks; consider semantics for queued task termination callbacks

Extending `removeTasks` to iterate `runningTasks` and call `terminateTask` for matching entries is a good improvement and ensures running tasks are actually aborted and removed.

One open design point: for tasks that are still in the queue (never started), you currently remove them and abort their controller, but you never invoke `onTerminate`. If callers expect `onTerminate` to fire whenever a task is canceled (regardless of whether it had started), you may want to invoke it when filtering queued tasks:

```diff
 this.queue = this.queue.filter((task) => {
   if (filter(task)) {
     console.log(`[RAG TASK] Removing queued task: ${task.id}`)
     this.terminateTask(task.id)
+    task.onTerminate?.()
     return false
   }
   return true
 })
```

If `onTerminate` is intended only for tasks that had actually begun execution, the current behavior is fine; it's just worth double-checking the contract.
119-130: Destroy behavior thoroughly cleans up queue, controllers, and runningTasks

`destroy()` now:

- Cancels all queued and running tasks via `removeTasks(() => true)`, which drives `terminateTask` and aborts controllers.
- Resets `this.queue` to `[]`.
- Aborts any remaining controllers and clears both `controllers` and `runningTasks`.

This provides a robust cleanup point for the presenter. The extra `controllers.forEach((c) => c.abort())` after `removeTasks` is slightly redundant in the current implementation (since `terminateTask` already aborts), but it's harmless and defensive if new code ever adds controllers without going through `removeTasks`.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- src/main/presenter/knowledgePresenter/knowledgeStorePresenter.ts (12 hunks)
- src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/main/presenter/knowledgePresenter/knowledgeStorePresenter.ts
🧰 Additional context used
📓 Path-based instructions (13)
**/*.{ts,tsx,js,jsx,vue}
📄 CodeRabbit inference engine (CLAUDE.md)
Use English for logs and comments (Chinese text exists in legacy code, but new code should use English)
Files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
**/*.{ts,tsx}
📄 CodeRabbit inference engine (CLAUDE.md)
Enable and maintain strict TypeScript type checking for all files
**/*.{ts,tsx}: Always use try-catch to handle possible errors in TypeScript code
Provide meaningful error messages when catching errors
Log detailed error logs including error details, context, and stack traces
Distinguish and handle different error types (UserError, NetworkError, SystemError, BusinessError) with appropriate handlers in TypeScript
Use structured logging with logger.error(), logger.warn(), logger.info(), logger.debug() methods from logging utilities
Do not suppress errors (avoid empty catch blocks or silently ignoring errors)
Provide user-friendly error messages for user-facing errors in TypeScript components
Implement error retry mechanisms for transient failures in TypeScript
Avoid logging sensitive information (passwords, tokens, PII) in logs
Files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
src/main/presenter/**/*.ts
📄 CodeRabbit inference engine (CLAUDE.md)
Organize core business logic into dedicated Presenter classes, with one presenter per functional domain
Files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
src/main/**/*.ts
📄 CodeRabbit inference engine (CLAUDE.md)
Use EventBus from `src/main/eventbus.ts` for main-to-renderer communication, broadcasting events via `mainWindow.webContents.send()`
src/main/**/*.ts: Use the EventBus pattern for inter-process communication within the main process to decouple modules
Use Electron's built-in APIs for file system and native dialogs instead of Node.js or custom implementations
src/main/**/*.ts: Electron main process code belongs in `src/main/` with presenters in `presenter/` (Window/Tab/Thread/Mcp/Config/LLMProvider) and `eventbus.ts` for app events
Use the Presenter pattern in the main process for UI coordination
Files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
**/*.ts
📄 CodeRabbit inference engine (CLAUDE.md)
Do not include AI co-authoring information (e.g., 'Co-Authored-By: Claude') in git commits
Files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
**/*.{js,ts,jsx,tsx,mjs,cjs}
📄 CodeRabbit inference engine (.cursor/rules/development-setup.mdc)
Write logs and comments in English
Files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
{src/main/presenter/**/*.ts,src/renderer/**/*.ts}
📄 CodeRabbit inference engine (.cursor/rules/electron-best-practices.mdc)
Implement proper inter-process communication (IPC) patterns using Electron's ipcRenderer and ipcMain APIs
Files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
src/**/*
📄 CodeRabbit inference engine (.cursor/rules/project-structure.mdc)
New features should be developed in the `src` directory
Files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
src/main/**/*.{js,ts}
📄 CodeRabbit inference engine (.cursor/rules/project-structure.mdc)
Main process code for Electron should be placed in `src/main`
Files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
src/**/*.{ts,tsx,vue,js,jsx}
📄 CodeRabbit inference engine (AGENTS.md)
Use Prettier with single quotes, no semicolons, and 100 character width
Files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
src/**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (AGENTS.md)
Use OxLint for linting JavaScript and TypeScript files
Files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
src/**/*.{ts,tsx}
📄 CodeRabbit inference engine (AGENTS.md)
src/**/*.{ts,tsx}: Use camelCase for variable and function names in TypeScript files
Use PascalCase for type and class names in TypeScript
Use SCREAMING_SNAKE_CASE for constant names
Files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
src/**/*.ts
📄 CodeRabbit inference engine (AGENTS.md)
Use EventBus for inter-process communication events
Files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
🧠 Learnings (5)
📚 Learning: 2025-11-25T05:26:11.297Z
Learnt from: CR
Repo: ThinkInAIXYZ/deepchat PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-11-25T05:26:11.297Z
Learning: Applies to src/main/presenter/**/*.ts : Organize core business logic into dedicated Presenter classes, with one presenter per functional domain
Applied to files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
📚 Learning: 2025-11-25T05:28:20.500Z
Learnt from: CR
Repo: ThinkInAIXYZ/deepchat PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-11-25T05:28:20.500Z
Learning: Applies to src/main/**/*.ts : Use the Presenter pattern in the main process for UI coordination
Applied to files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
📚 Learning: 2025-11-25T05:27:12.201Z
Learnt from: CR
Repo: ThinkInAIXYZ/deepchat PR: 0
File: .cursor/rules/llm-agent-loop.mdc:0-0
Timestamp: 2025-11-25T05:27:12.201Z
Learning: Implement separation of concerns where `src/main/presenter/llmProviderPresenter/index.ts` manages the Agent loop and conversation history, while Provider files handle LLM API interactions, Provider-specific request/response formatting, tool definition conversion, and native vs non-native tool call mechanisms
Applied to files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
📚 Learning: 2025-11-25T05:27:12.201Z
Learnt from: CR
Repo: ThinkInAIXYZ/deepchat PR: 0
File: .cursor/rules/llm-agent-loop.mdc:0-0
Timestamp: 2025-11-25T05:27:12.201Z
Learning: Applies to src/main/presenter/llmProviderPresenter/index.ts : In `src/main/presenter/llmProviderPresenter/index.ts` (`startStreamCompletion`), implement the Agent loop that manages the overall conversation flow, including multiple rounds of LLM calls and tool usage, maintaining `conversationMessages` history, calling `provider.coreStream()` on each iteration, and controlling the loop using `needContinueConversation` and `toolCallCount` (compared against `MAX_TOOL_CALLS`)
Applied to files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
📚 Learning: 2025-11-25T05:27:12.201Z
Learnt from: CR
Repo: ThinkInAIXYZ/deepchat PR: 0
File: .cursor/rules/llm-agent-loop.mdc:0-0
Timestamp: 2025-11-25T05:27:12.201Z
Learning: Applies to src/main/presenter/llmProviderPresenter/index.ts : In `src/main/presenter/llmProviderPresenter/index.ts`, listen for standardized events yielded by `coreStream` and handle them accordingly: buffer text content (`currentContent`), handle `tool_call_start/chunk/end` events by collecting tool details and calling `presenter.mcpPresenter.callTool`, send frontend events via `eventBus` with tool call status, format tool results for the next LLM call, and set `needContinueConversation = true`
Applied to files:
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts
🧬 Code graph analysis (1)
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts (2)
src/shared/types/presenters/legacy.presenters.d.ts (3)
`KnowledgeChunkTask` (1645-1656), `TaskStatusSummary` (1659-1663), `TaskQueueStatus` (1666-1670)
scripts/fetch-provider-db.mjs (2)
`controller` (148-148), `error` (14-14)
🔇 Additional comments (1)
src/main/presenter/knowledgePresenter/knowledgeTaskPresenter.ts (1)
60-116: Status and "has active tasks" methods correctly incorporate concurrent runningTasks

The updates to `getTaskStatus`, `hasActiveTasks*`, and `getStatus` look consistent with the new concurrent model:

- `pending` and `queuedTasks` derive from `this.queue.length`.
- `processing` and `runningTasks` derive from `this.runningTasks.size`.
- `byKnowledgeBase` aggregates over both queued and running tasks, keeping per-KB pending/processing in sync with the global counts.
- `hasActiveTasksForKnowledgeBase`/`hasActiveTasksForFile` correctly short-circuit on either queued or running tasks.

Functionally this all lines up with the updated `TaskStatusSummary`/`TaskQueueStatus` expectations; no issues from a concurrency or counting perspective.