mercure

package module
v0.21.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2025 License: AGPL-3.0 Imports: 35 Imported by: 5

README

Protocol and Reference Implementation

Mercure is a protocol for pushing data updates to web browsers and other HTTP clients in a convenient, fast, reliable, and battery-efficient way. It is especially useful to publish async and real-time updates of resources served through web APIs, to reactive web and mobile apps.

Awesome Artifact HUB PkgGoDev CI Coverage Status Go Report Card

Subscriptions Schema

The protocol is maintained in this repository and is also available as an Internet-Draft.

A reference, production-grade, implementation of a Mercure hub (the server) is also available in this repository. It's free software (AGPL) written in Go. It is provided along with a library that can be used in any Go application to implement the Mercure protocol directly (without a hub) and an official Docker image.

In addition, a managed and high-scalability version of the Mercure.rocks hub is available on Mercure.rocks.

Contributing

See CONTRIBUTING.md.

See license information.

Credits

Created by Kévin Dunglas. Graphic design by Laury Sorriaux. Sponsored by Les-Tilleuls.coop.

Documentation

Overview

Package mercure helps implement the Mercure protocol (https://mercure.rocks) in Go projects. It provides an implementation of a Mercure hub as an HTTP handler.

Example
package main

import (
	"context"
	"log"
	"net/http"

	"github.com/dunglas/mercure"
)

func main() {
	ctx := context.Background()

	h, err := mercure.NewHub(
		ctx,
		mercure.WithPublisherJWT([]byte("!ChangeMe!"), "HS256"),
		mercure.WithSubscriberJWT([]byte("!ChangeMe!"), "HS256"),
	)
	if err != nil {
		log.Fatal(err)
	}

	defer func() {
		if err := h.Stop(ctx); err != nil {
			panic(err)
		}
	}()

	http.Handle("/.well-known/mercure", h)
	log.Panic(http.ListenAndServe(":8080", nil))
}

Index

Examples

Constants

View Source
const (
	DefaultWriteTimeout    = 600 * time.Second
	DefaultDispatchTimeout = 5 * time.Second
	DefaultHeartbeat       = 40 * time.Second
)
View Source
const (
	DefaultTopicSelectorStoreCacheMaxEntriesPerShard = 10_000
	DefaultTopicSelectorStoreCacheShardCount         = uint64(256)
)

Let's say that a topic selector is 100 bytes on average, a cache with 10,000 entries per shard and 256 shards will use about 256 * 10,000 * 100 = 256MB of RAM.

nolint:godox TODO: gather stats to find the best default values.

View Source
const BoltDefaultCleanupFrequency = 0.3
View Source
const DefaultSubscriberListCacheSize = 100_000

DefaultSubscriberListCacheSize is the default size of the skipfilter cache.

Let's say update topics take 100 bytes on average, a cache with 100,000 entries will use about 10MB.

View Source
const EarliestLastEventID = "earliest"

EarliestLastEventID is the reserved value representing the earliest available event id.

Variables

View Source
var (
	// ErrInvalidAuthorizationHeader is returned when the Authorization header is invalid.
	ErrInvalidAuthorizationHeader = errors.New(`invalid "Authorization" HTTP header`)
	// ErrInvalidAuthorizationQuery is returned when the authorization query parameter is invalid.
	ErrInvalidAuthorizationQuery = errors.New(`invalid "authorization" Query parameter`)
	// ErrNoOrigin is returned when the cookie authorization mechanism is used and no Origin nor Referer headers are presents.
	ErrNoOrigin = errors.New(`an "Origin" or a "Referer" HTTP header must be present to use the cookie-based authorization mechanism`)
	// ErrOriginNotAllowed is returned when the Origin is not allowed to post updates.
	ErrOriginNotAllowed = errors.New("origin not allowed to post updates")
	// ErrInvalidJWT is returned when the JWT is invalid.
	ErrInvalidJWT = errors.New("invalid JWT")
)
View Source
var ErrClosedTransport = errors.New("hub: read/write on closed Transport")

ErrClosedTransport is returned by the Transport's Dispatch and AddSubscriber methods after a call to Close.

View Source
var ErrUnexpectedSigningMethod = errors.New("unexpected signing method")

ErrUnexpectedSigningMethod is returned when the signing JWT method is not supported.

View Source
var ErrUnsupportedProtocolVersion = errors.New("compatibility mode only supports protocol version 7")

ErrUnsupportedProtocolVersion is returned when the version passed is unsupported.

View Source
var SubscriberContextKey subscriberContextKeyType //nolint:gochecknoglobals
View Source
var UpdateContextKey updateContextKeyType //nolint:gochecknoglobals

Functions

func NewSlogHandler added in v0.21.0

func NewSlogHandler(handler slog.Handler) slog.Handler

INTERNAL: NewSlogHandler returns a log/slog.Handler that automatically appends "update" and "subscriber" context, if applicable.

This function will be removed when https://github.com/caddyserver/caddy/pull/7346 will be available.

Types

type BoltTransport

type BoltTransport struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

BoltTransport implements the TransportInterface using the Bolt database.

func NewBoltTransport

func NewBoltTransport(
	subscriberList *SubscriberList,
	logger *slog.Logger,
	path string,
	bucketName string,
	size uint64,
	cleanupFrequency float64,
) (*BoltTransport, error)

NewBoltTransport creates a new BoltTransport.

func (*BoltTransport) AddSubscriber

func (t *BoltTransport) AddSubscriber(ctx context.Context, s *LocalSubscriber) error

AddSubscriber adds a new subscriber to the transport.

func (*BoltTransport) Close

func (t *BoltTransport) Close(_ context.Context) (err error)

Close closes the Transport.

func (*BoltTransport) Dispatch

func (t *BoltTransport) Dispatch(ctx context.Context, update *Update) error

Dispatch dispatches an update to all subscribers and persists it in Bolt DB.

func (*BoltTransport) GetSubscribers

func (t *BoltTransport) GetSubscribers(_ context.Context) (string, []*Subscriber, error)

GetSubscribers get the list of active subscribers.

func (*BoltTransport) RemoveSubscriber added in v0.14.0

func (t *BoltTransport) RemoveSubscriber(_ context.Context, s *LocalSubscriber) error

RemoveSubscriber removes a new subscriber from the transport.

type Event

type Event struct {
	// The updates' data, encoded in the sever-sent event format: every line starts with the string "data: "
	// https://www.w3.org/TR/eventsource/#dispatchMessage
	Data string

	// The globally unique identifier corresponding to update
	ID string

	// The event type, will be attached to the "event" field
	Type string

	// The reconnection time
	Retry uint64
}

Event is the actual Server Sent Event that will be dispatched.

func (*Event) String

func (e *Event) String() string

String serializes the event in a "text/event-stream" representation.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub stores channels with clients currently subscribed and allows to dispatch updates.

func NewHub

func NewHub(ctx context.Context, options ...Option) (*Hub, error)

NewHub creates a new Hub instance.

func (*Hub) Demo added in v0.14.0

func (h *Hub) Demo(w http.ResponseWriter, r *http.Request)

Demo exposes INSECURE Demo endpoints to test discovery and authorization mechanisms. Add a query parameter named "body" to define the content to return in the response's body. Add a query parameter named "jwt" set a "mercureAuthorization" cookie containing this token. The Content-Type header will automatically be set according to the URL's extension.

func (*Hub) Publish added in v0.21.0

func (h *Hub) Publish(ctx context.Context, update *Update) error

Publish broadcasts the given update to all subscribers. The id field of the Update instance can be updated by the underlying Transport.

func (*Hub) PublishHandler

func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request)

PublishHandler allows publisher to broadcast updates to all subscribers.

func (*Hub) ServeHTTP

func (h *Hub) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*Hub) Stop added in v0.11.1

func (h *Hub) Stop(ctx context.Context) error

Stop stops the hub.

func (*Hub) SubscribeHandler

func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request)

SubscribeHandler creates a keep alive connection and sends the events to the subscribers.

func (*Hub) SubscriptionHandler

func (h *Hub) SubscriptionHandler(w http.ResponseWriter, r *http.Request)

func (*Hub) SubscriptionsHandler

func (h *Hub) SubscriptionsHandler(w http.ResponseWriter, r *http.Request)

type LocalSubscriber added in v0.17.0

type LocalSubscriber struct {
	Subscriber
	// contains filtered or unexported fields
}

LocalSubscriber represents a client subscribed to a list of topics on the current hub.

func NewLocalSubscriber added in v0.17.0

func NewLocalSubscriber(lastEventID string, logger *slog.Logger, topicSelectorStore *TopicSelectorStore) *LocalSubscriber

NewLocalSubscriber creates a new subscriber.

func (*LocalSubscriber) Disconnect added in v0.17.0

func (s *LocalSubscriber) Disconnect()

Disconnect disconnects the subscriber.

func (*LocalSubscriber) Dispatch added in v0.17.0

func (s *LocalSubscriber) Dispatch(ctx context.Context, u *Update, fromHistory bool) bool

Dispatch an update to the subscriber. Security checks must (topics matching) be done before calling Dispatch, for instance by calling Match.

func (*LocalSubscriber) HistoryDispatched added in v0.17.0

func (s *LocalSubscriber) HistoryDispatched(responseLastEventID string)

HistoryDispatched must be called when all messages coming from the history have been dispatched.

func (*LocalSubscriber) Ready added in v0.17.0

func (s *LocalSubscriber) Ready(ctx context.Context) (n int)

Ready flips the ready flag to true and flushes queued live updates returning number of events flushed.

func (*LocalSubscriber) Receive added in v0.17.0

func (s *LocalSubscriber) Receive() <-chan *Update

Receive returns a chan when incoming updates are dispatched.

type LocalTransport

type LocalTransport struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

LocalTransport implements the TransportInterface without database and simply broadcast the live Updates.

func NewLocalTransport

func NewLocalTransport(sl *SubscriberList) *LocalTransport

NewLocalTransport creates a new LocalTransport.

func (*LocalTransport) AddSubscriber

func (t *LocalTransport) AddSubscriber(ctx context.Context, s *LocalSubscriber) error

AddSubscriber adds a new subscriber to the transport.

func (*LocalTransport) Close

func (t *LocalTransport) Close(_ context.Context) (err error)

Close closes the Transport.

func (*LocalTransport) Dispatch

func (t *LocalTransport) Dispatch(ctx context.Context, update *Update) error

Dispatch dispatches an update to all subscribers.

func (*LocalTransport) GetSubscribers

func (t *LocalTransport) GetSubscribers(_ context.Context) (string, []*Subscriber, error)

GetSubscribers gets the list of active subscribers.

func (*LocalTransport) RemoveSubscriber added in v0.14.0

func (t *LocalTransport) RemoveSubscriber(_ context.Context, s *LocalSubscriber) error

RemoveSubscriber removes a subscriber from the transport.

type Metrics

type Metrics interface {
	// SubscriberConnected collects metrics about subscriber connections.
	SubscriberConnected(s *LocalSubscriber)
	// SubscriberDisconnected collects metrics about subscriber disconnections.
	SubscriberDisconnected(s *LocalSubscriber)
	// UpdatePublished collects metrics about update publications.
	UpdatePublished(u *Update)
}

type NopMetrics

type NopMetrics struct{}

func (NopMetrics) SubscriberConnected

func (NopMetrics) SubscriberConnected(_ *LocalSubscriber)

func (NopMetrics) SubscriberDisconnected

func (NopMetrics) SubscriberDisconnected(_ *LocalSubscriber)

func (NopMetrics) UpdatePublished

func (NopMetrics) UpdatePublished(_ *Update)

type Option

type Option func(o *opt) error

Option instances allow to configure the library.

func WithAllowedHosts

func WithAllowedHosts(hosts []string) Option

WithAllowedHosts sets the allowed hosts.

func WithAnonymous

func WithAnonymous() Option

WithAnonymous allows subscribers with no valid JWT.

func WithCORSOrigins

func WithCORSOrigins(origins []string) Option

WithCORSOrigins sets the allowed CORS origins.

func WithCookieName added in v0.14.0

func WithCookieName(cookieName string) Option

WithCookieName sets the name of the authorization cookie (defaults to "mercureAuthorization").

func WithDebug

func WithDebug() Option

WithDebug enables the debug mode.

func WithDemo

func WithDemo() Option

WithDemo enables the demo.

func WithDispatchTimeout

func WithDispatchTimeout(timeout time.Duration) Option

WithDispatchTimeout sets maximum dispatch duration of an update.

func WithHeartbeat

func WithHeartbeat(interval time.Duration) Option

WithHeartbeat sets the frequency of the heartbeat, disabled by default.

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger sets the logger to use.

func WithMetrics

func WithMetrics(m Metrics) Option

WithMetrics enables collection of Prometheus metrics.

func WithProtocolVersionCompatibility added in v0.14.0

func WithProtocolVersionCompatibility(protocolVersionCompatibility int) Option

WithProtocolVersionCompatibility sets the version of the Mercure protocol to be backward compatible with (only version 7 is supported).

func WithPublishOrigins

func WithPublishOrigins(origins []string) Option

WithPublishOrigins sets the origins allowed to publish updates.

func WithPublisherJWT

func WithPublisherJWT(key []byte, alg string) Option

WithPublisherJWT sets the JWT key and the signing algorithm to use for publishers.

func WithPublisherJWTKeyFunc added in v0.15.11

func WithPublisherJWTKeyFunc(keyfunc jwt.Keyfunc) Option

WithPublisherJWTKeyFunc sets the function to use to parse and verify the publisher JWT.

func WithSubscriberJWT

func WithSubscriberJWT(key []byte, alg string) Option

WithSubscriberJWT sets the JWT key and the signing algorithm to use for subscribers.

func WithSubscriberJWTKeyFunc added in v0.15.11

func WithSubscriberJWTKeyFunc(keyfunc jwt.Keyfunc) Option

WithSubscriberJWTKeyFunc sets the function to use to parse and verify the subscriber JWT.

func WithSubscriptions

func WithSubscriptions() Option

WithSubscriptions allows to dispatch updates when subscriptions are created or terminated.

func WithTopicSelectorStore

func WithTopicSelectorStore(tss *TopicSelectorStore) Option

WithTopicSelectorStore sets the TopicSelectorStore instance to use.

func WithTransport

func WithTransport(t Transport) Option

WithTransport sets the transport to use.

func WithUI added in v0.12.0

func WithUI() Option

func WithWriteTimeout

func WithWriteTimeout(timeout time.Duration) Option

WithWriteTimeout sets maximum duration before closing the connection, defaults to 600s, set to 0 to disable.

type PrometheusMetrics

type PrometheusMetrics struct {
	// contains filtered or unexported fields
}

PrometheusMetrics store Hub collected metrics.

func NewPrometheusMetrics

func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics

NewPrometheusMetrics creates a Prometheus metrics collector. This method must be called only one time, or it will panic.

func (*PrometheusMetrics) SubscriberConnected

func (m *PrometheusMetrics) SubscriberConnected(_ *LocalSubscriber)

func (*PrometheusMetrics) SubscriberDisconnected

func (m *PrometheusMetrics) SubscriberDisconnected(_ *LocalSubscriber)

func (*PrometheusMetrics) UpdatePublished

func (m *PrometheusMetrics) UpdatePublished(_ *Update)

type Subscriber

type Subscriber struct {
	ID                     string
	EscapedID              string
	Claims                 *claims
	EscapedTopics          []string
	RequestLastEventID     string
	SubscribedTopics       []string
	SubscribedTopicRegexps []*regexp.Regexp
	AllowedPrivateTopics   []string
	AllowedPrivateRegexps  []*regexp.Regexp
	// contains filtered or unexported fields
}

Subscriber represents a client subscribed to a list of topics on a remote or on the current hub.

func NewSubscriber

func NewSubscriber(logger *slog.Logger, topicSelectorStore *TopicSelectorStore) *Subscriber

func (*Subscriber) LogValue added in v0.21.0

func (s *Subscriber) LogValue() slog.Value

func (*Subscriber) Match added in v0.14.0

func (s *Subscriber) Match(u *Update) bool

Match checks if the current subscriber can receive the given update.

func (*Subscriber) MatchTopics added in v0.14.0

func (s *Subscriber) MatchTopics(topics []string, private bool) bool

MatchTopics checks if the current subscriber can access to at least one of the given topics.

func (*Subscriber) SetTopics added in v0.14.0

func (s *Subscriber) SetTopics(subscribedTopics, allowedPrivateTopics []string)

SetTopics compiles topic selector regexps.

type SubscriberList added in v0.14.0

type SubscriberList struct {
	// contains filtered or unexported fields
}

func NewSubscriberList added in v0.14.0

func NewSubscriberList(cacheSize int) *SubscriberList

func (*SubscriberList) Add added in v0.14.0

func (sl *SubscriberList) Add(s *LocalSubscriber)

func (*SubscriberList) Len added in v0.14.0

func (sl *SubscriberList) Len() int

func (*SubscriberList) MatchAny added in v0.14.0

func (sl *SubscriberList) MatchAny(u *Update) []*LocalSubscriber

func (*SubscriberList) Remove added in v0.14.0

func (sl *SubscriberList) Remove(s *LocalSubscriber)

func (*SubscriberList) Walk added in v0.14.0

func (sl *SubscriberList) Walk(start uint64, callback func(s *LocalSubscriber) bool) uint64

type TopicSelectorStore

type TopicSelectorStore struct {
	// contains filtered or unexported fields
}

TopicSelectorStore caches compiled templates to improve memory and CPU usage.

func NewTopicSelectorStoreCache added in v0.21.0

func NewTopicSelectorStoreCache(maxEntriesPerShard int, shardCount uint64) (*TopicSelectorStore, error)

NewTopicSelectorStoreCache creates a TopicSelectorStore with a cache.

type TopicSelectorStoreCache added in v0.14.0

type TopicSelectorStoreCache interface {
	Get(key string) (any, bool)
	Set(key string, value any, n int64) bool
}

type Transport

type Transport interface {
	// Dispatch dispatches an update to all subscribers.
	Dispatch(ctx context.Context, u *Update) error

	// AddSubscriber adds a new subscriber to the transport.
	AddSubscriber(ctx context.Context, s *LocalSubscriber) error

	// RemoveSubscriber removes a subscriber from the transport.
	RemoveSubscriber(ctx context.Context, s *LocalSubscriber) error

	// Close closes the Transport.
	Close(ctx context.Context) error
}

Transport provides methods to dispatch and persist updates.

type TransportError added in v0.12.2

type TransportError struct {
	// contains filtered or unexported fields
}

TransportError is returned when the Transport's DSN is invalid.

func (*TransportError) Error added in v0.12.2

func (e *TransportError) Error() string

func (*TransportError) Unwrap added in v0.12.2

func (e *TransportError) Unwrap() error

type TransportSubscribers

type TransportSubscribers interface {
	// GetSubscribers gets the last event ID and the list of active subscribers at this time.
	GetSubscribers(ctx context.Context) (string, []*Subscriber, error)
}

TransportSubscribers provides a method to retrieve the list of active subscribers.

type TransportTopicSelectorStore added in v0.17.0

type TransportTopicSelectorStore interface {
	SetTopicSelectorStore(store *TopicSelectorStore)
}

TransportTopicSelectorStore provides a method to pass the TopicSelectorStore to the transport.

type Update

type Update struct {
	// The Server-Sent Event to send.
	Event

	// The topics' Internationalized Resource Identifier (RFC3987) (will most likely be URLs).
	// The first one is the canonical IRI, while next ones are alternate IRIs.
	Topics []string

	// Private updates can only be dispatched to subscribers authorized to receive them.
	Private bool

	// To print debug information
	Debug bool
}

Update represents an update to send to subscribers.

func (*Update) AssignUUID added in v0.21.0

func (u *Update) AssignUUID()

AssignUUID generates a new UUID an assign it to the given update if no ID is already set.

func (*Update) LogValue added in v0.21.0

func (u *Update) LogValue() slog.Value

Directories

Path Synopsis
caddy module
cmd
mercure command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL