Skip to content

Conversation

@hengfeiyang
Copy link
Contributor

@hengfeiyang hengfeiyang commented Oct 23, 2025

User description

Support storage type for nat queue.

New env:

ZO_NATS_EVENT_STORAGE="memory"

Default is memory, support memory or file.

Why we need this?

Summary

Benchmark by tools: https://github.com/openobserve/nats-benchmark
Storage with file:

🚀 Throughput:
  Operations/sec:        10965.17
  Throughput:            10.71 MB/s
  Data Transferred:      642.50 MB

Storage with memory:

🚀 Throughput:
  Operations/sec:        16957.44
  Throughput:            16.56 MB/s
  Data Transferred:      993.61 MB

Storage with file

================================================================================
NATS Benchmark Results - JetStream PUBLISH
================================================================================
📊 Overall Statistics:
  Duration:              60.00s
  Total Operations:      657918
  Successful:            657918
  Failed:                0
  Success Rate:          100.00%
🚀 Throughput:
  Operations/sec:        10965.17
  Throughput:            10.71 MB/s
  Data Transferred:      642.50 MB
⏱️ Latency (microseconds):
  Min:                   240
  Max:                   12519
  Mean:                  911.03
  StdDev:                394.07
  p50:                   836
  p90:                   1284
  p95:                   1654
  p99:                   2123
  p99.9:                 5111

Storage with memory:

================================================================================
NATS Benchmark Results - JetStream PUBLISH
================================================================================
📊 Overall Statistics:
  Duration:              60.00s
  Total Operations:      1017459
  Successful:            1017459
  Failed:                0
  Success Rate:          100.00%
🚀 Throughput:
  Operations/sec:        16957.44
  Throughput:            16.56 MB/s
  Data Transferred:      993.61 MB
⏱️ Latency (microseconds):
  Min:                   193
  Max:                   12767
  Mean:                  588.76
  StdDev:                403.22
  p50:                   424
  p90:                   1089
  p95:                   1214
  p99:                   1653
  p99.9:                 4907

PR Type

Enhancement


Description

  • Add ZO_NATS_EVENT_STORAGE config option

  • Introduce queue storage type enum

  • Replace create APIs with config builder

  • Configure coordinator stream via builder


Diagram Walkthrough

flowchart LR
  CFG["Config: add `event_storage`"] -- "read env" --> EVENTS["Coordinator events"]
  EVENTS -- "build QueueConfig (age, retention, storage)" --> QUEUE_API["Queue trait: create_with_config"]
  QUEUE_API -- "convert to NATS config" --> NATS_IMPL["NATS queue impl"]
  NATS_IMPL -- "create/get JetStream stream" --> NATS_JS["NATS JetStream"]
Loading

File Walkthrough

Relevant files
Enhancement
config.rs
Add event storage config to NATS settings                               

src/config/src/config.rs

  • Add ZO_NATS_EVENT_STORAGE env config.
  • Default storage set to memory.
  • Provide help text for storage option.
+6/-0     
events.rs
Coordinator stream uses configurable storage and builder 

src/infra/src/coordinator/events.rs

  • Use new storage config to select storage type.
  • Build QueueConfig with max age and retention.
  • Switch to create_with_config API.
  • Minor variable renames for clarity.
+18/-12 
mod.rs
Introduce queue configuration builder and storage type     

src/infra/src/queue/mod.rs

  • Add StorageType, QueueConfig, and builder.
  • Simplify Queue trait to create_with_config.
  • Remove multiple create variants.
  • Provide defaults and builder methods.
+55/-12 
nats.rs
NATS impl supports unified config and storage mapping       

src/infra/src/queue/nats.rs

  • Map StorageType to JetStream storage.
  • Implement create_with_config using builder values.
  • Set retention, storage, age, replicas, and max bytes.
  • Remove older create methods in impl.
+21/-29 

@github-actions
Copy link
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 2 🔵🔵⚪⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Config Validation

The new event_storage env is a free-form String parsed via lowercase comparison to "memory" and otherwise defaults to "file". Consider validating/normalizing input and rejecting unexpected values to avoid silent misconfiguration.

#[env_config(
    name = "ZO_NATS_EVENT_STORAGE",
    help = "Set the storage type for the event stream, default is: memory, other value is: file",
    default = "memory"
)]
pub event_storage: String,
Max Age Units

In create_with_config, when max_age is None, queue_max_age is treated as days and converted to seconds. Ensure this aligns with the documented semantics elsewhere and remains consistent with previous behavior to avoid retention regressions.

async fn create_with_config(&self, topic: &str, config: super::QueueConfig) -> Result<()> {
    let max_age = match config.max_age {
        Some(dur) => dur,
        None => {
            let max_age = config::get_config().nats.queue_max_age; // days
            std::time::Duration::from_secs(max(1, max_age) * 24 * 60 * 60) // seconds
        }
    };
    let cfg = config::get_config();
Storage Default Choice

Coordinator stream storage defaults to File in the builder; config default is "memory". Verify the builder default or explicit selection matches intended default behavior to prevent unexpected memory/disk usage.

    let storage = if cfg.nats.event_storage.to_lowercase() == "memory" {
        queue::StorageType::Memory
    } else {
        queue::StorageType::File
    };
    let q = queue::get_queue().await;
    let config = queue::QueueConfigBuilder::new()
        .max_age(max_age_secs)
        .retention_policy(queue::RetentionPolicy::Limits)
        .storage_type(storage)
        .build();
    q.create_with_config(COORDINATOR_STREAM, config).await
}

@github-actions
Copy link
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix unsafe default storage

Set a safe default for storage_type to align with the documented default of memory
to prevent unexpected persistence and disk usage. Adjust the builder’s default to
StorageType::Memory.

src/infra/src/queue/mod.rs [60-64]

 pub struct QueueConfig {
     pub max_age: Option<std::time::Duration>,
     pub retention_policy: RetentionPolicy,
     pub storage_type: StorageType,
 }
+...
+impl QueueConfigBuilder {
+    pub fn new() -> Self {
+        Self {
+            config: QueueConfig {
+                max_age: None,
+                retention_policy: RetentionPolicy::Limits,
+                storage_type: StorageType::Memory,
+            },
+        }
+    }
+    ...
+}
Suggestion importance[1-10]: 7

__

Why: The builder currently defaults storage_type to File, while the new config help suggests defaulting to memory. Aligning the default prevents unintended disk usage and matches documented behavior; the change is accurate and beneficial.

Medium
Avoid parameter shadowing

Avoid shadowing the config parameter by reusing the name for the JetStream config;
it risks accidental misuse of fields and readability issues. Use a distinct name for
the JetStream config to ensure you reference the intended config for retention and
storage.

src/infra/src/queue/nats.rs [103-112]

