
Conversation

@hengfeiyang
Contributor

@hengfeiyang hengfeiyang commented Oct 21, 2025

User description

This PR adds a new environment variable, ZO_NATS_KV_WATCH_MODULES, whose default value is empty.

If you want certain modules to use the NATS KV watcher instead of the NATS queue, configure it like this:

For nodes events:

ZO_NATS_KV_WATCH_MODULES="/nodes/"

To also watch user_sessions events:

ZO_NATS_KV_WATCH_MODULES="/nodes/,/user_sessions/"

When you start a node, you will see logs like this:

2025-10-21T06:24+00:00 DEBUG infra::db::nats: [NATS:kv_watch] bucket: o2_nodes, prefix: /nodes/
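
For readers unfamiliar with the mechanism, the routing boils down to splitting the env value on commas and matching keys by prefix. A minimal standalone sketch of that idea (illustrative only, not the actual config/db code):

use std::collections::HashSet;

// Parse a ZO_NATS_KV_WATCH_MODULES-style value into a set of prefixes.
fn parse_modules(raw: &str) -> HashSet<String> {
    raw.split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(str::to_string)
        .collect()
}

// A key is served by the KV watcher when it starts with any configured prefix.
fn use_kv_watcher(modules: &HashSet<String>, key: &str) -> bool {
    modules.iter().any(|prefix| key.starts_with(prefix))
}

fn main() {
    let modules = parse_modules("/nodes/,/user_sessions/");
    assert!(use_kv_watcher(&modules, "/nodes/node-1"));     // KV watcher
    assert!(!use_kv_watcher(&modules, "/dashboards/demo")); // NATS queue (default)
    println!("watched prefixes: {modules:?}");
}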

PR Type

Enhancement


Description

  • Add NATS KV watcher option by prefix

  • New env ZO_NATS_KV_WATCH_MODULES

  • Avoid duplicate coordinator events when watching

  • Improve logging for coordinator and KV watcher


Diagram Walkthrough

flowchart LR
  Env["Env ZO_NATS_KV_WATCH_MODULES"] -- "prefix set" --> UseKV["use_kv_watcher(prefix)"]
  WatchCall["Db::watch(prefix)"] -- "true" --> KvWatch["NATS KV watch_all()"]
  WatchCall -- "false" --> CoordWatch["Coordinator events::watch()"]
  PutDelete["Db::put/delete(..., need_watch)"] -- "kv watcher true" --> SkipCoord["Skip coordinator event"]
  PutDelete -- "kv watcher false" --> SendCoord["Send coordinator event"]

File Walkthrough

Relevant files
Enhancement
config.rs
Add env config and parsing for KV watch modules                   

src/config/src/config.rs

  • Filter empty items when parsing lists.
  • Add NATS_KV_WATCH_MODULES parsed into HashSet.
  • Introduce kv_watch_modules env-config with defaults and help.
+30/-1   
nats.rs
Implement KV-based watcher and integrate with DB ops         

src/infra/src/db/nats.rs

  • Introduce kv_watch using NATS KV watch_all.
  • Export NatsEvent and add EventData usage.
  • Add use_kv_watcher to gate coordinator events.
  • Route watch to KV watcher based on prefix.
  • Skip coordinator events on put/delete when KV watcher active.
  • Add tests for use_kv_watcher.
+118/-6 
Formatting
events.rs
Tweak coordinator event dispatch logging                                 

src/infra/src/coordinator/events.rs

  • Improve debug log format for watcher dispatch.
+1/-1     

@github-actions
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Backpressure Risk

The KV watch channel is created with a very large buffer (65535) and uses try_send without handling Full specifically, which may drop events silently under sustained load; verify acceptable loss semantics or implement backpressure/retry.

let (tx, rx) = mpsc::channel(65535);
let prefix = prefix.to_string();
let self_prefix = self.prefix.to_string();
let _task: JoinHandle<Result<()>> = tokio::task::spawn(async move {
    loop {
        if cluster::is_offline() {
            break;
        }
        let (bucket, new_key) = match get_bucket_by_key(&self_prefix, &prefix).await {
            Ok(v) => v,
            Err(e) => {
                log::error!("[NATS:kv_watch] prefix: {prefix}, get bucket error: {e}");
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                continue;
            }
        };
        let bucket_prefix = "/".to_string() + bucket.name.trim_start_matches(&self_prefix);
        log::debug!(
            "[NATS:kv_watch] bucket: {}, prefix: {}",
            bucket.name,
            prefix
        );
        let mut entries = match bucket.watch_all().await {
            Ok(v) => v,
            Err(e) => {
                log::error!(
                    "[NATS:kv_watch] prefix: {prefix}, bucket.watch_all error: {e}"
                );
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                continue;
            }
        };
        loop {
            match entries.next().await {
                None => {
                    log::error!("[NATS:kv_watch] prefix: {prefix}, get message error");
                    break;
                }
                Some(entry) => {
                    let entry = match entry {
                        Ok(entry) => entry,
                        Err(e) => {
                            log::error!(
                                "[NATS:kv_watch] prefix: {prefix}, get message error: {e}"
                            );
                            break;
                        }
                    };
                    let item_key = key_decode(&entry.key);
                    if !item_key.starts_with(new_key) {
                        continue;
                    }
                    let new_key = bucket_prefix.to_string() + &item_key;
                    let ret = match entry.operation {
                        jetstream::kv::Operation::Put => {
                            tx.try_send(Event::Put(EventData {
                                key: new_key.clone(),
                                value: Some(entry.value),
                                start_dt: None,
                            }))
                        }
                        jetstream::kv::Operation::Delete
                        | jetstream::kv::Operation::Purge => {
                            tx.try_send(Event::Delete(EventData {
                                key: new_key.clone(),
                                value: None,
                                start_dt: None,
                            }))
                        }
                    };
                    if let Err(e) = ret {
                        log::warn!(
                            "[NATS:kv_watch] prefix: {prefix}, key: {new_key}, send error: {e}"
                        );
                    }
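
One way to make those loss semantics explicit is to fall back to an awaited send when the channel is full; a hedged sketch (the helper name is ours, not part of the PR):

use tokio::sync::mpsc::{self, error::{SendError, TrySendError}};

// Try a non-blocking send first; if the channel is full, apply backpressure by
// awaiting capacity instead of dropping the event. A Closed error is returned
// so the watch loop can shut down cleanly.
async fn forward<T>(tx: &mpsc::Sender<T>, event: T) -> Result<(), SendError<T>> {
    match tx.try_send(event) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(event)) => tx.send(event).await,
        Err(TrySendError::Closed(event)) => Err(SendError(event)),
    }
}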
Task Lifecycle

The spawned kv_watch task's JoinHandle is not retained or aborted on shutdown; ensure it won’t leak on service stop and that cluster offline detection is sufficient for orderly exit.

    let _task: JoinHandle<Result<()>> = tokio::task::spawn(async move {
        loop {
            if cluster::is_offline() {
                break;
            }
            let (bucket, new_key) = match get_bucket_by_key(&self_prefix, &prefix).await {
                Ok(v) => v,
                Err(e) => {
                    log::error!("[NATS:kv_watch] prefix: {prefix}, get bucket error: {e}");
                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                    continue;
                }
            };
            let bucket_prefix = "/".to_string() + bucket.name.trim_start_matches(&self_prefix);
            log::debug!(
                "[NATS:kv_watch] bucket: {}, prefix: {}",
                bucket.name,
                prefix
            );
            let mut entries = match bucket.watch_all().await {
                Ok(v) => v,
                Err(e) => {
                    log::error!(
                        "[NATS:kv_watch] prefix: {prefix}, bucket.watch_all error: {e}"
                    );
                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                    continue;
                }
            };
            loop {
                match entries.next().await {
                    None => {
                        log::error!("[NATS:kv_watch] prefix: {prefix}, get message error");
                        break;
                    }
                    Some(entry) => {
                        let entry = match entry {
                            Ok(entry) => entry,
                            Err(e) => {
                                log::error!(
                                    "[NATS:kv_watch] prefix: {prefix}, get message error: {e}"
                                );
                                break;
                            }
                        };
                        let item_key = key_decode(&entry.key);
                        if !item_key.starts_with(new_key) {
                            continue;
                        }
                        let new_key = bucket_prefix.to_string() + &item_key;
                        let ret = match entry.operation {
                            jetstream::kv::Operation::Put => {
                                tx.try_send(Event::Put(EventData {
                                    key: new_key.clone(),
                                    value: Some(entry.value),
                                    start_dt: None,
                                }))
                            }
                            jetstream::kv::Operation::Delete
                            | jetstream::kv::Operation::Purge => {
                                tx.try_send(Event::Delete(EventData {
                                    key: new_key.clone(),
                                    value: None,
                                    start_dt: None,
                                }))
                            }
                        };
                        if let Err(e) = ret {
                            log::warn!(
                                "[NATS:kv_watch] prefix: {prefix}, key: {new_key}, send error: {e}"
                            );
                        }
                    }
                }
            }
        }
        Ok(())
    });
    Ok(Arc::new(rx))
}
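
If orderly shutdown matters beyond the cluster::is_offline() check, one option is to keep the handle next to the receiver and abort the task on drop; a hedged sketch of such a wrapper (not present in the PR):

use tokio::{sync::mpsc, task::JoinHandle};

// Hypothetical wrapper: owns both the event receiver and the background watch task,
// and aborts the task when the watcher is dropped so it cannot outlive its consumer.
struct KvWatcher<E> {
    rx: mpsc::Receiver<E>,
    task: JoinHandle<()>,
}

impl<E> KvWatcher<E> {
    // Callers poll this instead of holding the raw receiver.
    async fn recv(&mut self) -> Option<E> {
        self.rx.recv().await
    }
}

impl<E> Drop for KvWatcher<E> {
    fn drop(&mut self) {
        // Cancel the background watch loop instead of relying solely on
        // cluster offline detection for an orderly exit.
        self.task.abort();
    }
}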
Config Parsing

kv_watch_modules parsing trims and skips empty items, but quoted examples in docs (e.g., '"/user_sessions/"') may leave quotes in values; confirm how quotes are provided via env and whether additional sanitization is needed.

pub static NATS_KV_WATCH_MODULES: Lazy<HashSet<String>> = Lazy::new(|| {
    get_config()
        .nats
        .kv_watch_modules
        .split(',')
        .filter_map(|s| {
            let s = s.trim();
            if s.is_empty() {
                None
            } else {
                Some(s.to_string())
            }
        })
        .collect()
});
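
If quotes can leak in from shell quoting, one extra trim would neutralize them; a small sketch under that assumption (not part of the PR):

use std::collections::HashSet;

// Strip surrounding quotes in addition to whitespace before collecting prefixes,
// so values like '"/user_sessions/"' still match keys.
fn parse_kv_watch_modules(raw: &str) -> HashSet<String> {
    raw.split(',')
        .map(|s| s.trim().trim_matches(|c: char| c == '"' || c == '\''))
        .filter(|s| !s.is_empty())
        .map(str::to_string)
        .collect()
}

fn main() {
    let set = parse_kv_watch_modules(r#" "/nodes/", '/user_sessions/' "#);
    assert!(set.contains("/nodes/"));
    assert!(set.contains("/user_sessions/"));
}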

@github-actions
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Ensure spawned task lifecycle

The spawned task handle is never awaited or stored, which risks it being dropped if
the runtime shuts down and makes graceful shutdown/leak tracking hard. Store the
JoinHandle inside the returned receiver wrapper or detach explicitly with
tokio::spawn without binding, and add a shutdown check on tx.is_closed() to exit the
inner loop.

src/infra/src/db/nats.rs [143-226]

 async fn kv_watch(&self, prefix: &str) -> Result<Arc<mpsc::Receiver<Event>>> {
     let (tx, rx) = mpsc::channel(65535);
     let prefix = prefix.to_string();
     let self_prefix = self.prefix.to_string();
-    let _task: JoinHandle<Result<()>> = tokio::task::spawn(async move {
-        loop {
-            if cluster::is_offline() {
-                break;
-            }
-            let (bucket, new_key) = match get_bucket_by_key(&self_prefix, &prefix).await {
-                Ok(v) => v,
-                Err(e) => {
-                    log::error!("[NATS:kv_watch] prefix: {prefix}, get bucket error: {e}");
-                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
-                    continue;
+    tokio::task::spawn({
+        let mut tx = tx.clone();
+        async move {
+            loop {
+                if cluster::is_offline() || tx.is_closed() {
+                    break;
                 }
-            };
-            let bucket_prefix = "/".to_string() + bucket.name.trim_start_matches(&self_prefix);
-            log::debug!(
-                "[NATS:kv_watch] bucket: {}, prefix: {}",
-                bucket.name,
-                prefix
-            );
-            let mut entries = match bucket.watch_all().await {
-                Ok(v) => v,
-                Err(e) => {
-                    log::error!(
-                        "[NATS:kv_watch] prefix: {prefix}, bucket.watch_all error: {e}"
-                    );
-                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
-                    continue;
-                }
-            };
-            loop {
-                match entries.next().await {
-                    None => {
-                        log::error!("[NATS:kv_watch] prefix: {prefix}, get message error");
-                        break;
+                let (bucket, new_key) = match get_bucket_by_key(&self_prefix, &prefix).await {
+                    Ok(v) => v,
+                    Err(e) => {
+                        log::error!("[NATS:kv_watch] prefix: {prefix}, get bucket error: {e}");
+                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+                        continue;
                     }
-                    Some(entry) => {
-                        let entry = match entry {
-                            Ok(entry) => entry,
-                            Err(e) => {
-                                log::error!(
-                                    "[NATS:kv_watch] prefix: {prefix}, get message error: {e}"
-                                );
-                                break;
+                };
+                let bucket_prefix = "/".to_string() + bucket.name.trim_start_matches(&self_prefix);
+                log::debug!("[NATS:kv_watch] bucket: {}, prefix: {}", bucket.name, prefix);
+                let mut entries = match bucket.watch_all().await {
+                    Ok(v) => v,
+                    Err(e) => {
+                        log::error!("[NATS:kv_watch] prefix: {prefix}, bucket.watch_all error: {e}");
+                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+                        continue;
+                    }
+                };
+                loop {
+                    if tx.is_closed() {
+                        return;
+                    }
+                    match entries.next().await {
+                        None => {
+                            log::error!("[NATS:kv_watch] prefix: {prefix}, get message error");
+                            break;
+                        }
+                        Some(entry) => {
+                            let entry = match entry {
+                                Ok(entry) => entry,
+                                Err(e) => {
+                                    log::error!("[NATS:kv_watch] prefix: {prefix}, get message error: {e}");
+                                    break;
+                                }
+                            };
+                            let item_key = key_decode(&entry.key);
+                            if !item_key.starts_with(new_key) {
+                                continue;
                             }
-                        };
-                        let item_key = key_decode(&entry.key);
-                        if !item_key.starts_with(new_key) {
-                            continue;
-                        }
-                        let new_key = bucket_prefix.to_string() + &item_key;
-                        let ret = match entry.operation {
-                            jetstream::kv::Operation::Put => {
-                                tx.try_send(Event::Put(EventData {
-                                    key: new_key.clone(),
-                                    value: Some(entry.value),
-                                    start_dt: None,
-                                }))
+                            let new_key = bucket_prefix.to_string() + &item_key;
+                            let ret = match entry.operation {
+                                jetstream::kv::Operation::Put => {
+                                    tx.try_send(Event::Put(EventData {
+                                        key: new_key.clone(),
+                                        value: Some(entry.value),
+                                        start_dt: None,
+                                    }))
+                                }
+                                jetstream::kv::Operation::Delete | jetstream::kv::Operation::Purge => {
+                                    tx.try_send(Event::Delete(EventData {
+                                        key: new_key.clone(),
+                                        value: None,
+                                        start_dt: None,
+                                    }))
+                                }
+                            };
+                            if let Err(e) = ret {
+                                log::warn!("[NATS:kv_watch] prefix: {prefix}, key: {new_key}, send error: {e}");
                             }
-                            jetstream::kv::Operation::Delete
-                            | jetstream::kv::Operation::Purge => {
-                                tx.try_send(Event::Delete(EventData {
-                                    key: new_key.clone(),
-                                    value: None,
-                                    start_dt: None,
-                                }))
-                            }
-                        };
-                        if let Err(e) = ret {
-                            log::warn!(
-                                "[NATS:kv_watch] prefix: {prefix}, key: {new_key}, send error: {e}"
-                            );
                         }
                     }
                 }
             }
         }
-        Ok(())
     });
     Ok(Arc::new(rx))
 }
Suggestion importance[1-10]: 6


Why: Storing or handling the spawned task lifecycle and checking tx.is_closed() improves robustness and shutdown behavior; the idea is reasonable and grounded in the new code, though not strictly critical to correctness.

Impact: Low
General
Add retry backoff on stream end

When the stream ends (None) or errors, you immediately break but then reopen the
entire bucket watch in a tight loop, potentially spamming logs. Add a short backoff
before retrying the outer loop and distinguish EOF from transient errors to avoid
hot-looping and log floods.

src/infra/src/db/nats.rs [166-221]

 let mut entries = match bucket.watch_all().await {
     Ok(v) => v,
     Err(e) => {
-        log::error!(
-            "[NATS:kv_watch] prefix: {prefix}, bucket.watch_all error: {e}"
-        );
+        log::error!("[NATS:kv_watch] prefix: {prefix}, bucket.watch_all error: {e}");
         tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
         continue;
     }
 };
 loop {
     match entries.next().await {
         None => {
-            log::error!("[NATS:kv_watch] prefix: {prefix}, get message error");
+            log::warn!("[NATS:kv_watch] prefix: {prefix}, stream ended, retrying after backoff");
+            tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
             break;
         }
         Some(entry) => {
             let entry = match entry {
                 Ok(entry) => entry,
                 Err(e) => {
-                    log::error!(
-                        "[NATS:kv_watch] prefix: {prefix}, get message error: {e}"
-                    );
+                    log::warn!("[NATS:kv_watch] prefix: {prefix}, stream error: {e}, retrying after backoff");
+                    tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
                     break;
                 }
             };
             ...
         }
     }
 }
Suggestion importance[1-10]: 5


Why: Adding a small backoff on stream end/error can reduce log spam and hot-looping; it's a sensible, moderate improvement aligned with the existing retry behavior.

Impact: Low
Sanitize prefixes before matching

The check is a simple prefix match; stray whitespace or empty entries in
ZO_NATS_KV_WATCH_MODULES could cause unexpected behavior if not filtered
consistently. Trim prefixes once and skip empties to ensure reliable matching.

src/infra/src/db/nats.rs [895-899]

 #[inline]
 fn use_kv_watcher(key: &str) -> bool {
     config::NATS_KV_WATCH_MODULES
         .iter()
+        .filter_map(|p| {
+            let t = p.trim();
+            if t.is_empty() { None } else { Some(t) }
+        })
         .any(|prefix| key.starts_with(prefix))
 }
Suggestion importance[1-10]: 4


Why: While sanitization is generally good, the PR already trims and filters empties when populating NATS_KV_WATCH_MODULES, so this adds limited extra value.

Impact: Low

Contributor

@greptile-apps greptile-apps bot left a comment


Greptile Overview

Summary

Introduces a configurable event synchronization mechanism that lets administrators choose between the NATS KV watcher and the queue-based approach per module via the ZO_NATS_KV_WATCH_MODULES environment variable.

Key changes:

  • Added ZO_NATS_KV_WATCH_MODULES configuration for specifying which module prefixes use KV watching
  • Implemented kv_watch() method that spawns a background task to watch NATS KV bucket changes directly
  • Modified put(), delete(), and watch() operations to conditionally skip queue events when KV watcher is enabled for a prefix
  • Improved string filtering in COMPACT_OLD_DATA_STREAM_SET to handle empty values
  • Enhanced log formatting in coordinator events for better readability

Architecture impact:
The change provides flexibility to optimize event propagation by bypassing the queue layer for specific modules, potentially reducing latency and improving real-time responsiveness for critical data like node heartbeats.

Confidence Score: 4/5

  • This PR is safe to merge with minor style improvements recommended
  • The implementation is solid, with proper error handling and retry logic, and it follows existing patterns in the codebase. The feature adds a configurable mechanism without breaking existing functionality. The score is 4 due to a minor variable shadowing issue that could affect code maintainability, though it doesn't cause functional problems
  • Pay attention to src/infra/src/db/nats.rs for the variable shadowing issue on line 196, which should be addressed to improve code clarity

Important Files Changed

File Analysis

Filename | Score | Overview
src/config/src/config.rs | 5/5 | Added ZO_NATS_KV_WATCH_MODULES config variable and improved COMPACT_OLD_DATA_STREAM_SET to filter empty strings
src/infra/src/coordinator/events.rs | 5/5 | Improved log formatting by changing debug format specifiers from {:?} to {} for cleaner output
src/infra/src/db/nats.rs | 4/5 | Implemented new kv_watch method for direct KV watching and integrated use_kv_watcher check into put/delete/watch operations; minor variable shadowing issue

Sequence Diagram

sequenceDiagram
    participant App as Application
    participant DB as NatsDb
    participant Config as use_kv_watcher()
    participant KVWatch as kv_watch()
    participant Coord as coordinator::events
    participant NATS as NATS Server
    
    Note over App,NATS: Configuration Phase
    App->>Config: Check ZO_NATS_KV_WATCH_MODULES
    Config-->>App: Module prefixes configured
    
    Note over App,NATS: Watch Setup
    App->>DB: watch(prefix)
    DB->>Config: use_kv_watcher(prefix)?
    
    alt KV Watcher Mode (prefix in config)
        Config-->>DB: true
        DB->>KVWatch: kv_watch(prefix)
        KVWatch->>NATS: get_bucket_by_key()
        NATS-->>KVWatch: bucket + key
        KVWatch->>NATS: bucket.watch_all()
        NATS-->>KVWatch: entries stream
        KVWatch->>KVWatch: spawn background task
        KVWatch-->>DB: Receiver<Event>
        
        loop Background Task Loop
            KVWatch->>NATS: entries.next()
            NATS-->>KVWatch: KV entry
            KVWatch->>KVWatch: decode & filter by prefix
            KVWatch->>App: send Event (Put/Delete)
        end
        
    else Queue Mode (default)
        Config-->>DB: false
        DB->>Coord: coordinator::events::watch(prefix)
        Coord-->>DB: Receiver<Event>
        
        Note over DB,Coord: Events sent via queue
        DB->>Coord: put_event() / delete_event()
        Coord->>NATS: Queue message
        NATS-->>Coord: Deliver to subscribers
        Coord->>App: send Event
    end
    
    Note over App,NATS: Put/Delete Operations
    App->>DB: put(key, value)
    DB->>NATS: bucket.put()
    DB->>Config: use_kv_watcher(key)?
    alt Using Queue
        DB->>Coord: coordinator::events::put_event()
    else Using KV Watcher
        Note over DB,Coord: Skip queue - KV watcher handles it
    end

3 files reviewed, 1 comment


@testdino-playwright-reporter

Testdino Test Results

Status: All tests passed | Total: 0 | Passed: 0 | Failed: 0 | Skipped: 0 | Flaky: 0 | Pass Rate: 0% | Duration: 4.3s

View Detailed Results

@testdino-playwright-reporter

Testdino Test Results

Status: All tests passed | Total: 0 | Passed: 0 | Failed: 0 | Skipped: 0 | Flaky: 0 | Pass Rate: 0% | Duration: 4.4s

View Detailed Results

@testdino-playwright-reporter

⚠️ Test Run Unstable


Author: hengfeiyang | Branch: feat/nats-kv-watcher | Commit: c1bf024

Testdino Test Results

Status: All tests passed | Total: 364 | Passed: 340 | Failed: 0 | Skipped: 19 | Flaky: 5 | Pass Rate: 93% | Duration: 4m 39s

View Detailed Results

@hengfeiyang hengfeiyang merged commit 3554e39 into main Oct 21, 2025
32 checks passed
@hengfeiyang hengfeiyang deleted the feat/nats-kv-watcher branch October 21, 2025 09:27