-
Notifications
You must be signed in to change notification settings - Fork 715
feat: support choose kv watcher or queue for sync events #8858
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
PR Reviewer Guide 🔍Here are some key observations to aid the review process:
|
PR Code Suggestions ✨Explore these optional code suggestions:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Summary
Introduces configurable event synchronization mechanism allowing administrators to choose between NATS KV watcher and queue-based approaches per module using the ZO_NATS_KV_WATCH_MODULES environment variable.
Key changes:
- Added
ZO_NATS_KV_WATCH_MODULESconfiguration for specifying which module prefixes use KV watching - Implemented
kv_watch()method that spawns a background task to watch NATS KV bucket changes directly - Modified
put(),delete(), andwatch()operations to conditionally skip queue events when KV watcher is enabled for a prefix - Improved string filtering in
COMPACT_OLD_DATA_STREAM_SETto handle empty values - Enhanced log formatting in coordinator events for better readability
Architecture impact:
The change provides flexibility to optimize event propagation by bypassing the queue layer for specific modules, potentially reducing latency and improving real-time responsiveness for critical data like node heartbeats.
Confidence Score: 4/5
- This PR is safe to merge with minor style improvements recommended
- The implementation is solid with proper error handling, retry logic, and follows existing patterns in the codebase. The feature adds a configurable mechanism without breaking existing functionality. Score is 4 due to a minor variable shadowing issue that could affect code maintainability, though it doesn't cause functional problems
- Pay attention to
src/infra/src/db/nats.rsfor the variable shadowing issue on line 196, which should be addressed to improve code clarity
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| src/config/src/config.rs | 5/5 | Added ZO_NATS_KV_WATCH_MODULES config variable and improved COMPACT_OLD_DATA_STREAM_SET to filter empty strings |
| src/infra/src/coordinator/events.rs | 5/5 | Improved log formatting by changing debug format specifiers from {:?} to {} for cleaner output |
| src/infra/src/db/nats.rs | 4/5 | Implemented new kv_watch method for direct KV watching and integrated use_kv_watcher check into put/delete/watch operations; minor variable shadowing issue |
Sequence Diagram
sequenceDiagram
participant App as Application
participant DB as NatsDb
participant Config as use_kv_watcher()
participant KVWatch as kv_watch()
participant Coord as coordinator::events
participant NATS as NATS Server
Note over App,NATS: Configuration Phase
App->>Config: Check ZO_NATS_KV_WATCH_MODULES
Config-->>App: Module prefixes configured
Note over App,NATS: Watch Setup
App->>DB: watch(prefix)
DB->>Config: use_kv_watcher(prefix)?
alt KV Watcher Mode (prefix in config)
Config-->>DB: true
DB->>KVWatch: kv_watch(prefix)
KVWatch->>NATS: get_bucket_by_key()
NATS-->>KVWatch: bucket + key
KVWatch->>NATS: bucket.watch_all()
NATS-->>KVWatch: entries stream
KVWatch->>KVWatch: spawn background task
KVWatch-->>DB: Receiver<Event>
loop Background Task Loop
KVWatch->>NATS: entries.next()
NATS-->>KVWatch: KV entry
KVWatch->>KVWatch: decode & filter by prefix
KVWatch->>App: send Event (Put/Delete)
end
else Queue Mode (default)
Config-->>DB: false
DB->>Coord: coordinator::events::watch(prefix)
Coord-->>DB: Receiver<Event>
Note over DB,Coord: Events sent via queue
DB->>Coord: put_event() / delete_event()
Coord->>NATS: Queue message
NATS-->>Coord: Deliver to subscribers
Coord->>App: send Event
end
Note over App,NATS: Put/Delete Operations
App->>DB: put(key, value)
DB->>NATS: bucket.put()
DB->>Config: use_kv_watcher(key)?
alt Using Queue
DB->>Coord: coordinator::events::put_event()
else Using KV Watcher
Note over DB,Coord: Skip queue - KV watcher handles it
end
3 files reviewed, 1 comment
Testdino Test Results
|
Testdino Test Results
|
|
| Status | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration |
|---|---|---|---|---|---|---|---|
| All tests passed | 364 | 340 | 0 | 19 | 5 | 93% | 4m 39s |
User description
There is a new ENV
ZO_NATS_KV_WATCH_MODULESand default value is empty.If you want to set something to use
nats kv watcherinstead ofnats queue, you can configure it like this:for
nodesevents:add
user_sessionsevents:When you start node, you will see some logs like this:
PR Type
Enhancement
Description
Add NATS KV watcher option by prefix
New env
ZO_NATS_KV_WATCH_MODULESAvoid duplicate coordinator events when watching
Improve logging for coordinator and KV watcher
Diagram Walkthrough
File Walkthrough
config.rs
Add env config and parsing for KV watch modulessrc/config/src/config.rs
NATS_KV_WATCH_MODULESparsed intoHashSet.kv_watch_modulesenv-config with defaults and help.nats.rs
Implement KV-based watcher and integrate with DB opssrc/infra/src/db/nats.rs
kv_watchusing NATS KVwatch_all.NatsEventand addEventDatausage.use_kv_watcherto gate coordinator events.watchto KV watcher based on prefix.use_kv_watcher.events.rs
Tweak coordinator event dispatch loggingsrc/infra/src/coordinator/events.rs