
Conversation

@Subhra264 (Contributor) commented Sep 12, 2025

User description

Currently we use the NATS KV watch function to listen for various KV events from different nodes. This PR replaces the KV watch with a NATS stream queue when NATS is used as the cluster coordinator.


PR Type

Enhancement


Description

  • Introduce NATS-based coordinator events stream

  • Replace KV watch with queue watchers

  • Add constants for shared key prefixes

  • Improve enrichment table debug logging


Diagram Walkthrough

flowchart LR
  NatsKV["NATS KV (existing)"] -- "put/delete (need_watch)" --> EventsPub["Coordinator Events publish"]
  EventsPub -- "publish JSON" --> Stream["NATS Stream 'coordinator_events'"]
  Stream -- "subscribe" --> EventsSub["Coordinator Events subscriber"]
  EventsSub -- "dispatch by prefix" --> Watchers["Registered prefix watchers"]
  Watchers -- "deliver" --> Consumers["Alerts/Schema/OFGA/Nodes watchers"]
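
For illustration, a minimal sketch of the event payload this flow implies, assuming serde-based JSON encoding. The key and action fields mirror what the reviewer guide's code excerpt uses; start_dt and the exact type names are illustrative stand-ins, not the PR's definitions.

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
enum MetaAction {
    Put,
    Delete,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
struct MetaEvent {
    key: String,
    action: MetaAction,
    start_dt: Option<i64>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
enum CoordinatorEvent {
    Meta(MetaEvent),
}

fn main() -> Result<(), serde_json::Error> {
    // A put on a watched key becomes one JSON message on the "coordinator_events" stream.
    let event = CoordinatorEvent::Meta(MetaEvent {
        key: "/nodes/example-node".to_string(),
        action: MetaAction::Put,
        start_dt: None,
    });
    let payload = serde_json::to_vec(&event)?;
    println!("would publish {} bytes to coordinator_events", payload.len());
    Ok(())
}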

File Walkthrough

Relevant files

Enhancement (13 files):

  • mod.rs: Add nodes key constant and use in watchers (+3/-2)
  • nats.rs: Initialize coordinator events during register (+12/-0)
  • alerts.rs: Prefix constant and export key parser; watch with constant (+4/-2)
  • events.rs: New coordinator events stream and watcher registry (+235/-0)
  • mod.rs: Expose events module and tighten Db path (+2/-3)
  • nats.rs: Publish coordinator events on put/delete; delegate watch (+21/-93)
  • mod.rs: Add retention policy and API for create (+10/-0)
  • nats.rs: Support retention policy and robust consumer loop (+39/-3)
  • nop.rs: Stub create_with_retention_policy method (+9/-1)
  • mod.rs: Introduce schema key constant and use it (+7/-5)
  • alert.rs: Use namespaced alerts coordinator functions (+13/-8)
  • ofga.rs: Add OFGA key prefix constant; unify logs (+6/-5)
  • mod.rs: Add detailed debug logs for fetch path (+4/-0)

@github-actions (Contributor)

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Concurrency Risk

COORDINATOR_WATCHER_PREFIXES is a Vec storing (prefix, Sender). There is no removal or drop handling; senders for dropped watchers will remain and be iterated forever, potentially leaking memory and causing repeated send failures. Consider a registration handle to deregister on drop, and filter out closed channels when send fails.

/// The prefix to watch for coordinator events.
type WatcherPrefix = (String, mpsc::Sender<crate::db::Event>);
static COORDINATOR_WATCHER_PREFIXES: Lazy<RwLock<Vec<WatcherPrefix>>> =
    Lazy::new(|| RwLock::new(Vec::new()));

pub async fn init() -> Result<()> {
    // create the coordinator stream if not exists
    create_stream().await?;

    let (tx, mut rx) = mpsc::channel(65535);
    tokio::task::spawn(async move {
        if let Err(e) = subscribe(tx).await {
            log::error!("[COORDINATOR::EVENTS] failed to subscribe to coordinator topic: {e}");
        }
    });
    tokio::task::spawn(async move {
        while let Some(event) = rx.recv().await {
            match event {
                CoordinatorEvent::Meta(event) => {
                    let r = COORDINATOR_WATCHER_PREFIXES.read().await;
                    for (prefix, tx) in r.iter() {
                        if event.key.starts_with(prefix) {
                            log::debug!(
                                "[COORDINATOR::EVENTS] sending event to watcher: [{:?}]{:?}",
                                event.action,
                                event.key
                            );
                            if let Err(e) = tx.send(event.clone().into()).await {
                                log::error!(
                                    "[COORDINATOR::EVENTS] failed to send event to watcher: {e}"
                                );
                            }
                        }
                    }
                }
            }
        }
    });
    Ok(())
}

pub async fn watch(prefix: &str) -> Result<Arc<mpsc::Receiver<crate::db::Event>>> {
    let (tx, rx) = mpsc::channel(65535);
    let mut w = COORDINATOR_WATCHER_PREFIXES.write().await;
    w.push((prefix.to_string(), tx));
    w.sort_by(|a, b| a.0.cmp(&b.0));
    Ok(Arc::new(rx))
}
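
One hedged way to address the missing deregistration, not part of the PR: have watch return a guard that removes its sender from the registry on drop. A minimal sketch, assuming the registry gains a numeric id per watcher; WatcherGuard, the id field, and the String event stand-in are illustrative only.

use std::sync::atomic::{AtomicU64, Ordering};

use once_cell::sync::Lazy;
use tokio::sync::{mpsc, RwLock};

type Event = String; // stand-in for crate::db::Event in this sketch
type WatcherEntry = (u64, String, mpsc::Sender<Event>);

static NEXT_WATCHER_ID: AtomicU64 = AtomicU64::new(0);
static WATCHERS: Lazy<RwLock<Vec<WatcherEntry>>> = Lazy::new(|| RwLock::new(Vec::new()));

pub struct WatcherGuard {
    id: u64,
}

impl Drop for WatcherGuard {
    fn drop(&mut self) {
        let id = self.id;
        // The registry lock is async, so remove on the runtime; this assumes the guard
        // is dropped inside a Tokio runtime (tokio::spawn panics otherwise).
        let _ = tokio::spawn(async move {
            WATCHERS.write().await.retain(|(wid, _, _)| *wid != id);
        });
    }
}

pub async fn watch(prefix: &str) -> (WatcherGuard, mpsc::Receiver<Event>) {
    let (tx, rx) = mpsc::channel(65535);
    let id = NEXT_WATCHER_ID.fetch_add(1, Ordering::Relaxed);
    WATCHERS.write().await.push((id, prefix.to_string(), tx));
    (WatcherGuard { id }, rx)
}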
Potential Panic

Multiple uses of Arc::get_mut(...).unwrap_or_else(|| panic!(...)) and Arc::get_mut(&mut events).unwrap() assume unique Arc ownership. If that invariant changes, this will panic. Prefer keeping each receiver owned by a single task (tokio mpsc receivers cannot be cloned), or restructure so get_mut is not needed.

    }
};
let receiver = Arc::get_mut(&mut receiver).unwrap_or_else(|| {
    panic!("[COORDINATOR::EVENTS] failed to get mutable reference to receiver")
});
loop {
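
A hedged alternative sketch that sidesteps get_mut entirely by letting the consuming task own the receiver; types are simplified (String instead of the real event type) so the example stands on its own.

use tokio::sync::mpsc;

// The consuming task owns the receiver outright, so no Arc::get_mut and no panic path.
async fn consume_events(mut rx: mpsc::Receiver<String>) {
    while let Some(event) = rx.recv().await {
        println!("handling coordinator event: {event}");
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    let consumer = tokio::spawn(consume_events(rx));
    let _ = tx.send("put /nodes/example".to_string()).await;
    drop(tx); // closing the sender ends the loop above
    let _ = consumer.await;
}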
Key Mismatch

In delete with prefix handling, when iterating keys, delete_event is emitted with purge_key (encoded key or bucket-relative key) rather than the external logical key. This may break consumers expecting canonical keys with global prefixes. Ensure emitted keys match the watch prefix semantics.

    with_prefix
};
let new_key = if start_dt.is_some() {
    format!("{}/{}", new_key, start_dt.unwrap())
} else {
    new_key.to_string()
};
if !with_prefix {
    let purge_key = key_encode(&new_key);
    bucket
        .purge(purge_key)
        .await
        .map_err(|e| Error::Message(format!("[NATS:delete] bucket.purge error: {e}")))?;
    if need_watch {
        cluster_coordinator::events::delete_event(key, start_dt).await?;
    }
    return Ok(());
}
let keys = keys(&bucket, &new_key)
    .await
    .map_err(|e| Error::Message(format!("[NATS:delete] bucket.keys error: {e}")))?;
for purge_key in keys {
    bucket
        .purge(purge_key.clone())
        .await
        .map_err(|e| Error::Message(format!("[NATS:delete] bucket.purge error: {e}")))?;
    if need_watch {
        cluster_coordinator::events::delete_event(&purge_key, start_dt).await?;
    }
}

@greptile-apps bot left a comment

Greptile Summary

This PR implements a significant architectural change by replacing NATS key-value (KV) watch functionality with NATS stream queues for cluster coordination. The changes introduce a new event-driven coordination system centered around a new cluster_coordinator::events module that handles publishing and consuming coordination events through NATS streams.

The core architectural shift involves:

Event System Implementation: A new events.rs file introduces CoordinatorEvent and MetaEvent structures with comprehensive event publishing/subscribing using NATS streams. This replaces the previous approach of directly watching KV operations with a more controlled event distribution system.

Queue Interface Enhancement: The Queue trait is extended with RetentionPolicy support, allowing different retention behaviors - Interest-based retention for coordination events (messages kept only while consumers exist) versus Limits-based retention for other use cases.
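
A minimal sketch of what such a trait extension could look like, assuming an async_trait-style trait and an anyhow error type as stand-ins; the PR's exact signatures and error types may differ.

use async_trait::async_trait;

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum RetentionPolicy {
    /// Keep messages until configured limits are hit (the previous behavior).
    #[default]
    Limits,
    /// Keep messages only while consumers exist (used for coordination events).
    Interest,
}

#[async_trait]
pub trait Queue: Send + Sync {
    async fn create_with_retention_policy(
        &self,
        topic: &str,
        policy: RetentionPolicy,
    ) -> anyhow::Result<()>;

    // Assumed convenience default: a plain create keeps the old Limits behavior.
    async fn create(&self, topic: &str) -> anyhow::Result<()> {
        self.create_with_retention_policy(topic, RetentionPolicy::Limits)
            .await
    }
}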

Database Layer Changes: The NATS database implementation (nats.rs) removes complex KV watching logic and instead emits explicit events through the cluster coordinator when need_watch is enabled during put/delete operations.
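
A hedged sketch of the put-side control flow described here; kv_put and put_event below are hypothetical stand-ins for the bucket write and the coordinator publish, not the PR's functions.

use anyhow::Result;
use bytes::Bytes;

// Hypothetical stand-ins; only the control flow around need_watch reflects the change.
async fn kv_put(_key: &str, _value: Bytes, _start_dt: Option<i64>) -> Result<()> { Ok(()) }
async fn put_event(_key: &str, _start_dt: Option<i64>) -> Result<()> { Ok(()) }

async fn put(key: &str, value: Bytes, need_watch: bool, start_dt: Option<i64>) -> Result<()> {
    // Write to the NATS KV bucket as before.
    kv_put(key, value, start_dt).await?;
    // Instead of relying on a KV watcher, publish an explicit coordinator event.
    if need_watch {
        put_event(key, start_dt).await?;
    }
    Ok(())
}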

Key Centralization: Multiple modules extract hardcoded path strings into constants (NODES_KEY, SCHEMA_KEY, ALERT_WATCHER_PREFIX, OFGA_KEY_PREFIX) improving maintainability and preparing for the new coordination system.

The new architecture maintains the same API surface (watch, put_event, delete_event) while switching from passive KV observation to active stream-based event distribution. This provides better durability, ordering guarantees, and acknowledgment semantics for cluster coordination operations. The system uses Interest retention policy for coordination streams, ensuring events are only kept while there are active consumers, which is appropriate for coordination events that don't require long-term storage.
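
For illustration, a hedged consumer-side sketch of draining the Arc-wrapped receiver that watch returns, handling the None case instead of unwrapping (per the context note below); a String stands in for crate::db::Event so the sketch is self-contained.

use std::sync::Arc;

use anyhow::Result;
use tokio::sync::mpsc;

// String stands in for crate::db::Event in this sketch.
type Event = String;

// In the PR, `events` would come from cluster_coordinator::events::watch(prefix).
async fn run_watcher(mut events: Arc<mpsc::Receiver<Event>>) -> Result<()> {
    let rx = Arc::get_mut(&mut events)
        .ok_or_else(|| anyhow::anyhow!("coordinator watch receiver is shared"))?;
    while let Some(event) = rx.recv().await {
        // Each delivered event corresponds to a put/delete republished from the stream.
        println!("coordinator event: {event}");
    }
    Ok(())
}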

Confidence score: 3/5

  • This PR introduces complex architectural changes with potential reliability concerns including task panic handling, message acknowledgment on failures, and global state management issues
  • Score reflects significant complexity in the new event coordination system with multiple potential failure points including Arc reference handling and task spawning without error recovery
  • Pay close attention to src/infra/src/cluster_coordinator/events.rs and src/infra/src/db/nats.rs for potential race conditions and error handling issues

Context used:

Context - Avoid using expect with potentially failing operations; instead, handle the None case to prevent panics.

13 files reviewed, 5 comments


@github-actions (Contributor)

PR Code Suggestions ✨

Explore these optional code suggestions:

Category: Possible issue
Remove unsafe Arc mutations

Arc::get_mut on the subscription receiver and the dispatch loop assume single
ownership; in multi-task scenarios this can panic. Avoid Arc::get_mut entirely by
keeping receivers unshared within the task, and guard against channel closure by
pruning senders during dispatch to increase robustness.

src/infra/src/cluster_coordinator/events.rs [72-105]

 let (tx, mut rx) = mpsc::channel(65535);
-tokio::task::spawn(async move {
-    if let Err(e) = subscribe(tx).await {
-        log::error!("[COORDINATOR::EVENTS] failed to subscribe to coordinator topic: {e}");
+tokio::task::spawn({
+    let tx = tx.clone();
+    async move {
+        if let Err(e) = subscribe(tx).await {
+            log::error!("[COORDINATOR::EVENTS] failed to subscribe to coordinator topic: {e}");
+        }
     }
 });
 tokio::task::spawn(async move {
     while let Some(event) = rx.recv().await {
-        match event {
-            CoordinatorEvent::Meta(event) => {
-                let r = COORDINATOR_WATCHER_PREFIXES.read().await;
-                for (prefix, tx) in r.iter() {
-                    if event.key.starts_with(prefix) {
-                        ...
-                        if let Err(e) = tx.send(event.clone().into()).await {
-                            log::error!(
-                                "[COORDINATOR::EVENTS] failed to send event to watcher: {e}"
-                            );
-                        }
+        if let CoordinatorEvent::Meta(event) = event {
+            // prune closed senders before dispatch
+            {
+                let mut w = COORDINATOR_WATCHER_PREFIXES.write().await;
+                w.retain(|(_, tx)| !tx.is_closed());
+            }
+            let r = COORDINATOR_WATCHER_PREFIXES.read().await;
+            for (prefix, tx) in r.iter() {
+                if event.key.starts_with(prefix) {
+                    if let Err(e) = tx.send(event.clone().into()).await {
+                        log::error!("[COORDINATOR::EVENTS] failed to send event to watcher: {e}");
                     }
                 }
             }
         }
     }
 });
Suggestion importance[1-10]: 8


Why: The suggestion addresses a potential panic due to Arc::get_mut on a possibly shared receiver and enhances robustness by pruning closed senders during dispatch. This improves correctness and reliability of event delivery, making it a high-impact fix.

Impact: Medium
Emit consistent logical delete keys

The delete watch events use encoded KV keys (purge_key) for prefix case and the
original logical key for non-prefix, causing inconsistent event keys. Always emit
events with the logical new_key (unencoded, and aligned with what watchers expect)
to avoid downstream parse failures. Preserve start_dt handling consistently.

src/infra/src/db/nats.rs [288-329]

 async fn delete(
     &self,
     key: &str,
     with_prefix: bool,
     need_watch: bool,
     start_dt: Option<i64>,
 ) -> Result<()> {
-    let (bucket, new_key) = get_bucket_by_key(&self.prefix, key).await?;
-    let with_prefix = if start_dt.is_some() {
-        true
-    } else {
-        with_prefix
-    };
-    let new_key = if start_dt.is_some() {
-        format!("{}/{}", new_key, start_dt.unwrap())
-    } else {
-        new_key.to_string()
-    };
+    let (bucket, base_key) = get_bucket_by_key(&self.prefix, key).await?;
+    let with_prefix = if start_dt.is_some() { true } else { with_prefix };
+    let new_key = if let Some(ts) = start_dt { format!("{}/{}", base_key, ts) } else { base_key.to_string() };
+
     if !with_prefix {
         let purge_key = key_encode(&new_key);
         bucket
             .purge(purge_key)
             .await
             .map_err(|e| Error::Message(format!("[NATS:delete] bucket.purge error: {e}")))?;
         if need_watch {
-            cluster_coordinator::events::delete_event(key, start_dt).await?;
+            // emit logical key, not encoded/bucket-prefixed key
+            cluster_coordinator::events::delete_event(&new_key, start_dt).await?;
         }
         return Ok(());
     }
-    let keys = keys(&bucket, &new_key)
+
+    let listed = keys(&bucket, &new_key)
         .await
         .map_err(|e| Error::Message(format!("[NATS:delete] bucket.keys error: {e}")))?;
-    for purge_key in keys {
+    for k in listed {
+        // k is the encoded key returned from NATS; purge it
         bucket
-            .purge(purge_key.clone())
+            .purge(k.clone())
             .await
             .map_err(|e| Error::Message(format!("[NATS:delete] bucket.purge error: {e}")))?;
         if need_watch {
-            cluster_coordinator::events::delete_event(&purge_key, start_dt).await?;
+            // emit logical key for watchers: strip encoding context and use new_key prefix
+            cluster_coordinator::events::delete_event(&new_key, start_dt).await?;
         }
     }
     Ok(())
 }
Suggestion importance[1-10]: 7


Why: The suggestion correctly spots inconsistent delete event keys between non-prefix and prefix paths and proposes emitting the logical key consistently, which can prevent downstream parsing issues. However, it loses fidelity by emitting only the parent new_key for each purged subkey in the prefix case, which may reduce specificity; impact is moderate but valid.

Impact: Medium
Category: General
Prune closed watchers to avoid leaks

Watchers are never removed, leading to unbounded growth and leaks if callers drop
receivers. Track weak sender state and prune closed channels before pushing events
to prevent memory buildup and silent delivery failures. Add periodic cleanup when
registering and during dispatch.

src/infra/src/cluster_coordinator/events.rs [108-114]

 pub async fn watch(prefix: &str) -> Result<Arc<mpsc::Receiver<crate::db::Event>>> {
     let (tx, rx) = mpsc::channel(65535);
-    let mut w = COORDINATOR_WATCHER_PREFIXES.write().await;
-    w.push((prefix.to_string(), tx));
-    w.sort_by(|a, b| a.0.cmp(&b.0));
+    {
+        let mut w = COORDINATOR_WATCHER_PREFIXES.write().await;
+        // prune closed senders before adding a new one
+        w.retain(|(_, tx)| !tx.is_closed());
+        w.push((prefix.to_string(), tx));
+        w.sort_by(|a, b| a.0.cmp(&b.0));
+    }
     Ok(Arc::new(rx))
 }
Suggestion importance[1-10]: 6


Why: This is a reasonable improvement to avoid accumulating closed senders by pruning with is_closed() during registration; it's accurate and low-risk but a minor maintainability enhancement rather than a critical fix.

Impact: Low

@hengfeiyang hengfeiyang merged commit 6f40d2f into main Sep 13, 2025
15 of 16 checks passed
@hengfeiyang hengfeiyang deleted the nats_queue_main branch September 13, 2025 14:51
Subhra264 added a commit that referenced this pull request Sep 16, 2025
hengfeiyang added a commit that referenced this pull request Sep 16, 2025