-
Notifications
You must be signed in to change notification settings - Fork 715
fix: improve nats queue with limit policy #8436
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
PR Reviewer Guide 🔍Here are some key observations to aid the review process:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Summary
This PR implements a comprehensive refactoring of the coordinator module structure alongside significant improvements to NATS queue management with configurable retention policies. The changes can be broken down into two main categories:
Module Restructuring: The entire cluster_coordinator module has been renamed and reorganized to coordinator, simplifying the module hierarchy across 20+ files. This includes moving files like src/infra/src/cluster_coordinator/ to src/infra/src/coordinator/ and updating all import paths throughout the codebase. The refactoring maintains identical functionality while providing cleaner, more concise module paths.
NATS Queue Enhancements: The Queue trait has been significantly expanded to support more sophisticated queue management. Key improvements include:
- Addition of
create_with_max_ageandcreate_with_retention_policy_and_max_agemethods for fine-grained retention control - Introduction of a
DeliverPolicyenum (All, Last, New) to control message delivery patterns - Enhanced
consumemethod that accepts optional delivery policies - New
event_max_ageconfiguration field (default 3600 seconds) for controlling individual event lifespans - Switch from Interest-based retention to time-based retention for coordinator events
- Improved reconnection logic with intelligent deliver policy selection
The coordinator events system now uses configurable max age policies instead of fixed Interest retention, providing operators with better control over event lifecycle management. The system intelligently handles reconnections by using DeliverPolicy::All to catch up on missed events versus DeliverPolicy::New for fresh connections.
These changes integrate well with OpenObserve's distributed architecture, where the coordinator manages cluster-wide events for alerts, pipelines, destinations, and AI prompts. The enhanced queue capabilities allow for more resilient message handling in distributed deployments while the module restructuring improves code maintainability.
Confidence score: 4/5
- This PR requires careful review due to extensive module restructuring and queue interface changes
- Score reflects the broad scope of changes affecting critical infrastructure components, though individual changes appear well-structured
- Pay close attention to the NATS queue implementation and coordinator event handling logic
29 files reviewed, 3 comments
PR Code Suggestions ✨Explore these optional code suggestions:
|
This PR change event RetentionPolicy::Interest to Retention::Limit and with a new TTL:
Because the consumer will got error:
And the consumer will delete by nats-server. then we will loss event with the policy
Interest.PR Type
Enhancement, Bug fix
Description
Introduce unified
infra::coordinatormoduleAdd NATS event max-age configurability
Improve coordinator subscription reconnect behavior
Replace legacy imports, adjust logging
Diagram Walkthrough
File Walkthrough
28 files
Use new coordinator path during registerAdd NATS event max age configurationSwitch AI prompt events to new coordinatorNew coordinator AI prompts event helpersNew alerts coordinator with watch and keysNew destination/template coordinator eventsStream max-age and resilient subscribeIntroduce unified coordinator moduleNew pipeline coordinator event helpersMigrate to new coordinator events APIRename module to coordinatorQueue API adds max-age and deliver policyImplement max-age, deliver policy, errorsStub max-age and deliver policy methodsUpdate watch prefix import pathUse new coordinator for alert eventsUse new coordinator for destination eventsUse new coordinator for template eventsSwitch to new coordinator accessorUse new coordinator for pipeline eventsSwitch to new coordinator accessorUse new coordinator for prompt eventsUse new coordinator for alert eventsSwitch to new coordinator accessorUse new coordinator for destination eventsUse new coordinator for pipeline eventsSwitch to new coordinator accessorUse new coordinator for template events1 files
Improve error formatting with {e}