Skip to content

Conversation

@crodas
Copy link
Collaborator

@crodas crodas commented Sep 21, 2025

An Architectural Analysis of the Refactored Publish-Subscribe Module

Section 1: Foundational Primitives of the New Pub-Sub Architecture

The recent refactoring of the publish-subscribe (pub-sub) module represents a paradigm shift in the system's approach to event-driven communication. The core of this transformation lies in extracting a generic, transport-agnostic pub-sub engine into the cdk-common crate. This new engine is built upon a set of foundational traits that establish a clear, contract-based architecture. These traits decouple the core eventing logic from domain-specific concerns, promoting reusability, extensibility, and testability. This section provides a detailed analysis of these core primitives, which form the conceptual bedrock of the new system.

1.1. The Spec Trait: A Verbatim Analysis

At the heart of the new architecture is the Spec trait, which serves as a central configuration and behavior contract for a specific publish-subscribe (pub-sub) domain. It replaces the previous monolithic Manager with a declarative specification that binds all necessary components together.

Verbatim Trait Definition:

#[async_trait::async_trait]
pub trait Spec: Send + Sync {
    /// Topic
    type Topic: ...;

    /// Event
    type Event: Event<Topic = Self::Topic> + ...;

    /// Subscription Id
    type SubscriptionId: ...;

    /// Create a new context
    type Context;

    /// Create a new instance from a given context
    fn new_instance(context: Self::Context) -> Arc<Self>
    where
        Self: Sized;

    /// Callback function that is called on new subscriptions, to back-fill optionally the previous
    /// events
    async fn fetch_events(
        self: &Arc<Self>,
        topics: Vec<<Self::Event as Event>::Topic>,
        reply_to: Subscriber<Self>,
    ) where
        Self: Sized;
}

The Spec trait is the cornerstone of the system's newfound generic nature. Through its associated types—Topic, Event, SubscriptionId, and Context—it defines the complete set of types for a given pub-sub implementation. This allows the core Pubsub engine to remain entirely agnostic of the domain it serves. The same engine can power mint-side eventing, wallet-side notifications, or any future eventing domain simply by being provided with a different Spec implementation at compile time.

This represents a fundamental shift from an imperative design, where the old Manager struct contained hardwired logic, to a declarative one. Developers can now understand the capabilities and data flow of an entire eventing domain by examining its Spec implementation, a much higher level of abstraction that significantly enhances system comprehensibility.

Furthermore, the fetch_events formalizes the "latest-on-subscribe" pattern, a common requirement in pub-sub systems and a specific requirement of NUT-17. By making this a required method on the Spec trait, the framework delegates the responsibility of sourcing historical or current-state data to the domain-specific implementation. The core Pubsub engine triggers this callback for every new subscription, but remains decoupled from persistence concerns like database access. This approach maintains the event bus's cleanliness while providing essential functionality.

1.2. The Event Trait: Defining the Contract for Publishable Data

The Event trait establishes a critical yet straightforward contract for any data type that can be published through the system. It decouples the Pubsub engine from the concrete structure of the events themselves.

Verbatim Trait Definition:

pub trait Event: ... {
    /// Generic Topic
    ///
    /// It should be serializable/deserializable to be stored in the database layer and it should
    /// also be sorted in a BTree for in-memory matching
    type Topic;

    /// To topics
    fn get_topics(&self) -> Vec<Self::Topic>;
}

The sole responsibility of the Event trait, via its get_topics method, is to define a mapping from a domain object to one or more indexable Topic's. This abstraction is what enables the Pubsub engine's BTreeMap-based indexing and matching mechanism to operate on any data type. The engine does not need to understand the event's fields; it only needs to know which topics the event corresponds to. The introduction of MintEvent<T> in the cdk crate, which wraps the NotificationPayload to satisfy this trait contract, is a direct application of this powerful pattern.

1.3. The SubscriptionRequest Trait: Abstracting Subscriber Intent

The SubscriptionRequest trait introduces a vital layer of abstraction between a client's high-level intent and the low-level topics used for internal routing.

Verbatim Trait Definition:

pub trait SubscriptionRequest {
    /// Topics
    type Topic;

    /// Subscription Id
    type SubscriptionId;

    /// Try to get topics from the request
    fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error>;

    /// Get the subscription name
    fn subscription_name(&self) -> Arc<Self::SubscriptionId>;
}

This trait encapsulates the logic for parsing and validating a client's request (such as a NUT-17 Params object) and converting its filters into a list of concrete Topic's that the Pubsub engine can index. This effectively separates the concern of request parsing and validation from the core subscription management logic. The changes observed in crates/cdk-common/src/subscription.rs, where both Params (for the mint) and WalletParams (for the wallet) are refactored to implement SubscriptionRequest, demonstrate this principle in action, cleanly integrating domain-specific request formats into the generic framework.

1.4. Conceptual Interaction Model

The relationships between these foundational traits form a cohesive and logical system. The following diagram illustrates their static dependencies and conceptual roles within the architecture.

graph TD
    subgraph Foundational Primitives
        Spec -- defines --> Event_Trait(Event Trait);
        Spec -- defines --> Topic_Type(Topic Type);
        Spec -- defines --> SubscriptionId_Type(SubscriptionId Type);
        Event_Trait -- "maps to" --> Topic_Type;
        SubscriptionRequest_Trait(SubscriptionRequest Trait) -- "converts to" --> Topic_Type;
        SubscriptionRequest_Trait -- "defines" --> SubscriptionId_Type;
    end
    Pubsub_Engine(Pubsub Engine) -- "is generic over" --> Spec;
    Subscriber -- "submits a" --> SubscriptionRequest_Trait;
    Pubsub_Engine -- "publishes an" --> Event_Trait;

Loading

Section 2: The Pubsub Engine: A High-Performance Local Event Bus

At the core of the new architecture is the Pubsub<S> engine, a high-performance, in-memory event bus located in crates/cdk-common/src/pub_sub/pubsub.rs. This component is responsible for the efficient management of local subscriptions and the fan-out of published events to all relevant subscribers. Its design prioritizes performance, thread safety, and robust resource management.

2.1. The Pubsub<S> Struct: Core Responsibilities

The Pubsub struct serves as the central coordinator for all local eventing activities.
The key fields of the Pubsub struct are:

  • inner: Arc<S>: An Arc-wrapped instance of a type that implements the Spec trait. This allows the generic Pubsub engine to delegate domain-specific behaviors, such as fetching initial state upon subscription, to the concrete implementation.

  • listeners_topics: TopicTree<S>: This is the high-performance indexing engine, defined as a type alias for Arc<RwLock<BTreeMap<(<T as Spec>::Topic, usize), Subscriber<T>>>>. The choice of data structures here is significant. The BTreeMap provides efficient, sorted key-value storage, enabling rapid range-based lookups for matching subscribers to a given topic. The composite key, (Topic, usize), is crucial for uniqueness; the usize component, populated by an atomic counter, ensures that multiple distinct subscriptions to the same topic can coexist in the map.

  • active_subscribers: Arc<AtomicUsize>: A thread-safe atomic counter provides a non-blocking mechanism for monitoring the total number of active subscriptions, which is useful for telemetry and system health checks.

A notable micro-architectural decision is the use of parking_lot::RwLock instead of the standard tokio::sync::RwLock. The tokio lock is async-aware and yields to the runtime scheduler when contended, which is ideal for long-held locks. However, the lock contention within the Pubsub engine is expected to be very brief—reads during event publication and writes during subscription are extremely fast map operations. For such short-lived critical sections, a spinning, blocking mutex like parking_lot::RwLock offers significantly better performance by avoiding the overhead of async context switching. This choice reflects a nuanced understanding of the workload and a commitment to performance optimization.

2.2. The Subscription Lifecycle and Resource Management

The subscribe method returns an ActiveSubscription<S> struct to the caller. This struct acts as a guard, holding the receiving end of a tokio::sync::mpsc channel for event delivery.

Crucially, ActiveSubscription<S> implements the Drop trait. When an instance of this struct goes out of scope (e.g., the client is no longer interested in the subscription), its drop implementation is automatically invoked. This implementation performs the necessary cleanup: it removes all of the subscription's associated entries from the listeners_topics BTreeMap and atomically decrements the active_subscribers counter. This automatic, deterministic cleanup mechanism guarantees that system resources are reclaimed promptly and prevents common issues like memory leaks or dangling subscribers.

2.3. Event Publication and Dispatch Flow

The process of publishing an event and dispatching it to all relevant subscribers is highly optimized. The architecture provides two distinct methods for publication: an asynchronous, non-blocking publish method, and a synchronous, blocking publish_now method. The publish method spawns a new task to handle the fan-out, immediately returning control to the caller, which is ideal for performance-sensitive contexts like request handlers. In contrast, publish_now performs the dispatch inline, which is suitable for contexts requiring sequential consistency, such as within a state machine or transaction.

The core logic for both is contained within the publish_internal function, as visualized in the following flowchart.

flowchart TD
    A[publish_now] --> B{Get read lock on listeners_topics};
    B --> C{event.get_topics};
    C --> D{For each topic};
    D --> E{Perform range query on B-Tree for topic};
    E --> F{For each matching subscriber};
    F --> G{Check if already sent to this subscriber};
    G -- No --> H[sender.send];
    G -- Yes --> F;
    H --> F;
    F -- No more subscribers for topic --> D;
    D -- No more topics --> I;
    I --> J[End];
Loading

This flow efficiently identifies all interested subscribers for an event's topics while ensuring that each unique subscriber receives the event only once, even if they are subscribed to multiple matching topics for that event.

Section 3: The Remote Consumer Framework: Bridging Distributed Systems

The most significant architectural innovation introduced by this refactoring is the comprehensive remote consumer framework, located in the new file crates/cdk-common/src/pub_sub/remote_consumer.rs. This framework provides a standardized, resilient, and highly efficient mechanism for a local component (e.g., a wallet) to subscribe to and consume events from a remote producer (e.g., a mint). It abstracts away the complexities of network communication, state management, and error handling.

3.1. The Transport Trait: A Verbatim Analysis

The lynchpin of the remote consumer's flexibility is the Transport trait. It completely abstracts the underlying network protocol, decoupling the high-level consumer logic from the low-level details of sending and receiving messages.

Verbatim Trait Definition:

pub trait Transport: Send + Sync {
    /// Spec
    type Spec: Spec;

    /// Create a new subscription name
    fn new_name(&self) -> <Self::Spec as Spec>::SubscriptionId;

    /// Opens a persistent connection and continuously streams events.
    /// For protocols that support server push (e.g. WebSocket, SSE).
    async fn stream(
        &self,
        subscribe_changes: mpsc::Receiver<StreamCtrl<Self::Spec>>,
        topics: Vec<SubscribeMessage<Self::Spec>>,
        reply_to: InternalRelay<Self::Spec>,
    ) -> Result<(), Error>;

    /// Performs a one-shot fetch of any currently available events.
    /// Called repeatedly by the consumer when streaming is not available.
    async fn poll(
        &self,
        topics: Vec<SubscribeMessage<Self::Spec>>,
        reply_to: InternalRelay<Self::Spec>,
    ) -> Result<(), Error>;
}

Source: crates/cdk-common/src/pub_sub/remote_consumer.rs 1

This trait establishes a simple yet powerful contract. The stream method is designed for efficient, persistent, server-push protocols like WebSockets, while the poll method provides a necessary fallback for traditional request-response protocols like HTTP. This dual-method approach is central to the framework's resilience. This design is a textbook example of the Bridge pattern, where the Consumer (the high-level abstraction) is permanently decoupled from the Transport (the implementation interface). This allows them to vary independently; a new network protocol can be supported by adding a new Transport implementation without any changes to the Consumer's sophisticated logic.

3.2. The Consumer<T> Struct: Advanced Subscription Management

The Consumer<T> struct is the high-level, client-side manager that orchestrates remote subscriptions. It introduces several advanced features that dramatically improve the system's efficiency and robustness.

  • Request Coalescing: The subscribe method intelligently manages upstream network connections. When multiple local clients subscribe to the same remote topic, the Consumer establishes only a single subscription with the remote producer. Incoming events for that topic are then efficiently fanned out to all interested local subscribers via the internal Pubsub engine. This is a critical optimization that minimizes network chatter, reduces server load, and conserves client-side resources.

  • Automatic Protocol Fallback and Resilience: The main connection management loop within the Consumer is designed for resilience. It first attempts to use the transport.stream() method. If this method returns an Error::NotSupported or fails repeatedly, the Consumer automatically and transparently transitions to a polling strategy, calling transport.poll() at regular intervals. This ensures that the client can maintain functionality even when connecting to servers that do not support WebSockets or when operating in network environments that prohibit long-lived connections.

  • Built-in Exponential Backoff: The connection loop incorporates a classic exponential backoff strategy for handling connection failures. Upon a failed connection attempt, it calculates a retry_at timestamp based on an incrementally increasing backoff duration, which is capped at a maximum value. This prevents a client from engaging in rapid, aggressive reconnection attempts that could overwhelm a temporarily unavailable server (a "thundering herd" problem). Building this resilience directly into the framework ensures that all remote subscriptions benefit from a standardized, robust error recovery mechanism.

  • Latest-Event Caching: The Consumer maintains a cached_events map that stores the most recent event received for each subscribed topic. When a new local subscriber is added, it is immediately served this cached event, fulfilling the "latest-on-subscribe" pattern without the latency of an additional network request. The cache is intelligently managed; an entry is evicted only when the last local subscriber for that topic unsubscribes, ensuring data freshness and efficient memory usage.

3.3. Remote Subscription State Management

Communication between the Consumer's main logic and its background transport task is managed via a simple, internal message-passing protocol defined by the StreamCtrl enum.

Enum Definition Snippet:

pub enum StreamCtrl<S>
where
    S: Spec + 'static,
{
    /// Add a subscription
    Subscribe(SubscribeMessage<S>),
    /// Desuscribe
    Unsubscribe(S::SubscriptionId),
    /// Exit the loop
    Stop,
}

Source: crates/cdk-common/src/pub_sub/remote_consumer.rs 1

This enum allows the Consumer to dynamically alter the set of active remote subscriptions on a live connection. For instance, when a new local subscriber requests a previously-unsubscribed topic, a StreamCtrl::Subscribe message is sent to the transport task, which then sends the appropriate protocol message over the network. This avoids the high cost of tearing down and re-establishing the entire network connection for minor changes to the subscription set.

3.4. Remote Consumer Architectural Blueprint

The following diagram illustrates the components and interactions within the remote consumer framework, showing how it mediates between local application code and a remote producer.

graph TD
    subgraph Local Client
        AppCode1[App Code 1] --> |subscribes| Consumer;
        AppCode2[App Code 2] --> |subscribes| Consumer;
    end

    subgraph Consumer Framework
        Consumer -- "uses" --> InternalPubsub(Internal Pubsub Engine);
        Consumer -- "manages" --> TransportTask(Background Transport Task);
        TransportTask -- "implements" --> TransportTrait(Transport Trait);
        Consumer -- "sends StreamCtrl msgs" --> TransportTask;
    end

    subgraph Network
        TransportTask -- "sends/receives over network" --> RemoteProducer(Remote Producer);
    end

    InternalPubsub -- "delivers events to" --> AppCode1;
    InternalPubsub -- "delivers events to" --> AppCode2;

Loading

Section 4: System-Wide Integration and Concrete Implementations

The true power of the new abstract framework is realized through its concrete implementations within the mint and wallet components. These implementations demonstrate how the generic primitives are adapted to solve specific domain problems, providing a clear separation between the reusable engine and the application-specific logic.

4.1. The Mint-Side MintPubSubSpec

For the mint, the abstract pub-sub framework is concretized in the new crates/cdk/src/mint/subscription.rs file.This file introduces MintPubSubSpec, which implements the Spec trait for the mint's eventing domain.

The most critical part of this implementation is the fetch_events method. When a new subscription is created, this method is invoked by the Pubsub engine. It intelligently parses the requested topics, distinguishing between ProofState requests and various quote status requests. It then queries the DynMintDatabase to retrieve the current state for those entities. The results are packaged into MintEvent objects and sent back to the new subscriber, fulfilling the "latest-on-subscribe" requirement. This is a perfect demonstration of the Spec trait's role in integrating domain-specific data sources (in this case, a database) into the generic eventing flow.

The PubSubManager on the mint side has been refactored into a lightweight wrapper around Pubsub<MintPubSubSpec>. It no longer contains complex management logic itself but instead provides a set of convenient, domain-specific helper methods like proof_state, mint_quote_payment, and melt_quote_status that simplify the process of publishing events from elsewhere in the mint's codebase.

4.2. The Wallet-Side SubscriptionClient

On the client side, the wallet's subscription logic has been completely overhauled to adopt the new remote consumer framework. The refactored crates/cdk/src/wallet/subscription/mod.rs introduces the SubscriptionClient, which serves as the wallet's concrete Transport implementation.

This SubscriptionClient elegantly implements the dual-protocol contract of the Transport trait:

  • poll() method: This method provides the HTTP fallback mechanism. When invoked by the Consumer, it makes direct, stateless HTTP requests to the mint using the existing http_client for various checks like get_mint_quote_status and post_check_state. The responses are then wrapped and forwarded to the Consumer's internal relay for local fan-out.

  • stream() method: This method, defined in the new ws.rs file, handles the preferred WebSocket protocol. It establishes a persistent connection to the mint's /v1/ws endpoint. It listens for StreamCtrl messages from the Consumer and translates them into the appropriate JSON-RPC subscribe or unsubscribe messages to be sent over the WebSocket. Conversely, it parses incoming WebSocket notifications from the mint and passes them to the Consumer's relay.

The top-level SubscriptionManager in the wallet is now a factory that manages a map of MintUrl to Consumer<SubscriptionClient> instances. This ensures that for any given mint, only one Consumer—and therefore only one underlying network connection (be it WebSocket or polling HTTP)—is ever created, centralizing all subscription management for that mint in a single, efficient component.

4.3. End-to-End Event Flow

The following sequence diagram illustrates the complete, end-to-end flow of a wallet subscribing to a mint quote status update, showcasing the seamless interaction between all components of the new architecture.

sequenceDiagram
    participant Wallet as Wallet App
    participant SubManager as Wallet SubscriptionManager
    participant Consumer as Consumer<SubscriptionClient>
    participant Transport as SubscriptionClient (Transport)
    participant Mint as Mint WebSocket Server
    participant MintPubsub as Mint PubSubManager

    Wallet->>SubManager: subscribe(mint_url, quote_id)
    SubManager->>Consumer: subscribe(quote_id)
    Consumer->>Transport: stream() connection initiated
    Transport->>Mint: WebSocket CONNECT
    Transport-->>Consumer: Connection OK
    Consumer->>Transport: StreamCtrl::Subscribe(quote_id)
    Transport->>Mint: {"method": "subscribe", "params":...}
    Mint->>MintPubsub: subscribe(quote_id)
    Note over Mint,MintPubsub: Mint fetches current quote state from DB
    MintPubsub-->>Mint: current_state_event
    Mint-->>Transport: Notification(current_state)
    Transport-->>Consumer: reply_to.send(event)
    Consumer-->>Wallet: event received
    loop Later
        Note over MintPubsub,Mint: Quote is paid
        MintPubsub->>MintPubsub: publish(paid_event)
        MintPubsub-->>Mint: paid_event
        Mint-->>Transport: Notification(paid_event)
        Transport-->>Consumer: reply_to.send(event)
        Consumer-->>Wallet: event received
    end

Loading

Section 5: A Comparative Analysis of Architectural Improvements

The refactoring of the pub-sub module is not merely an incremental improvement but a fundamental re-architecting of the system's communication layer. By contrasting the old and new designs, the strategic benefits in terms of decoupling, reusability, resilience, and performance become strikingly clear.

5.1. From Tightly Coupled to Decoupled and Reusable

The previous architecture, evidenced by the now-deleted cdk/src/pub_sub.rs file, was a monolithic implementation confined within the cdk crate. The Manager struct was tightly coupled to its internal indexing logic and the specific NotificationPayload type. This created a rigid system that was difficult to test in isolation, adapt for new use cases, or share across different parts of the platform.

The new architecture dismantles this monolith. The entire core pub-sub engine has been extracted into the cdk-common crate, establishing it as a foundational, reusable utility. The trait-based design completely decouples the engine from any specific domain, allowing it to be reused for both the mint and the wallet, and any future components, with zero modification to the core logic.

5.2. Table: Architectural Comparison

The following table provides a side-by-side comparison of the key architectural characteristics of the old and new systems, highlighting the strategic value of each change.

Feature Old Architecture (Deprecated) New Architecture (Refactored) Strategic Benefit
Core Logic Location cdk/src/pub_sub.rs cdk-common/src/pub_sub/ Reusability & Decoupling: Core logic is now a shared, common utility.
Design Paradigm Concrete Manager struct Generic, trait-based (Spec) Extensibility: New domains can be added without modifying the core engine.
Event Indexing Indexable trait, Index struct Event trait mapping to Topic Flexibility: Simplifies the contract for publishable objects.
Subscription Request IndexableParams wrapper SubscriptionRequest trait Separation of Concerns: Decouples request parsing from subscription logic.
Client-Side Logic Ad-hoc in wallet/subscription Standardized Consumer/Transport Robustness & Standardization: Provides a resilient, reusable client framework.
Network Protocol Tightly coupled (WS or HTTP) Abstracted via Transport trait Protocol Agnosticism: Wallet can use WS, HTTP, or future protocols seamlessly.
Resource Management ActiveSubscription with Drop ActiveSubscription with Drop (Consistent) RAII pattern ensures no resource leaks.
Testability Difficult to mock dependencies Easy via trait-based dependency injection Higher Quality: Enables comprehensive and isolated unit/integration testing.

5.3. Gains in System Resilience and Performance

The new architecture delivers tangible improvements in both system resilience and performance, particularly on the client side. These are not minor tweaks but foundational features that enable the construction of a more scalable and reliable distributed system.

The Consumer's automatic protocol fallback from WebSockets to HTTP polling makes the wallet client inherently more robust. It can gracefully handle mints with varying capabilities and adapt to restrictive network environments without requiring any changes to the application code. This resilience is further enhanced by the built-in exponential backoff mechanism for reconnections, which promotes system stability under adverse conditions.

Performance and efficiency are significantly boosted by the Consumer's advanced features. Request coalescing drastically reduces the number of outbound network requests and subscriptions, saving bandwidth and lessening the load on mint servers, especially in applications with many concurrent components subscribing to similar events. The latest-event caching mechanism enhances the user experience by reducing perceived latency for new subscriptions, providing immediate state feedback without requiring a network round-trip.

Conclusion

The refactoring of the publish-subscribe module successfully transforms it from a specific, monolithic implementation into a robust, scalable, and transport-agnostic cornerstone of the platform's distributed communication architecture. The new design is a masterclass in modern systems architecture, emphasizing abstraction through traits, clear separation of concerns, and built-in resilience. By providing a generic local event bus and a sophisticated remote consumer framework as reusable, foundational components, the new architecture not only resolves the limitations of the previous system but also provides a powerful and extensible platform for future development. This strategic investment in a clean, decoupled design will pay dividends in maintainability, testability, and the ability to evolve the system to meet new challenges rapidly.

@crodas crodas self-assigned this Sep 21, 2025
@crodas crodas changed the title Introduce a generic pubsub mod in cdk-common Introduce a generic pubsub mod in cdk-common Sep 21, 2025
@crodas crodas force-pushed the proto/improved-pubsub branch 20 times, most recently from 245fea0 to e6ad93f Compare September 28, 2025 12:11
@crodas crodas force-pushed the proto/improved-pubsub branch from e6ad93f to ec2e7b2 Compare September 28, 2025 23:20
@thesimplekid
Copy link
Collaborator

Does this address this issue? #1104

@crodas
Copy link
Collaborator Author

crodas commented Sep 29, 2025

Does this address this issue? #1104

We're still checking on the db. But we can check to the LN backend easily. I'll make it happen before setting this PR to reviewable. For context, that would happen on fetch_events

@crodas crodas force-pushed the proto/improved-pubsub branch 3 times, most recently from a1cb722 to 4333e67 Compare September 30, 2025 23:07
@crodas
Copy link
Collaborator Author

crodas commented Oct 2, 2025

My biggest concern with this is now that we don't us the send().await we don't have back pressure and we are not logging failed events if we do reach the channels max capacity.

@thesimplekid I deal with this in many places introducing a thin wapper on top of the Sender, for instance Subscriber<T>.

    /// Send a message
    pub fn send(&self, name: &T::SubscriptionName, event: T::Event) {
        let mut latest = if let Ok(reader) = self.latest.lock() {
            reader
        } else {
            let _ = self.inner.try_send((name.to_owned(), event));
            return;
        };

        if let Some(last_event) = latest.insert(name.clone(), event.clone()) {
            if last_event == event {
                return;
            }
        }

        let _ = self.inner.try_send((name.to_owned(), event));
    }
    ```

In this function I will add the logging and it can even have some sleep/retry mechanism

@crodas crodas force-pushed the proto/improved-pubsub branch 6 times, most recently from a71535c to 6d822aa Compare October 3, 2025 21:32
@thesimplekid
Copy link
Collaborator

@crodas can you rebase?

@crodas crodas force-pushed the proto/improved-pubsub branch 11 times, most recently from 7f07aca to c6327f6 Compare October 7, 2025 01:48
Copy link
Collaborator

@asmogo asmogo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only one little improvement suggestion.
I will test this together with FFI asap.

crodas and others added 7 commits October 7, 2025 21:17
…rough wallet/mint/WS/FFI

Refactor the pub/sub engine to a single Spec trait, move Event alongside it,
and propagate Arc-backed subscription IDs across the stack. This simplifies
generics, clarifies responsibilities, and preserves coalescing +
latest-on-subscribe semantics.

-   **Single source of truth:** `Spec` owns `Topic`, `Event`, `SubscriptionId`,
    `Context`, new_instance, and fetch_events.
-   **Lean & explicit API:** Remove Topic trait split;
    `Subscriber::send(Event)` carries sub-ID internally.
-   **Performance/ergonomics:** `Arc<SubscriptionId>` avoids heavy clones and
    makes channel/task hops trivial.

-   Introduce `pub_sub/typ.rs` with:
    -   trait `Spec`
    -   trait `Event` colocated with Spec.
-   Remove `pub_sub/event.rs` fold `Event` into `typ.rs`.
-   Make `Pubsub<S>` generic over `Spec` and store `Arc<S>`.
-   The subscriber holds `Arc<SubscriptionId>` and deduplicates the latest
    entry per subscription.
-   SubscriptionRequest: rename SubscriptionName → SubscriptionId; return
    `Arc<...>` from `subscription_name()`.
-   Remote consumer (Transport) now parameterized by `Spec`; control types
    updated:
    -   `StreamCtrl<S>`, `SubscribeMessage<S>`, internal caches keyed by
        `S::Topic`.
-   Mint/wallet:
    -   Mint: `MintPubSubSpec` (Context = `DynMintDatabase`),
        `PubSubManager(Pubsub<MintPubSubSpec>)`.
    -   Wallet: lightweight MintSubTopics Spec with `Context = ()`.
-   IDs go Arc end-to-end:
    -   cdk-axum WS maps `HashMap<Arc<SubId>, JoinHandle<()>>`, publisher sends
        `(Arc<SubId>, NotificationPayload)`.
    -   `subscription::{Params, WalletParams}` now use `Arc<...>`.
    -   cdk-ffi conversions & wallet glue updated.
    -   Integration tests updated for new types.

-   Coalescing unchanged: multiple local subs to the same topic are combined
    into a single remote sub.
-   Backfill via `Spec::fetch_events(topics, Subscriber)`; Subscriber enforces
    latest-only dedupe per subscription.

**Result:** a slimmer, more maintainable pub/sub core that’s easier to embed
across mint, wallet, transports, and FFI without sacrificing performance or
semantics.
@crodas crodas force-pushed the proto/improved-pubsub branch from 8c1b161 to 99d0646 Compare October 8, 2025 00:19
@thesimplekid thesimplekid merged commit cdd5935 into cashubtc:main Oct 8, 2025
99 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants