fleet

package module
v0.13.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2025 License: MIT Imports: 57 Imported by: 1

README

Fleet

Go Reference

Fleet is a distributed peer-to-peer communication framework written in Go. It enables automatic peer discovery, secure communication, distributed operations, and synchronized data across a network of nodes (agents).

Features

  • Automatic Peer Discovery - Discovers and connects to peers using the Spot protocol
  • Secure Communication - TLS-encrypted connections with certificate verification and optional TPM key support
  • Distributed Database - Synchronized key-value store (BoltDB) with automatic replication and conflict resolution
  • Remote Procedure Calls - High-level gob-serialized and binary RPC with multiple broadcast patterns
  • Distributed Locking - Consensus-based locks for cluster-wide resource coordination
  • Network Services - QUIC transport for efficient, multiplexed connections

Installation

go get github.com/KarpelesLab/fleet

Requires Go 1.24 or later.

Quick Start

package main

import (
    "context"
    "log"

    "github.com/KarpelesLab/fleet"
)

func main() {
    // Create a new agent
    agent := fleet.New()

    // Wait for the agent to be ready
    agent.WaitReady()

    // Register an RPC endpoint
    fleet.SetRpcEndpoint("hello", func(data any) (any, error) {
        return "Hello from " + agent.Id(), nil
    })

    // Call RPC on a peer
    peers := agent.GetPeers()
    if len(peers) > 0 {
        result, err := agent.RPC(context.Background(), peers[0].Id(), "hello", nil)
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("Response: %v", result)
    }
}

Core Concepts

Agent

The Agent is the main component representing a node in the fleet. It manages peer connections, database synchronization, and communication.

// Access the global agent singleton
agent := fleet.Self()

// Get agent identity
id := agent.Id()
name, hostname := agent.Name()
division := agent.Division()
Peers

Peers represent remote nodes in the fleet.

// Get all connected peers
peers := agent.GetPeers()

// Get a specific peer
peer := agent.GetPeer("peer-id")

// Get peer by name
peer := agent.GetPeerByName("node-name")

// Check peer status
if peer.IsAlive() {
    log.Printf("Peer %s is alive", peer.Name())
}
RPC Communication

Fleet supports multiple RPC patterns:

ctx := context.Background()

// Direct RPC to a specific peer
result, err := agent.RPC(ctx, "peer-id", "endpoint", data)

// Broadcast to all peers (fire-and-forget)
err := agent.BroadcastRpc(ctx, "endpoint", data)

// Query all peers and collect responses
results, err := agent.AllRPC(ctx, "endpoint", data)

// Route to least-busy peer in a division
err := agent.AnyRpc(ctx, "division", "endpoint", data)

// Target peers by division
results, err := agent.DivisionRpc(ctx, "us-west", "endpoint", data)
Distributed Database

Fleet provides a synchronized key-value store across all peers:

// Store data (automatically replicated)
err := agent.DbSet("mykey", []byte("myvalue"))

// Retrieve data
value, err := agent.DbGet("mykey")

// Delete data
err := agent.DbDelete("mykey")

// Watch for changes
agent.DbWatch("mykey", func(key string, val []byte) {
    log.Printf("Key %s changed to %s", key, string(val))
})

// Get keys with prefix
keys := agent.DbKeys("app", "prefix:")

Buckets:

  • app - User application data (default)
  • global - System-wide data
  • local - Non-replicated local data
Distributed Locking

Consensus-based distributed locks:

// Try to acquire a lock (non-blocking)
lock, err := agent.TryLock("resource-name")
if err != nil {
    log.Printf("Could not acquire lock: %v", err)
}

// Acquire a lock (blocking with context)
lock, err := agent.Lock(ctx, "resource-name")
if err != nil {
    log.Fatal(err)
}
defer lock.Release()

// Use the locked resource...
Network Services

Create custom services on the fleet network:

// Create a service listener
listener, err := agent.AddService("myservice")
if err != nil {
    log.Fatal(err)
}

// Accept connections
for {
    conn, err := listener.Accept()
    if err != nil {
        break
    }
    go handleConnection(conn)
}

// Connect to a peer's service
conn, err := agent.Connect("peer-id", "myservice")
Metadata

Store and broadcast peer-specific metadata:

// Set metadata (broadcast to all peers)
agent.MetaSet("status", "online")
agent.MetaSet("version", "1.0.0")

// Read peer metadata
for _, peer := range agent.GetPeers() {
    meta := peer.Meta()
    log.Printf("Peer %s status: %v", peer.Name(), meta["status"])
}

Architecture

Application Layer (RPC endpoints, DB operations, locks)
                ↓
    Packet Protocol Layer (Binary packets)
                ↓
    Database Sync Layer (Versioned records)
                ↓
      Network Layer (Spot + QUIC transport)
                ↓
       TLS/Encryption Layer
                ↓
        Physical Network

Packet Types:

  • PacketHandshake - Initial peer identification
  • PacketAnnounce - Periodic status updates
  • PacketRpc - RPC requests/responses
  • PacketDbRecord - Database synchronization
  • PacketSeed - Cluster seed exchange
  • PacketLock* - Distributed locking operations
  • PacketPing/Pong - Health monitoring

Build & Test

# Build with formatting
make

# Install dependencies
make deps

# Run all tests
make test

# Run a specific test
go test -v github.com/KarpelesLab/fleet -run TestName

Dependencies

Key dependencies:

  • quic-go - QUIC protocol implementation
  • bbolt - BoltDB key-value store
  • spotlib - Spot protocol for peer discovery
  • cryptutil - Cryptographic utilities

License

MIT License - see LICENSE for details.

Documentation

Overview

Package fleet provides a distributed peer-to-peer communication framework. It enables automatic peer discovery, secure communication, distributed locks, synchronized database, and remote procedure calls across a network of nodes.

Package fleet provides a distributed peer-to-peer communication framework.

Package fleet provides a distributed peer-to-peer communication framework.

Package fleet provides a distributed peer-to-peer communication framework.

Package fleet provides a distributed peer-to-peer communication framework.

Package fleet provides a distributed peer-to-peer communication framework.

Package fleet provides a distributed peer-to-peer communication framework.

Package fleet provides a distributed peer-to-peer communication framework.

Package fleet provides a distributed peer-to-peer communication framework.

Package fleet provides a distributed peer-to-peer communication framework.

Package fleet provides a distributed peer-to-peer communication framework.

Package fleet provides a distributed peer-to-peer communication framework.

Package fleet provides a distributed peer-to-peer communication framework.

Package fleet provides a distributed peer-to-peer communication framework.

Package fleet provides a distributed peer-to-peer communication framework.

Index

Constants

View Source
const (
	// PacketMaxLen is the maximum allowed size of a packet (32MB)
	PacketMaxLen = 32 * 1024 * 1024 // 32MB

	// PacketHeaderSize is the size of the packet header (6 bytes)
	// 2 bytes for packet code, 4 bytes for packet length
	PacketHeaderSize = 6

	// PacketMaxBody is the maximum allowed size of the packet body
	PacketMaxBody = PacketMaxLen - PacketHeaderSize

	// PacketLegacy is used for backward-compatible gob-encoded packets
	PacketLegacy = 0xffff

	// PacketPing is sent to check peer connectivity and measure latency
	PacketPing = 0x1001

	// PacketPong is the response to a ping packet
	PacketPong = 0x3001

	// PacketLockReq is sent to request a distributed lock
	PacketLockReq = 0x1002

	// PacketLockRes is the response to a lock request (Aye or Nay)
	PacketLockRes = 0x3002

	// PacketLockConfirm is sent when a lock is confirmed across the network
	PacketLockConfirm = 0x1003

	// PacketLockRelease is sent when a lock is released
	PacketLockRelease = 0x1004

	// PacketSeed is used to exchange cluster seed data
	PacketSeed = 0x1005

	// PacketRpcBinReq is a binary-format RPC request
	PacketRpcBinReq = 0x1006

	// PacketRpcBinRes is a binary-format RPC response
	PacketRpcBinRes = 0x3006

	// PacketClose is sent when closing a connection gracefully
	PacketClose = 0x1fff

	// PacketCustom is the base value for custom packet types
	// Applications can define their own packet types in this range
	PacketCustom = 0xa000

	// PacketCustomMax is the maximum value for custom packet types
	PacketCustomMax = 0xafff

	// Response values for lock requests
	Aye = 1 // Lock approved
	Nay = 0 // Lock denied
)

Constants defining packet sizes, types, and protocol values

View Source
const UUID_SEEDID_SPACE = "da736663-83ec-46ef-9c29-3f9102c5c519"

UUID namespace for generating deterministic seed IDs

Variables

View Source
var (
	// ErrWriteQueueFull is returned when attempting to send data to a peer whose
	// write queue is already at capacity, typically due to network congestion.
	ErrWriteQueueFull = errors.New("peer write queue is full")

	// ErrPeerNoRoute is returned when attempting to send data to a peer that
	// cannot be reached, either because it doesn't exist or because no path
	// exists to that peer.
	ErrPeerNoRoute = errors.New("no route to peer")

	// ErrConnectionClosed is returned when attempting to use a connection that
	// has already been closed, either by this node or the remote peer.
	ErrConnectionClosed = errors.New("connection has been closed")

	// ErrInvalidLegacy is returned when attempting an operation that is not
	// supported on legacy peers (peers using older protocol versions).
	ErrInvalidLegacy = errors.New("invalid operation on legacy peer")

	// ErrInvalidLockName is returned when attempting to acquire a lock with an
	// invalid name, such as an empty string.
	ErrInvalidLockName = errors.New("invalid lock name")

	// ErrCancelledLock is returned when a lock acquisition request is cancelled
	// externally, typically due to a competing lock.
	ErrCancelledLock = errors.New("lock request has been cancelled")

	// ErrEndpointNameLen is returned when an RPC endpoint name exceeds the
	// maximum allowed length (65535 bytes).
	ErrEndpointNameLen = errors.New("RPC endpoint name length too long")
)

Error constants used throughout the fleet library. These provide standardized errors for common failure conditions.

Functions

func CallRpcEndpoint added in v0.3.23

func CallRpcEndpoint(e string, p any) (res any, err error)

CallRpcEndpoint invokes the named RPC endpoint on the local machine. This is used internally when receiving RPC requests from remote peers.

Parameters:

  • e: The endpoint name
  • p: The payload data

Returns:

  • The result of the RPC call
  • An error if the endpoint doesn't exist or the call fails

func Custom added in v0.5.12

func Custom(v uint16) uint16

Custom returns a packet ID for a given custom packet type. This function ensures that custom packet IDs are within the allowed range (0xa000 - 0xafff).

Typically used as:

var MyCustomPacket = fleet.Custom(0)
var MyOtherPacket = fleet.Custom(1)

Parameters:

  • v: The offset from the base custom packet ID (0-4095)

Returns:

  • A valid custom packet ID

Panics if the value would exceed the allowed range

func EnsureDir

func EnsureDir(c string) error

func GetSelfsignedCertificate added in v0.8.3

func GetSelfsignedCertificate(n string) (*tls.Certificate, error)

GetSelfsignedCertificate is a utility function that returns a self-signed certificate for any given host name

All generated certificates are cached, and calling this method multiple times with the same name will return the same certificate for a few days, and will then generate a new certificate.

func IsReady added in v0.11.8

func IsReady() bool

IsReady returns whether the fleet subsystem is fully initialized and ready. This checks both that an Agent exists and that it reports ready status.

Returns:

  • true if the fleet is fully initialized and ready, false otherwise

func SetCustomHandler added in v0.5.12

func SetCustomHandler(pc uint16, h CustomHandler)

SetCustomHandler registers a handler function for a custom packet type. This allows applications to extend the fleet protocol with custom packet types and handling logic.

The packet code should be in the range 0xa000-0xafff, which can be obtained using the Custom() function.

Parameters:

  • pc: The packet code to register the handler for
  • h: The handler function to call when this packet type is received

func SetRpcEndpoint

func SetRpcEndpoint(e string, f RpcEndpoint)

SetRpcEndpoint registers a callback function for a named RPC endpoint. When an RPC call is received for this endpoint, the callback will be invoked.

Parameters:

  • e: The endpoint name
  • f: The callback function to handle requests to this endpoint

func UniqueTimestamp added in v0.6.15

func UniqueTimestamp() uint64

UniqueTimestamp returns a uint64 timestamp in microsecond that is unique, so that even if called multiple times in the same millisecond each call will return a different value.

This can be safely called from multiple threads, it does not lock.

Types

type Agent

type Agent struct {
	IP string // IP address as seen from outside

	Events *emitter.Hub // Event hub for subscribers to listen to events

	// File operations
	GetFile GetFileFunc // Callback for retrieving files
	// contains filtered or unexported fields
}

Agent is the core component of the fleet system. It manages connections with peers, handles communication, and coordinates distributed operations.

func New added in v0.5.0

func New(opts ...AgentOption) *Agent

New initializes a basic agent with the provided options. Options can be used to configure various aspects of the agent.

func Self added in v0.5.0

func Self() *Agent

Self returns the global Agent instance that was created by New(). If no Agent has been created yet, this function will block until one is available.

IMPORTANT: Due to the blocking behavior, this function should not be used in init() functions. Only use it in goroutines or after an Agent has been explicitly created.

Returns:

  • The global Agent instance

func WithGetFile added in v0.5.0

func WithGetFile(f GetFileFunc, opts ...AgentOption) *Agent

WithGetFile creates a new agent with a custom file retrieval function. This is a convenience wrapper around New() that sets the GetFile callback.

func WithIssuer added in v0.5.0

func WithIssuer(url string, opts ...AgentOption) *Agent

func (*Agent) AddService added in v0.5.0

func (a *Agent) AddService(service string) (net.Listener, error)

func (*Agent) AllRPC added in v0.5.7

func (a *Agent) AllRPC(ctx context.Context, endpoint string, data any) ([]any, error)

func (*Agent) AllRpcRequest added in v0.9.5

func (a *Agent) AllRpcRequest(ctx context.Context, endpoint string, data []byte) ([]any, error)

func (*Agent) AltNames added in v0.11.19

func (a *Agent) AltNames() []string

AltNames will attempt to return alternative names from the certificate issued to this node

func (*Agent) AnyRpc added in v0.5.0

func (a *Agent) AnyRpc(ctx context.Context, division string, endpoint string, data any) error

func (*Agent) BroadcastPacket added in v0.5.12

func (a *Agent) BroadcastPacket(ctx context.Context, pc uint16, data []byte) error

func (*Agent) BroadcastRpc added in v0.5.0

func (a *Agent) BroadcastRpc(ctx context.Context, endpoint string, data any) error

BroadcastRpc broadcasts the given data to the specified endpoint on all connected peers. This method sends the same RPC call to all peers in the network but doesn't wait for responses. It's useful for notifications or updates that need to propagate to all nodes.

Parameters:

  • ctx: Context for the operation, which can be used for cancellation
  • endpoint: The name of the RPC endpoint to call on each peer
  • data: The data to send to each peer (will be serialized)

Returns an error if the operation fails, or nil on success.

func (*Agent) BroadcastRpcBin added in v0.9.5

func (a *Agent) BroadcastRpcBin(ctx context.Context, endpoint string, pkt []byte) (n int, err error)

func (*Agent) CacheDir added in v0.6.9

func (a *Agent) CacheDir() string

func (*Agent) Close added in v0.5.0

func (a *Agent) Close()

Close shuts down the agent, closing all connections and resources. This should be called when the agent is no longer needed.

func (*Agent) ConfigureTlsServer added in v0.5.0

func (a *Agent) ConfigureTlsServer(cfg *tls.Config)

func (*Agent) Connect added in v0.5.0

func (a *Agent) Connect(id string, service string) (net.Conn, error)

connect to given peer under specified protocol (if supported)

func (*Agent) CountPeers added in v0.6.8

func (a *Agent) CountPeers() int

func (*Agent) DbDelete added in v0.8.2

func (a *Agent) DbDelete(key string) error

DbDelete removes a value from the shared fleet database. This deletion will be propagated to all peers in the fleet.

Parameters:

  • key: The key to delete

Returns:

  • An error if the operation fails

func (*Agent) DbGet added in v0.5.0

func (a *Agent) DbGet(key string) ([]byte, error)

DbGet retrieves a value from the shared fleet database. This is the primary method for applications to get data from the synchronized database.

Parameters:

  • key: The key to retrieve

Returns:

  • The value as a byte slice
  • An error if the key doesn't exist or if the operation fails

func (*Agent) DbKeys added in v0.12.20

func (a *Agent) DbKeys(bucket, prefix []byte) func(yield func(k, v []byte) bool)

func (*Agent) DbSet added in v0.5.0

func (a *Agent) DbSet(key string, value []byte) error

DbSet stores a value in the shared fleet database. This value will be automatically synchronized to all peers in the fleet.

Parameters:

  • key: The key to store the value under
  • value: The data to store

Returns:

  • An error if the operation fails

func (*Agent) DbWatch added in v0.5.0

func (a *Agent) DbWatch(key string, cb func(string, []byte))

DbWatch registers a callback function to be called when a key is updated. The callback will be triggered whenever the specified key changes in the database.

Special features: - Using "*" as the key will trigger the callback for all key changes - If the value in the callback is nil, it indicates the key was deleted

Parameters:

  • key: The key to watch, or "*" for all keys
  • cb: Callback function that receives the key and its new value

func (*Agent) DebugLocks added in v0.6.21

func (a *Agent) DebugLocks(w io.Writer)

DebugLocks writes debugging information about all locks to the provided writer. This is useful for diagnosing lock-related issues.

Parameters:

  • w: The writer to output debug information to

func (*Agent) Dial added in v0.5.0

func (a *Agent) Dial(network, addr string) (net.Conn, error)

func (*Agent) DialContext added in v0.5.0

func (a *Agent) DialContext(c context.Context, network, addr string) (net.Conn, error)

func (*Agent) Division added in v0.11.22

func (a *Agent) Division() string

Division returns the division (locality) of the local node

func (*Agent) DivisionPrefixRpc added in v0.5.0

func (a *Agent) DivisionPrefixRpc(ctx context.Context, divMatch string, endpoint string, data any) error

func (*Agent) DivisionRpc added in v0.5.0

func (a *Agent) DivisionRpc(ctx context.Context, division int, endpoint string, data any) error

func (*Agent) DumpInfo added in v0.5.0

func (a *Agent) DumpInfo(w io.Writer)

func (*Agent) ExternalKey added in v0.11.12

func (a *Agent) ExternalKey() (crypto.PrivateKey, error)

ExternalKey returns the key associated with the cluster, if any. If this host hasn't joined a cluster or the cluster has no shared key, this will return fs.ErrNotExist

func (*Agent) GenInternalCert added in v0.5.0

func (a *Agent) GenInternalCert() (tls.Certificate, error)

func (*Agent) GetCA added in v0.5.0

func (a *Agent) GetCA() (*x509.CertPool, error)

func (*Agent) GetClientTlsConfig added in v0.5.0

func (a *Agent) GetClientTlsConfig() (*tls.Config, error)

func (*Agent) GetInternalCertificate added in v0.7.0

func (a *Agent) GetInternalCertificate(h *tls.ClientHelloInfo) (*tls.Certificate, error)

return internal certificate (cached)

func (*Agent) GetInternalTlsConfig added in v0.5.11

func (a *Agent) GetInternalTlsConfig() (*tls.Config, error)

func (*Agent) GetPeer added in v0.5.0

func (a *Agent) GetPeer(id string) *Peer

func (*Agent) GetPeerByName added in v0.5.0

func (a *Agent) GetPeerByName(name string) *Peer

func (*Agent) GetPeers added in v0.5.0

func (a *Agent) GetPeers() []*Peer

func (*Agent) GetPeersCount added in v0.6.12

func (a *Agent) GetPeersCount() uint32

GetPeersCount return the number of existing peers, connected or not. The value may be more than the number of entries GetPeers will return as some peers may be down or unavailable.

func (*Agent) GetPublicCertificate added in v0.7.0

func (a *Agent) GetPublicCertificate(h *tls.ClientHelloInfo) (*tls.Certificate, error)

func (*Agent) GetStatus added in v0.10.0

func (a *Agent) GetStatus() int

GetStatus returns the current status of the agent. 0 = initializing/waiting, 1 = ready

func (*Agent) GetStringSetting added in v0.10.7

func (a *Agent) GetStringSetting(v string) string

func (*Agent) GetTlsConfig added in v0.5.0

func (a *Agent) GetTlsConfig() (*tls.Config, error)

GetTlsConfig returns TLS config suitable for making public facing ssl servers.

func (*Agent) Id added in v0.5.0

func (a *Agent) Id() string

Id returns the id of the local node

func (*Agent) InternalKey added in v0.11.12

func (a *Agent) InternalKey() (crypto.Signer, error)

InternalKey returns the key associated with the local host, possibly a TPM key if the host has a functioning tpm.

func (*Agent) IsConnected added in v0.5.0

func (a *Agent) IsConnected(id string) bool

func (*Agent) KeyShake128 deprecated added in v0.5.4

func (a *Agent) KeyShake128(N []byte) (sha3.ShakeHash, error)

KeyShake128 uses PKCS8 private key blob as hash key

Deprecated: won't work with TPM machines

func (*Agent) KeyShake256 deprecated added in v0.5.4

func (a *Agent) KeyShake256(N []byte) (sha3.ShakeHash, error)

KeySha256 uses PKCS8 private key blob as hash key

Deprecated: won't work with TPM machines

func (*Agent) Lock added in v0.6.12

func (a *Agent) Lock(ctx context.Context, name string) (*LocalLock, error)

Lock acquires a distributed lock with the given name. This implements a consensus algorithm across fleet peers to ensure only one node can hold a particular lock at any time.

The consensus algorithm works as follows: - If >= (1/2+1) of nodes respond "aye", the lock is confirmed - If >= (1/3+1) of nodes respond "nay", the lock fails and is retried - If neither threshold is reached within a timeout, the lock is retried

Parameters:

  • ctx: The context for the operation, which can cancel the lock attempt
  • name: The name of the lock to acquire

Returns:

  • A LocalLock that can be used to release the lock
  • An error if the lock cannot be acquired

func (*Agent) MetaSet added in v0.6.10

func (a *Agent) MetaSet(key string, value any)

func (*Agent) Name added in v0.5.0

func (a *Agent) Name() (string, string)

Name returns the name and hostname of the local node

func (*Agent) NewRpcInstance added in v0.9.4

func (a *Agent) NewRpcInstance(name string) (RPC, error)

NewRpcInstance creates a new RPC instance for a specific named endpoint. This allows applications to create communication channels for specific purposes.

Parameters:

  • name: The endpoint name, which should be unique for this application

Returns:

  • An RPC interface for the specified endpoint
  • An error if the operation fails

func (*Agent) RPC added in v0.5.0

func (a *Agent) RPC(ctx context.Context, id string, endpoint string, data any) (any, error)

RPC sends an RPC request to a specific peer and waits for a response. This is the primary method for making remote procedure calls to other nodes in the fleet.

Parameters:

  • ctx: Context for the operation, which can control timeouts and cancellation
  • id: The ID of the target peer
  • endpoint: The name of the RPC endpoint to call on the peer
  • data: The data to send with the request (will be serialized)

Returns:

  • The response data from the peer (deserialized)
  • An error if the operation fails or the remote endpoint returns an error

func (*Agent) RoundTripper added in v0.5.0

func (a *Agent) RoundTripper() http.RoundTripper

func (*Agent) RpcRequest added in v0.9.5

func (a *Agent) RpcRequest(ctx context.Context, id string, endpoint string, data []byte) ([]byte, error)

func (*Agent) RpcSend added in v0.9.5

func (a *Agent) RpcSend(ctx context.Context, id string, endpoint string, data []byte) error

RpcSend sends a request but expects no response, failure will only reported if the request failed to be sent, and failure on the other side will not be reported

func (*Agent) SeedCrypt added in v0.5.0

func (a *Agent) SeedCrypt(in []byte) ([]byte, error)

SeedCrypt encrypts data using AES-GCM with a key derived from the seed. This provides authenticated encryption for sensitive data.

Parameters:

  • in: The plaintext data to encrypt

Returns:

  • The encrypted data (nonce + ciphertext)
  • An error if encryption fails

func (*Agent) SeedDecrypt added in v0.5.0

func (a *Agent) SeedDecrypt(in []byte) ([]byte, error)

SeedDecrypt decrypts data that was encrypted with SeedCrypt. This verifies and decrypts data encrypted by any peer with the same seed.

Parameters:

  • in: The encrypted data (nonce + ciphertext)

Returns:

  • The decrypted plaintext
  • An error if decryption fails

func (*Agent) SeedId added in v0.5.0

func (a *Agent) SeedId() uuid.UUID

SeedId returns the UUID identifier for the cluster seed. This is a deterministic UUID generated from the seed data.

Returns:

  • UUID derived from the seed

func (*Agent) SeedShake128 added in v0.5.4

func (a *Agent) SeedShake128(N []byte) sha3.ShakeHash