-let config = jetstream::stream::Config {
+let js_config = jetstream::stream::Config {
     name: topic_name.to_string(),
     subjects: vec![topic_name.to_string(), format!("{}.*", topic_name)],
     retention: config.retention_policy.into(),
     storage: config.storage_type.into(),
     num_replicas: cfg.nats.replicas,
     max_bytes: cfg.nats.queue_max_size,
     max_age,
     ..Default::default()
 };
+_ = jetstream.get_or_create_stream(js_config).await?;
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies shadowing of the config parameter by a local config for JetStream, which can hurt readability and cause confusion. Renaming to js_config is accurate and low-risk, improving maintainability without functional change.

Low
General
Validate storage type input

Normalize and validate event_storage to avoid unexpected values silently defaulting
to file. Handle only recognized values and log or fallback explicitly to memory to
match the config help text.

src/infra/src/coordinator/events.rs [129-133]

-let storage = if cfg.nats.event_storage.to_lowercase() == "memory" {
-    queue::StorageType::Memory
-} else {
-    queue::StorageType::File
+let storage = match cfg.nats.event_storage.to_lowercase().as_str() {
+    "memory" => queue::StorageType::Memory,
+    "file" => queue::StorageType::File,
+    other => {
+        log::warn!("[COORDINATOR::EVENTS] unknown event_storage '{}', defaulting to memory", other);
+        queue::StorageType::Memory
+    }
 };
Suggestion importance[1-10]: 7

__

Why: The current code treats any non-"memory" value as "file"; adding explicit matching with a warning improves robustness and aligns with the documented acceptable values. The improved code cleanly reflects the intended behavior.

Medium

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Greptile Overview

Greptile Summary

Added configurable event storage type for NATS coordinator streams, allowing users to choose between memory-based or file-based storage via the ZO_NATS_EVENT_STORAGE environment variable.

Key Changes:

  • Added new config field event_storage (default: "memory") to control NATS JetStream storage type
  • Refactored queue creation API from multiple specialized methods to a unified builder pattern using QueueConfigBuilder
  • Introduced StorageType enum with Memory and File variants
  • Updated coordinator event stream creation to use configurable storage based on environment settings

Impact:

  • Simplifies queue configuration API with cleaner builder pattern
  • Provides flexibility for production deployments to choose appropriate storage based on durability vs performance requirements
  • Backward compatible - default behavior uses memory storage as before

Confidence Score: 5/5

  • This PR is safe to merge with minimal risk
  • The changes follow clean refactoring principles with a builder pattern that improves code maintainability. The removed methods are not used elsewhere in the codebase, ensuring no breaking changes. The new configuration is well-designed with sensible defaults (memory storage), and the implementation correctly handles both memory and file storage types
  • No files require special attention

Important Files Changed

File Analysis

Filename Score Overview
src/config/src/config.rs 5/5 Added new configuration field event_storage to control NATS event stream storage type (memory vs file)
src/infra/src/queue/mod.rs 5/5 Refactored queue creation API by removing multiple methods and introducing builder pattern with QueueConfig and QueueConfigBuilder
src/infra/src/queue/nats.rs 5/5 Updated NATS implementation to use new builder-based configuration, added StorageType conversion
src/infra/src/coordinator/events.rs 5/5 Updated stream creation to use new builder pattern and configurable storage type based on environment variable

Sequence Diagram

sequenceDiagram
    participant App as Application
    participant Config as config::get_config()
    participant Events as coordinator::events
    participant QueueBuilder as QueueConfigBuilder
    participant Queue as queue::get_queue()
    participant NATS as NATS JetStream

    App->>Events: create_stream()
    Events->>Config: get event_max_age
    Events->>Config: get event_storage
    
    alt event_storage == "memory"
        Events->>Events: storage = StorageType::Memory
    else event_storage != "memory"
        Events->>Events: storage = StorageType::File
    end
    
    Events->>Queue: get_queue().await
    Events->>QueueBuilder: new()
    Events->>QueueBuilder: .max_age(max_age_secs)
    Events->>QueueBuilder: .retention_policy(Limits)
    Events->>QueueBuilder: .storage_type(storage)
    Events->>QueueBuilder: .build()
    QueueBuilder-->>Events: QueueConfig
    
    Events->>Queue: create_with_config(COORDINATOR_STREAM, config)
    Queue->>NATS: get_or_create_stream(stream_config)
    NATS-->>Queue: Stream created/exists
    Queue-->>Events: Result<()>
    Events-->>App: Stream ready
Loading

4 files reviewed, no comments

Edit Code Review Agent Settings | Greptile

@testdino-playwright-reporter
Copy link

⚠️ Test Run Unstable


Author: hengfeiyang | Branch: nats/queue-config | Commit: c412bfb

Testdino Test Results

Status Total Passed Failed Skipped Flaky Pass Rate Duration
All tests passed 365 344 0 19 2 94% 4m 38s

View Detailed Results

@hengfeiyang hengfeiyang merged commit 964d975 into main Oct 23, 2025
33 checks passed
@hengfeiyang hengfeiyang deleted the nats/queue-config branch October 23, 2025 12:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants