-
Notifications
You must be signed in to change notification settings - Fork 111
Introduce a generic pubsub mod in cdk-common
#1098
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
cdk-common
245fea0 to
e6ad93f
Compare
e6ad93f to
ec2e7b2
Compare
|
Does this address this issue? #1104 |
We're still checking on the db. But we can check to the LN backend easily. I'll make it happen before setting this PR to reviewable. For context, that would happen on |
a1cb722 to
4333e67
Compare
@thesimplekid I deal with this in many places introducing a thin wapper on top of the Sender, for instance |
a71535c to
6d822aa
Compare
|
@crodas can you rebase? |
7f07aca to
c6327f6
Compare
asmogo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only one little improvement suggestion.
I will test this together with FFI asap.
…rough wallet/mint/WS/FFI
Refactor the pub/sub engine to a single Spec trait, move Event alongside it,
and propagate Arc-backed subscription IDs across the stack. This simplifies
generics, clarifies responsibilities, and preserves coalescing +
latest-on-subscribe semantics.
- **Single source of truth:** `Spec` owns `Topic`, `Event`, `SubscriptionId`,
`Context`, new_instance, and fetch_events.
- **Lean & explicit API:** Remove Topic trait split;
`Subscriber::send(Event)` carries sub-ID internally.
- **Performance/ergonomics:** `Arc<SubscriptionId>` avoids heavy clones and
makes channel/task hops trivial.
- Introduce `pub_sub/typ.rs` with:
- trait `Spec`
- trait `Event` colocated with Spec.
- Remove `pub_sub/event.rs` fold `Event` into `typ.rs`.
- Make `Pubsub<S>` generic over `Spec` and store `Arc<S>`.
- The subscriber holds `Arc<SubscriptionId>` and deduplicates the latest
entry per subscription.
- SubscriptionRequest: rename SubscriptionName → SubscriptionId; return
`Arc<...>` from `subscription_name()`.
- Remote consumer (Transport) now parameterized by `Spec`; control types
updated:
- `StreamCtrl<S>`, `SubscribeMessage<S>`, internal caches keyed by
`S::Topic`.
- Mint/wallet:
- Mint: `MintPubSubSpec` (Context = `DynMintDatabase`),
`PubSubManager(Pubsub<MintPubSubSpec>)`.
- Wallet: lightweight MintSubTopics Spec with `Context = ()`.
- IDs go Arc end-to-end:
- cdk-axum WS maps `HashMap<Arc<SubId>, JoinHandle<()>>`, publisher sends
`(Arc<SubId>, NotificationPayload)`.
- `subscription::{Params, WalletParams}` now use `Arc<...>`.
- cdk-ffi conversions & wallet glue updated.
- Integration tests updated for new types.
- Coalescing unchanged: multiple local subs to the same topic are combined
into a single remote sub.
- Backfill via `Spec::fetch_events(topics, Subscriber)`; Subscriber enforces
latest-only dedupe per subscription.
**Result:** a slimmer, more maintainable pub/sub core that’s easier to embed
across mint, wallet, transports, and FFI without sacrificing performance or
semantics.
8c1b161 to
99d0646
Compare
An Architectural Analysis of the Refactored Publish-Subscribe Module
Section 1: Foundational Primitives of the New Pub-Sub Architecture
The recent refactoring of the publish-subscribe (pub-sub) module represents a paradigm shift in the system's approach to event-driven communication. The core of this transformation lies in extracting a generic, transport-agnostic pub-sub engine into the
cdk-commoncrate. This new engine is built upon a set of foundational traits that establish a clear, contract-based architecture. These traits decouple the core eventing logic from domain-specific concerns, promoting reusability, extensibility, and testability. This section provides a detailed analysis of these core primitives, which form the conceptual bedrock of the new system.1.1. The
SpecTrait: A Verbatim AnalysisAt the heart of the new architecture is the
Spectrait, which serves as a central configuration and behavior contract for a specific publish-subscribe (pub-sub) domain. It replaces the previous monolithicManagerwith a declarative specification that binds all necessary components together.Verbatim Trait Definition:
The
Spectrait is the cornerstone of the system's newfound generic nature. Through its associated types—Topic,Event,SubscriptionId, andContext—it defines the complete set of types for a given pub-sub implementation. This allows the corePubsubengine to remain entirely agnostic of the domain it serves. The same engine can power mint-side eventing, wallet-side notifications, or any future eventing domain simply by being provided with a differentSpecimplementation at compile time.This represents a fundamental shift from an imperative design, where the old
Managerstruct contained hardwired logic, to a declarative one. Developers can now understand the capabilities and data flow of an entire eventing domain by examining itsSpecimplementation, a much higher level of abstraction that significantly enhances system comprehensibility.Furthermore, the
fetch_eventsformalizes the "latest-on-subscribe" pattern, a common requirement in pub-sub systems and a specific requirement of NUT-17. By making this a required method on theSpectrait, the framework delegates the responsibility of sourcing historical or current-state data to the domain-specific implementation. The corePubsubengine triggers this callback for every new subscription, but remains decoupled from persistence concerns like database access. This approach maintains the event bus's cleanliness while providing essential functionality.1.2. The
EventTrait: Defining the Contract for Publishable DataThe
Eventtrait establishes a critical yet straightforward contract for any data type that can be published through the system. It decouples thePubsubengine from the concrete structure of the events themselves.Verbatim Trait Definition:
The sole responsibility of the
Eventtrait, via itsget_topicsmethod, is to define a mapping from a domain object to one or more indexableTopic's. This abstraction is what enables thePubsubengine'sBTreeMap-based indexing and matching mechanism to operate on any data type. The engine does not need to understand the event's fields; it only needs to know which topics the event corresponds to. The introduction ofMintEvent<T>in thecdkcrate, which wraps theNotificationPayloadto satisfy this trait contract, is a direct application of this powerful pattern.1.3. The
SubscriptionRequestTrait: Abstracting Subscriber IntentThe
SubscriptionRequesttrait introduces a vital layer of abstraction between a client's high-level intent and the low-level topics used for internal routing.Verbatim Trait Definition:
This trait encapsulates the logic for parsing and validating a client's request (such as a NUT-17
Paramsobject) and converting its filters into a list of concreteTopic's that thePubsubengine can index. This effectively separates the concern of request parsing and validation from the core subscription management logic. The changes observed incrates/cdk-common/src/subscription.rs, where bothParams(for the mint) andWalletParams(for the wallet) are refactored to implementSubscriptionRequest, demonstrate this principle in action, cleanly integrating domain-specific request formats into the generic framework.1.4. Conceptual Interaction Model
The relationships between these foundational traits form a cohesive and logical system. The following diagram illustrates their static dependencies and conceptual roles within the architecture.
graph TD subgraph Foundational Primitives Spec -- defines --> Event_Trait(Event Trait); Spec -- defines --> Topic_Type(Topic Type); Spec -- defines --> SubscriptionId_Type(SubscriptionId Type); Event_Trait -- "maps to" --> Topic_Type; SubscriptionRequest_Trait(SubscriptionRequest Trait) -- "converts to" --> Topic_Type; SubscriptionRequest_Trait -- "defines" --> SubscriptionId_Type; end Pubsub_Engine(Pubsub Engine) -- "is generic over" --> Spec; Subscriber -- "submits a" --> SubscriptionRequest_Trait; Pubsub_Engine -- "publishes an" --> Event_Trait;Section 2: The
PubsubEngine: A High-Performance Local Event BusAt the core of the new architecture is the
Pubsub<S>engine, a high-performance, in-memory event bus located incrates/cdk-common/src/pub_sub/pubsub.rs. This component is responsible for the efficient management of local subscriptions and the fan-out of published events to all relevant subscribers. Its design prioritizes performance, thread safety, and robust resource management.2.1. The
Pubsub<S>Struct: Core ResponsibilitiesThe
Pubsubstruct serves as the central coordinator for all local eventing activities.The key fields of the
Pubsubstruct are:inner: Arc<S>: AnArc-wrapped instance of a type that implements theSpectrait. This allows the genericPubsubengine to delegate domain-specific behaviors, such as fetching initial state upon subscription, to the concrete implementation.listeners_topics: TopicTree<S>: This is the high-performance indexing engine, defined as a type alias forArc<RwLock<BTreeMap<(<T as Spec>::Topic, usize), Subscriber<T>>>>. The choice of data structures here is significant. TheBTreeMapprovides efficient, sorted key-value storage, enabling rapid range-based lookups for matching subscribers to a given topic. The composite key,(Topic, usize), is crucial for uniqueness; theusizecomponent, populated by an atomic counter, ensures that multiple distinct subscriptions to the same topic can coexist in the map.active_subscribers: Arc<AtomicUsize>: A thread-safe atomic counter provides a non-blocking mechanism for monitoring the total number of active subscriptions, which is useful for telemetry and system health checks.A notable micro-architectural decision is the use of
parking_lot::RwLockinstead of the standardtokio::sync::RwLock. Thetokiolock is async-aware and yields to the runtime scheduler when contended, which is ideal for long-held locks. However, the lock contention within thePubsubengine is expected to be very brief—reads during event publication and writes during subscription are extremely fast map operations. For such short-lived critical sections, a spinning, blocking mutex likeparking_lot::RwLockoffers significantly better performance by avoiding the overhead of async context switching. This choice reflects a nuanced understanding of the workload and a commitment to performance optimization.2.2. The Subscription Lifecycle and Resource Management
The
subscribemethod returns anActiveSubscription<S>struct to the caller. This struct acts as a guard, holding the receiving end of atokio::sync::mpscchannel for event delivery.Crucially,
ActiveSubscription<S>implements theDroptrait. When an instance of this struct goes out of scope (e.g., the client is no longer interested in the subscription), itsdropimplementation is automatically invoked. This implementation performs the necessary cleanup: it removes all of the subscription's associated entries from thelisteners_topicsBTreeMapand atomically decrements theactive_subscriberscounter. This automatic, deterministic cleanup mechanism guarantees that system resources are reclaimed promptly and prevents common issues like memory leaks or dangling subscribers.2.3. Event Publication and Dispatch Flow
The process of publishing an event and dispatching it to all relevant subscribers is highly optimized. The architecture provides two distinct methods for publication: an asynchronous, non-blocking
publishmethod, and a synchronous, blockingpublish_nowmethod. Thepublishmethod spawns a new task to handle the fan-out, immediately returning control to the caller, which is ideal for performance-sensitive contexts like request handlers. In contrast,publish_nowperforms the dispatch inline, which is suitable for contexts requiring sequential consistency, such as within a state machine or transaction.The core logic for both is contained within the
publish_internalfunction, as visualized in the following flowchart.flowchart TD A[publish_now] --> B{Get read lock on listeners_topics}; B --> C{event.get_topics}; C --> D{For each topic}; D --> E{Perform range query on B-Tree for topic}; E --> F{For each matching subscriber}; F --> G{Check if already sent to this subscriber}; G -- No --> H[sender.send]; G -- Yes --> F; H --> F; F -- No more subscribers for topic --> D; D -- No more topics --> I; I --> J[End];This flow efficiently identifies all interested subscribers for an event's topics while ensuring that each unique subscriber receives the event only once, even if they are subscribed to multiple matching topics for that event.
Section 3: The Remote Consumer Framework: Bridging Distributed Systems
The most significant architectural innovation introduced by this refactoring is the comprehensive remote consumer framework, located in the new file
crates/cdk-common/src/pub_sub/remote_consumer.rs. This framework provides a standardized, resilient, and highly efficient mechanism for a local component (e.g., a wallet) to subscribe to and consume events from a remote producer (e.g., a mint). It abstracts away the complexities of network communication, state management, and error handling.3.1. The
TransportTrait: A Verbatim AnalysisThe lynchpin of the remote consumer's flexibility is the
Transporttrait. It completely abstracts the underlying network protocol, decoupling the high-level consumer logic from the low-level details of sending and receiving messages.Verbatim Trait Definition:
Source:
crates/cdk-common/src/pub_sub/remote_consumer.rs1This trait establishes a simple yet powerful contract. The
streammethod is designed for efficient, persistent, server-push protocols like WebSockets, while thepollmethod provides a necessary fallback for traditional request-response protocols like HTTP. This dual-method approach is central to the framework's resilience. This design is a textbook example of the Bridge pattern, where theConsumer(the high-level abstraction) is permanently decoupled from theTransport(the implementation interface). This allows them to vary independently; a new network protocol can be supported by adding a newTransportimplementation without any changes to theConsumer's sophisticated logic.3.2. The
Consumer<T>Struct: Advanced Subscription ManagementThe
Consumer<T>struct is the high-level, client-side manager that orchestrates remote subscriptions. It introduces several advanced features that dramatically improve the system's efficiency and robustness.Request Coalescing: The
subscribemethod intelligently manages upstream network connections. When multiple local clients subscribe to the same remote topic, theConsumerestablishes only a single subscription with the remote producer. Incoming events for that topic are then efficiently fanned out to all interested local subscribers via the internalPubsubengine. This is a critical optimization that minimizes network chatter, reduces server load, and conserves client-side resources.Automatic Protocol Fallback and Resilience: The main connection management loop within the
Consumeris designed for resilience. It first attempts to use thetransport.stream()method. If this method returns anError::NotSupportedor fails repeatedly, theConsumerautomatically and transparently transitions to a polling strategy, callingtransport.poll()at regular intervals. This ensures that the client can maintain functionality even when connecting to servers that do not support WebSockets or when operating in network environments that prohibit long-lived connections.Built-in Exponential Backoff: The connection loop incorporates a classic exponential backoff strategy for handling connection failures. Upon a failed connection attempt, it calculates a
retry_attimestamp based on an incrementally increasingbackoffduration, which is capped at a maximum value. This prevents a client from engaging in rapid, aggressive reconnection attempts that could overwhelm a temporarily unavailable server (a "thundering herd" problem). Building this resilience directly into the framework ensures that all remote subscriptions benefit from a standardized, robust error recovery mechanism.Latest-Event Caching: The
Consumermaintains acached_eventsmap that stores the most recent event received for each subscribed topic. When a new local subscriber is added, it is immediately served this cached event, fulfilling the "latest-on-subscribe" pattern without the latency of an additional network request. The cache is intelligently managed; an entry is evicted only when the last local subscriber for that topic unsubscribes, ensuring data freshness and efficient memory usage.3.3. Remote Subscription State Management
Communication between the
Consumer's main logic and its background transport task is managed via a simple, internal message-passing protocol defined by theStreamCtrlenum.Enum Definition Snippet:
Source:
crates/cdk-common/src/pub_sub/remote_consumer.rs1This enum allows the
Consumerto dynamically alter the set of active remote subscriptions on a live connection. For instance, when a new local subscriber requests a previously-unsubscribed topic, aStreamCtrl::Subscribemessage is sent to the transport task, which then sends the appropriate protocol message over the network. This avoids the high cost of tearing down and re-establishing the entire network connection for minor changes to the subscription set.3.4. Remote Consumer Architectural Blueprint
The following diagram illustrates the components and interactions within the remote consumer framework, showing how it mediates between local application code and a remote producer.
graph TD subgraph Local Client AppCode1[App Code 1] --> |subscribes| Consumer; AppCode2[App Code 2] --> |subscribes| Consumer; end subgraph Consumer Framework Consumer -- "uses" --> InternalPubsub(Internal Pubsub Engine); Consumer -- "manages" --> TransportTask(Background Transport Task); TransportTask -- "implements" --> TransportTrait(Transport Trait); Consumer -- "sends StreamCtrl msgs" --> TransportTask; end subgraph Network TransportTask -- "sends/receives over network" --> RemoteProducer(Remote Producer); end InternalPubsub -- "delivers events to" --> AppCode1; InternalPubsub -- "delivers events to" --> AppCode2;Section 4: System-Wide Integration and Concrete Implementations
The true power of the new abstract framework is realized through its concrete implementations within the mint and wallet components. These implementations demonstrate how the generic primitives are adapted to solve specific domain problems, providing a clear separation between the reusable engine and the application-specific logic.
4.1. The Mint-Side
MintPubSubSpecFor the mint, the abstract pub-sub framework is concretized in the new
crates/cdk/src/mint/subscription.rsfile.This file introducesMintPubSubSpec, which implements theSpectrait for the mint's eventing domain.The most critical part of this implementation is the
fetch_eventsmethod. When a new subscription is created, this method is invoked by thePubsubengine. It intelligently parses the requested topics, distinguishing betweenProofStaterequests and various quote status requests. It then queries theDynMintDatabaseto retrieve the current state for those entities. The results are packaged intoMintEventobjects and sent back to the new subscriber, fulfilling the "latest-on-subscribe" requirement. This is a perfect demonstration of theSpectrait's role in integrating domain-specific data sources (in this case, a database) into the generic eventing flow.The
PubSubManageron the mint side has been refactored into a lightweight wrapper aroundPubsub<MintPubSubSpec>. It no longer contains complex management logic itself but instead provides a set of convenient, domain-specific helper methods likeproof_state,mint_quote_payment, andmelt_quote_statusthat simplify the process of publishing events from elsewhere in the mint's codebase.4.2. The Wallet-Side
SubscriptionClientOn the client side, the wallet's subscription logic has been completely overhauled to adopt the new remote consumer framework. The refactored
crates/cdk/src/wallet/subscription/mod.rsintroduces theSubscriptionClient, which serves as the wallet's concreteTransportimplementation.This
SubscriptionClientelegantly implements the dual-protocol contract of theTransporttrait:poll()method: This method provides the HTTP fallback mechanism. When invoked by theConsumer, it makes direct, stateless HTTP requests to the mint using the existinghttp_clientfor various checks likeget_mint_quote_statusandpost_check_state. The responses are then wrapped and forwarded to theConsumer's internal relay for local fan-out.stream()method: This method, defined in the newws.rsfile, handles the preferred WebSocket protocol. It establishes a persistent connection to the mint's/v1/wsendpoint. It listens forStreamCtrlmessages from theConsumerand translates them into the appropriate JSON-RPCsubscribeorunsubscribemessages to be sent over the WebSocket. Conversely, it parses incoming WebSocket notifications from the mint and passes them to theConsumer's relay.The top-level
SubscriptionManagerin the wallet is now a factory that manages a map ofMintUrltoConsumer<SubscriptionClient>instances. This ensures that for any given mint, only oneConsumer—and therefore only one underlying network connection (be it WebSocket or polling HTTP)—is ever created, centralizing all subscription management for that mint in a single, efficient component.4.3. End-to-End Event Flow
The following sequence diagram illustrates the complete, end-to-end flow of a wallet subscribing to a mint quote status update, showcasing the seamless interaction between all components of the new architecture.
sequenceDiagram participant Wallet as Wallet App participant SubManager as Wallet SubscriptionManager participant Consumer as Consumer<SubscriptionClient> participant Transport as SubscriptionClient (Transport) participant Mint as Mint WebSocket Server participant MintPubsub as Mint PubSubManager Wallet->>SubManager: subscribe(mint_url, quote_id) SubManager->>Consumer: subscribe(quote_id) Consumer->>Transport: stream() connection initiated Transport->>Mint: WebSocket CONNECT Transport-->>Consumer: Connection OK Consumer->>Transport: StreamCtrl::Subscribe(quote_id) Transport->>Mint: {"method": "subscribe", "params":...} Mint->>MintPubsub: subscribe(quote_id) Note over Mint,MintPubsub: Mint fetches current quote state from DB MintPubsub-->>Mint: current_state_event Mint-->>Transport: Notification(current_state) Transport-->>Consumer: reply_to.send(event) Consumer-->>Wallet: event received loop Later Note over MintPubsub,Mint: Quote is paid MintPubsub->>MintPubsub: publish(paid_event) MintPubsub-->>Mint: paid_event Mint-->>Transport: Notification(paid_event) Transport-->>Consumer: reply_to.send(event) Consumer-->>Wallet: event received endSection 5: A Comparative Analysis of Architectural Improvements
The refactoring of the pub-sub module is not merely an incremental improvement but a fundamental re-architecting of the system's communication layer. By contrasting the old and new designs, the strategic benefits in terms of decoupling, reusability, resilience, and performance become strikingly clear.
5.1. From Tightly Coupled to Decoupled and Reusable
The previous architecture, evidenced by the now-deleted
cdk/src/pub_sub.rsfile, was a monolithic implementation confined within thecdkcrate. TheManagerstruct was tightly coupled to its internal indexing logic and the specificNotificationPayloadtype. This created a rigid system that was difficult to test in isolation, adapt for new use cases, or share across different parts of the platform.The new architecture dismantles this monolith. The entire core pub-sub engine has been extracted into the
cdk-commoncrate, establishing it as a foundational, reusable utility. The trait-based design completely decouples the engine from any specific domain, allowing it to be reused for both the mint and the wallet, and any future components, with zero modification to the core logic.5.2. Table: Architectural Comparison
The following table provides a side-by-side comparison of the key architectural characteristics of the old and new systems, highlighting the strategic value of each change.
cdk/src/pub_sub.rscdk-common/src/pub_sub/ManagerstructSpec)Indexabletrait,IndexstructEventtrait mapping toTopicIndexableParamswrapperSubscriptionRequesttraitwallet/subscriptionConsumer/TransportTransporttraitActiveSubscriptionwithDropActiveSubscriptionwithDrop5.3. Gains in System Resilience and Performance
The new architecture delivers tangible improvements in both system resilience and performance, particularly on the client side. These are not minor tweaks but foundational features that enable the construction of a more scalable and reliable distributed system.
The
Consumer's automatic protocol fallback from WebSockets to HTTP polling makes the wallet client inherently more robust. It can gracefully handle mints with varying capabilities and adapt to restrictive network environments without requiring any changes to the application code. This resilience is further enhanced by the built-in exponential backoff mechanism for reconnections, which promotes system stability under adverse conditions.Performance and efficiency are significantly boosted by the
Consumer's advanced features. Request coalescing drastically reduces the number of outbound network requests and subscriptions, saving bandwidth and lessening the load on mint servers, especially in applications with many concurrent components subscribing to similar events. The latest-event caching mechanism enhances the user experience by reducing perceived latency for new subscriptions, providing immediate state feedback without requiring a network round-trip.Conclusion
The refactoring of the publish-subscribe module successfully transforms it from a specific, monolithic implementation into a robust, scalable, and transport-agnostic cornerstone of the platform's distributed communication architecture. The new design is a masterclass in modern systems architecture, emphasizing abstraction through traits, clear separation of concerns, and built-in resilience. By providing a generic local event bus and a sophisticated remote consumer framework as reusable, foundational components, the new architecture not only resolves the limitations of the previous system but also provides a powerful and extensible platform for future development. This strategic investment in a clean, decoupled design will pay dividends in maintainability, testability, and the ability to evolve the system to meet new challenges rapidly.