SeedShake128 creates a new cSHAKE-128 hash instance customized with the seed. This provides a deterministic pseudo-random function that's consistent across all peers with the same seed.

Parameters:

  • N: The function name/customization string

Returns:

  • A ShakeHash instance for generating deterministic output

func (*Agent) SeedShake256 added in v0.5.4

func (a *Agent) SeedShake256(N []byte) sha3.ShakeHash

SeedShake256 creates a new cSHAKE-256 hash instance customized with the seed. This provides a deterministic pseudo-random function that's consistent across all peers with the same seed, with higher security than SeedShake128.

Parameters:

  • N: The function name/customization string

Returns:

  • A ShakeHash instance for generating deterministic output

func (*Agent) SeedSign added in v0.5.0

func (a *Agent) SeedSign(in []byte) []byte

SeedSign creates an HMAC signature for the input data using SHA3-256. This is used to authenticate messages between peers.

Parameters:

  • in: The data to sign

Returns:

  • The HMAC signature

func (*Agent) SeedTlsConfig added in v0.5.0

func (a *Agent) SeedTlsConfig(c *tls.Config)

SeedTlsConfig configures a TLS config object with session ticket keys derived from the seed. This ensures all nodes in the fleet use the same ticket keys, allowing session resumption between different nodes.

Parameters:

  • c: The TLS config to modify

func (*Agent) SendPacketTo added in v0.5.12

func (a *Agent) SendPacketTo(ctx context.Context, target string, pc uint16, data []byte) error

func (*Agent) SendTo added in v0.5.0

func (a *Agent) SendTo(ctx context.Context, target string, pkt any) error

func (*Agent) Settings added in v0.10.0

func (a *Agent) Settings() (map[string]any, error)

Settings fetches the current settings from the global system and returns these if the system is initializing, this will block until initialization is done

func (*Agent) Spot added in v0.12.18

func (a *Agent) Spot() *spotlib.Client

func (*Agent) SwitchChannel added in v0.8.7

func (a *Agent) SwitchChannel(channel string) error

SwitchChannel signals all nodes in the fleet to switch to the given update channel. This allows coordinated switching between different versions or release tracks (e.g., stable, beta, development) across the entire fleet.

The change is propagated to all nodes through the distributed database. All nodes will detect the change and update their channel accordingly.

Warning: Attempting to switch to a non-existing channel will trigger errors across the fleet, as each node tries to switch to an invalid channel.

Parameters:

  • channel: The update channel to switch to

Returns:

  • An error if the operation fails, nil otherwise

func (*Agent) WaitReady added in v0.10.0

func (a *Agent) WaitReady()

WaitReady blocks until the agent is ready for operation (connected to peers). This is useful for applications that need to ensure the agent is fully operational before proceeding with operations that depend on peer connectivity.

type AgentOption added in v0.6.12

type AgentOption interface {
	// contains filtered or unexported methods
}

AgentOption defines the interface for options that can be passed to the New() function when creating a new Agent. This follows the functional options pattern, allowing flexible configuration of agents.

To implement a new option type, create a type that has an apply(*Agent) method, which will be called during agent initialization.

func WithDivision added in v0.12.26

func WithDivision(division string) AgentOption

WithDivision returns an AgentOption that sets the agent's division

func WithID added in v0.12.26

func WithID(id string) AgentOption

WithID returns an AgentOption that sets the agent's ID

func WithName added in v0.12.26

func WithName(name string) AgentOption

WithName returns an AgentOption that sets the agent's name

type CustomHandler added in v0.5.12

type CustomHandler func(p *Peer, data []byte) error

CustomHandler is a function type for handling custom packet types. Applications can register handlers for custom packet codes in the range 0xa000-0xafff to extend the fleet protocol with application-specific functionality.

Parameters:

  • p: The peer that sent the packet
  • data: The raw packet data

Returns:

  • An error if handling fails, nil otherwise

type DbStamp

type DbStamp time.Time

DbStamp represents a timestamp for database entries. It's used to track when entries were created or modified and to resolve conflicts when the same key is modified on different peers.

DbStamp is a type alias for time.Time with specialized binary serialization that is consistent across all peers in the fleet.

func DbNow

func DbNow() DbStamp

DbNow returns the current time as a DbStamp. This is used when creating or updating database entries.

Returns:

  • A DbStamp representing the current time

func DbZero

func DbZero() DbStamp

DbZero returns the Unix epoch (Jan 1, 1970) as a DbStamp. This is used as a default or initial value.

Returns:

  • A DbStamp representing the Unix epoch

func (DbStamp) After

func (t DbStamp) After(t2 DbStamp) bool

After compares two DbStamps and returns whether this timestamp is after the other one. This is used for conflict resolution.

Parameters:

  • t2: The timestamp to compare against

Returns:

  • true if this timestamp is after t2, false otherwise

func (DbStamp) Bytes added in v0.5.7

func (t DbStamp) Bytes() []byte

Bytes returns a binary representation of the timestamp. This is used for network transmission and storage. The format is 16 bytes: - 8 bytes: seconds since epoch (big-endian uint64) - 8 bytes: nanoseconds part (big-endian uint64)

Returns:

  • Binary representation of the timestamp

func (*DbStamp) GobDecode

func (t *DbStamp) GobDecode(data []byte) error

GobDecode implements the gob.GobDecoder interface. This allows DbStamp to be reconstructed from its gob encoding.

Parameters:

  • data: Gob-encoded representation of a timestamp

Returns:

  • An error if decoding fails, nil otherwise

func (DbStamp) GobEncode

func (t DbStamp) GobEncode() ([]byte, error)

GobEncode implements the gob.GobEncoder interface. This allows DbStamp to be used with gob encoding.

Returns:

  • Gob-encoded representation of the timestamp
  • An error if encoding fails

func (DbStamp) MarshalBinary

func (t DbStamp) MarshalBinary() ([]byte, error)

MarshalBinary implements the encoding.BinaryMarshaler interface. This allows DbStamp to be used with binary encoding functions.

Returns:

  • Binary representation of the timestamp
  • Nil error (this method never fails)

func (DbStamp) String

func (t DbStamp) String() string

String returns a human-readable representation of the timestamp.

Returns:

  • String representation of the timestamp

func (DbStamp) Unix

func (t DbStamp) Unix() int64

Unix returns the DbStamp as Unix time (seconds since epoch).

Returns:

  • Seconds since Unix epoch

func (DbStamp) UnixNano

func (t DbStamp) UnixNano() int64

UnixNano returns the DbStamp as Unix time in nanoseconds.

Returns:

  • Nanoseconds since Unix epoch

func (*DbStamp) UnmarshalBinary

func (t *DbStamp) UnmarshalBinary(data []byte) error

UnmarshalBinary implements the encoding.BinaryUnmarshaler interface. This allows DbStamp to be reconstructed from its binary representation.

Parameters:

  • data: Binary representation of a timestamp (16 bytes)

Returns:

  • An error if unmarshaling fails, nil otherwise

type DbWatchCallback added in v0.3.16

type DbWatchCallback func(string, []byte)

DbWatchCallback is a function type for callbacks triggered on database changes. It receives the key that changed and its new value (or nil if deleted).

type GetFileFunc added in v0.5.0

type GetFileFunc func(*Agent, string) ([]byte, error)

GetFileFunc is a callback function type that retrieves files for the agent. Used primarily for certificate and configuration file loading.

type LocalLock added in v0.6.12

type LocalLock struct {
	// contains filtered or unexported fields
}

LocalLock represents a successfully acquired distributed lock. This is the type returned to users when they acquire a lock, and it provides methods to release the lock when done.

func (*LocalLock) Release added in v0.6.12

func (lk *LocalLock) Release()

Release releases a previously acquired lock. This method is safe to call multiple times (only the first call has effect).

type Packet

type Packet any

Packet is the generic interface for all packets exchanged between peers. Concrete packet types are registered with gob for serialization.

type PacketAnnounce

type PacketAnnounce struct {
	Id   string         // Peer identifier
	Now  time.Time      // Current timestamp
	Idx  uint64         // Announcement index (monotonically increasing)
	NumG uint32         // Number of goroutines (for load balancing)
	AZ   string         // Availability zone/division
	Meta map[string]any // Custom metadata
}

PacketAnnounce is sent periodically by peers to announce their presence and status. It contains current state information about the peer.

type PacketDbRecord

type PacketDbRecord struct {
	TargetId string  // Target peer ID
	SourceId string  // Source peer ID
	Stamp    DbStamp // Timestamp for versioning
	Bucket   []byte  // Database bucket (typically "app")
	Key, Val []byte  // Database key and value
}

PacketDbRecord is used to synchronize database records between peers. It contains a database key-value pair along with a timestamp.

type PacketDbRequest added in v0.3.14

type PacketDbRequest struct {
	TargetId string // Target peer ID
	SourceId string // Source peer ID
	Bucket   []byte // Database bucket (typically "app")
	Key      []byte // Database key to retrieve
}

PacketDbRequest requests a specific database record from a peer. The peer will respond with a PacketDbRecord containing the requested data.

type PacketDbVersions added in v0.3.14

type PacketDbVersions struct {
	Info []*PacketDbVersionsEntry // List of available database entries
}

PacketDbVersions signals what database records are available in a peer. It's typically sent when a connection is established to synchronize databases.

type PacketDbVersionsEntry added in v0.3.14

type PacketDbVersionsEntry struct {
	Stamp  DbStamp // Timestamp for versioning
	Bucket []byte  // Database bucket (typically "app")
	Key    []byte  // Database key
}

PacketDbVersionsEntry represents a single database record in the versions list. It contains metadata about the record but not the actual value.

type PacketHandshake

type PacketHandshake struct {
	Id       string    // Unique identifier for the peer
	Name     string    // Human-readable name
	Division string    // Logical grouping/location (e.g., datacenter)
	Now      time.Time // Current time, used for clock synchronization

	// Version information for updates
	Git     string // Git commit or tag
	Build   string // Build timestamp
	Channel string // Update channel
}

PacketHandshake is sent when a peer connection is first established. It contains identifying information about the peer.

type PacketRpc

type PacketRpc struct {
	TargetId string   // Target peer ID
	SourceId string   // Source peer ID
	Endpoint string   // RPC endpoint/method name
	R        rchan.Id // Response channel ID
	Data     any      // Payload data
}

PacketRpc carries RPC requests between peers. It's used to invoke remote procedures and pass serialized data.

type PacketRpcResponse

type PacketRpcResponse struct {
	TargetId string   // Target peer ID
	SourceId string   // Source peer ID
	R        rchan.Id // Response channel ID (matches request)
	Data     any      // Response data
	Error    string   // Error message (if any)
	HasError bool     // Whether an error occurred
}

PacketRpcResponse carries RPC responses back to the requestor. It contains the result of an RPC call or an error message.

type Peer

type Peer struct {
	Ping time.Duration // Measured ping time (RTT) to peer
	// contains filtered or unexported fields
}

Peer represents a remote node in the fleet network. Each peer has a unique identity, can exchange messages, and maintains status information about the connection.

func (*Peer) Addr added in v0.11.20

func (p *Peer) Addr() net.Addr

Addr returns the network address of the peer. This implements the net.Addr interface for compatibility with networking code.

func (*Peer) Agent added in v0.5.12

func (p *Peer) Agent() *Agent

Agent returns the Agent object associated with this peer

func (*Peer) Close added in v0.3.19

func (p *Peer) Close(reason string) error

func (*Peer) Division added in v0.5.12

func (p *Peer) Division() string

Division returns this peer's division string

func (*Peer) Id added in v0.5.12

func (p *Peer) Id() string

Id returns the peer's internal ID, which is unique and can be used to send packets to this peer specifically in the future.

func (*Peer) IsAlive added in v0.12.3

func (p *Peer) IsAlive() bool

IsAlive checks if the peer is still considered alive based on recent announcements. A peer is considered alive if we've received an announcement within the last 5 minutes. This is used to filter out dead peers when getting the list of active peers.

func (*Peer) Meta added in v0.6.10

func (p *Peer) Meta() map[string]any

func (*Peer) Name added in v0.5.12

func (p *Peer) Name() string

Name returns this peer's name

func (*Peer) RemoteAddr added in v0.8.11

func (p *Peer) RemoteAddr() net.Addr

func (*Peer) Send

func (p *Peer) Send(ctx context.Context, pkt Packet) error

Send sends a high-level packet to the peer. This method serializes the packet using gob encoding and sends it through the legacy packet protocol.

Parameters:

  • ctx: Context for the operation (for cancellation and timeout)
  • pkt: The packet to send (must be registered with gob.Register)

Returns:

  • An error if the operation fails, nil otherwise

func (*Peer) String added in v0.6.29

func (p *Peer) String() string

func (*Peer) WritePacket added in v0.5.7

func (p *Peer) WritePacket(ctx context.Context, pc uint16, data []byte) error

WritePacket sends a binary packet with a specified packet code to the peer. This is a lower-level method that sends raw binary data with a packet type identifier.

Parameters:

  • ctx: Context for the operation (for cancellation and timeout)
  • pc: Packet code identifying the type of packet (see const.go for codes)
  • data: Raw binary data to send

Returns:

  • An error if the operation fails, nil otherwise

type RPC added in v0.9.4

type RPC interface {
	// All sends data to all other RPC instances on the fleet with the same name
	// and collects responses from all peers. This is useful for querying all
	// peers for information or triggering actions across the entire fleet.
	All(ctx context.Context, data []byte) ([]any, error)

	// Broadcast sends data to all other RPC instances on the fleet with the same name
	// but does not wait for responses. This is efficient for notifications or
	// updates that don't require confirmation.
	Broadcast(ctx context.Context, data []byte) error

	// Request sends data to a specific peer's RPC instance with the same name
	// and returns the response. This is for peer-to-peer communication when
	// you need a reply from a specific node.
	Request(ctx context.Context, id string, data []byte) ([]byte, error)

	// Send sends data to a specific peer's RPC instance with the same name
	// but ignores any response. This is useful for fire-and-forget messages.
	Send(ctx context.Context, id string, data []byte) error

	// Self returns the ID of the local peer, which can be used by other peers
	// to direct messages to this instance using Request() or Send().
	Self() string

	// ListOnlinePeers returns a list of currently connected peer names.
	// This helps in discovering available peers for communication.
	ListOnlinePeers() []string

	// CountAllPeers returns the total number of known peers, whether they're
	// currently connected or not. This gives a sense of the fleet size.
	CountAllPeers() int

	// Connect registers a callback function that will be called whenever this
	// RPC instance receives a message. The callback can process the message
	// and optionally return a response.
	Connect(cb func(context.Context, []byte) ([]byte, error))
}

RPC defines the interface for a named RPC communication channel. Applications can create RPC instances for specific endpoints, making it easy to send and receive messages between peers in a structured way.

type RpcEndpoint

type RpcEndpoint func(any) (any, error)

RpcEndpoint represents a callback function for the legacy RPC system. It receives arbitrary data and returns a response and/or error.

type TestOption added in v0.12.26

type TestOption struct {
	// contains filtered or unexported fields
}

TestOption defines an AgentOption that can be used for testing purposes. This allows easier agent configuration in tests without modifying the main codebase.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL