informerwatcher

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

README

Casbin Informer Watcher

CI Go Report Card Go Reference License Release

Casbin Informer Watcher is a Kubernetes informer-based watcher for Casbin. This watcher enables real-time policy synchronization across multiple Casbin enforcer instances by watching Kubernetes Custom Resource Definitions (CRDs).

Features

  • Real-time Updates: Uses Kubernetes informers to watch CRD changes without periodic polling
  • Event-Driven: Reacts immediately to create, update, and delete events
  • Concurrency-Safe: Safe to use with SyncedEnforcer in multi-threaded environments
  • Graceful Reconnection: Handles disconnections and resource version drift automatically
  • Incremental Updates: Supports both full policy reload and incremental policy updates
  • GitOps Compatible: Changes applied via GitOps become effective quickly across all instances

Installation

go get github.com/casbin/casbin-informer-watcher

Usage

Basic Example
package main

import (
	"log"

	"github.com/casbin/casbin/v2"
	informerwatcher "github.com/casbin/casbin-informer-watcher"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load Kubernetes configuration
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		log.Fatalf("Failed to load kubeconfig: %v", err)
	}

	// Create dynamic client
	client, err := dynamic.NewForConfig(config)
	if err != nil {
		log.Fatalf("Failed to create dynamic client: %v", err)
	}

	// Define the GVR for your policy CRD
	gvr := schema.GroupVersionResource{
		Group:    "casbin.org",
		Version:  "v1",
		Resource: "policies",
	}

	// Create the watcher
	watcher, err := informerwatcher.NewWatcher(client, gvr, "default", informerwatcher.WatcherOptions{})
	if err != nil {
		log.Fatalf("Failed to create watcher: %v", err)
	}
	defer watcher.Close()

	// Initialize the enforcer
	e, err := casbin.NewEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv")
	if err != nil {
		log.Fatalf("Failed to create enforcer: %v", err)
	}

	// Set the watcher for the enforcer
	err = e.SetWatcher(watcher)
	if err != nil {
		log.Fatalf("Failed to set watcher: %v", err)
	}

	// By default, the watcher's callback is automatically set to the
	// enforcer's LoadPolicy() in the SetWatcher() call.
	// You can change it by explicitly setting a callback.
	err = watcher.SetUpdateCallback(informerwatcher.DefaultUpdateCallback(e))
	if err != nil {
		log.Fatalf("Failed to set callback: %v", err)
	}

	log.Println("Watcher is running and monitoring policy changes...")
	select {} // Keep the program running
}
Advanced Example with Custom Options
package main

import (
	"log"
	"time"

	"github.com/casbin/casbin/v2"
	informerwatcher "github.com/casbin/casbin-informer-watcher"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load Kubernetes configuration
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		log.Fatalf("Failed to load kubeconfig: %v", err)
	}

	// Create dynamic client
	client, err := dynamic.NewForConfig(config)
	if err != nil {
		log.Fatalf("Failed to create dynamic client: %v", err)
	}

	// Define the GVR for your policy CRD
	gvr := schema.GroupVersionResource{
		Group:    "casbin.org",
		Version:  "v1",
		Resource: "policies",
	}

	// Create watcher with custom options
	options := informerwatcher.WatcherOptions{
		LocalID:      "instance-1",          // Custom instance identifier
		IgnoreSelf:   true,                  // Ignore updates from this instance
		ResyncPeriod: 30 * time.Second,      // Resync period with API server
	}

	watcher, err := informerwatcher.NewWatcher(client, gvr, "default", options)
	if err != nil {
		log.Fatalf("Failed to create watcher: %v", err)
	}
	defer watcher.Close()

	// Initialize the enforcer
	e, err := casbin.NewEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv")
	if err != nil {
		log.Fatalf("Failed to create enforcer: %v", err)
	}

	// Set the watcher
	err = e.SetWatcher(watcher)
	if err != nil {
		log.Fatalf("Failed to set watcher: %v", err)
	}

	// Custom callback that logs updates
	customCallback := func(msg string) {
		log.Printf("Policy update received: %s\n", msg)
		informerwatcher.DefaultUpdateCallback(e)(msg)
	}

	err = watcher.SetUpdateCallback(customCallback)
	if err != nil {
		log.Fatalf("Failed to set callback: %v", err)
	}

	log.Println("Watcher is running with custom configuration...")
	select {} // Keep the program running
}
With SyncedEnforcer
package main

import (
	"log"

	"github.com/casbin/casbin/v2"
	informerwatcher "github.com/casbin/casbin-informer-watcher"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Setup client and GVR (same as basic example)
	config, _ := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	client, _ := dynamic.NewForConfig(config)

	gvr := schema.GroupVersionResource{
		Group:    "casbin.org",
		Version:  "v1",
		Resource: "policies",
	}

	watcher, _ := informerwatcher.NewWatcher(client, gvr, "default", informerwatcher.WatcherOptions{})
	defer watcher.Close()

	// Use SyncedEnforcer for concurrency-safe operations
	e, err := casbin.NewSyncedEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv")
	if err != nil {
		log.Fatalf("Failed to create synced enforcer: %v", err)
	}

	err = e.SetWatcher(watcher)
	if err != nil {
		log.Fatalf("Failed to set watcher: %v", err)
	}

	log.Println("SyncedEnforcer is running with watcher...")
	select {}
}

Configuration Options

WatcherOptions
  • LocalID (string): Unique identifier for this watcher instance. Auto-generated if not provided.
  • IgnoreSelf (bool): If true, ignores updates triggered by this watcher instance. Default: false.
  • ResyncPeriod (time.Duration): Period for the informer to resync with the API server. Default: 30 seconds.
  • OptionalUpdateCallback (func(string)): Optional callback function set during initialization.

How It Works

  1. The watcher uses Kubernetes informers to monitor Custom Resource Definitions (CRDs) that represent Casbin policies.
  2. When a CRD is created, updated, or deleted, the informer triggers the corresponding event handler.
  3. The event handler processes the change and invokes the registered callback function.
  4. The callback typically triggers the enforcer to reload its policy or apply incremental updates.
  5. All running instances with the same watcher configuration receive the same updates, keeping policies synchronized.

Supported Update Types

The watcher supports all standard Casbin policy update operations:

  • Update: Full policy reload
  • UpdateForAddPolicy: Add a single policy rule
  • UpdateForRemovePolicy: Remove a single policy rule
  • UpdateForAddPolicies: Add multiple policy rules
  • UpdateForRemovePolicies: Remove multiple policy rules
  • UpdateForRemoveFilteredPolicy: Remove filtered policy rules
  • UpdateForUpdatePolicy: Update a single policy rule
  • UpdateForUpdatePolicies: Update multiple policy rules
  • UpdateForSavePolicy: Save policy to storage

Custom Resource Definition (CRD) Example

You'll need to define a CRD for your Casbin policies. Here's a basic example:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: policies.casbin.org
spec:
  group: casbin.org
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                policy:
                  type: string
  scope: Namespaced
  names:
    plural: policies
    singular: policy
    kind: Policy

Testing

Run the test suite:

go test -v ./...

Run tests with coverage:

go test -v -coverprofile=coverage.out ./...
go tool cover -html=coverage.out

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

See Also

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultUpdateCallback

func DefaultUpdateCallback(e casbin.IEnforcer) func(string)

DefaultUpdateCallback returns the default callback function for policy updates.

func NewWatcher

func NewWatcher(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, options WatcherOptions) (persist.Watcher, error)

NewWatcher creates a new Watcher instance.

Types

type MSG

type MSG struct {
	Method      UpdateType `json:"method"`
	ID          string     `json:"id"`
	Sec         string     `json:"sec,omitempty"`
	Ptype       string     `json:"ptype,omitempty"`
	OldRule     []string   `json:"oldRule,omitempty"`
	OldRules    [][]string `json:"oldRules,omitempty"`
	NewRule     []string   `json:"newRule,omitempty"`
	NewRules    [][]string `json:"newRules,omitempty"`
	FieldIndex  int        `json:"fieldIndex,omitempty"`
	FieldValues []string   `json:"fieldValues,omitempty"`
}

MSG represents a policy update message.

func (*MSG) MarshalBinary

func (m *MSG) MarshalBinary() ([]byte, error)

MarshalBinary implements binary marshaling for MSG.

func (*MSG) UnmarshalBinary

func (m *MSG) UnmarshalBinary(data []byte) error

UnmarshalBinary implements binary unmarshaling for MSG.

type UpdateType

type UpdateType string

UpdateType represents the type of policy update.

const (
	Update                        UpdateType = "Update"
	UpdateForAddPolicy            UpdateType = "UpdateForAddPolicy"
	UpdateForRemovePolicy         UpdateType = "UpdateForRemovePolicy"
	UpdateForRemoveFilteredPolicy UpdateType = "UpdateForRemoveFilteredPolicy"
	UpdateForSavePolicy           UpdateType = "UpdateForSavePolicy"
	UpdateForAddPolicies          UpdateType = "UpdateForAddPolicies"
	UpdateForRemovePolicies       UpdateType = "UpdateForRemovePolicies"
	UpdateForUpdatePolicy         UpdateType = "UpdateForUpdatePolicy"
	UpdateForUpdatePolicies       UpdateType = "UpdateForUpdatePolicies"
)

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher implements the persist.Watcher interface for Kubernetes CRD-based policy updates.

func (*Watcher) Close

func (w *Watcher) Close()

Close stops the watcher and releases resources.

func (*Watcher) SetUpdateCallback

func (w *Watcher) SetUpdateCallback(callback func(string)) error

SetUpdateCallback sets the callback function to be called when a policy update is detected.

func (*Watcher) Update

func (w *Watcher) Update() error

Update triggers an update notification to other instances.

func (*Watcher) UpdateForAddPolicies

func (w *Watcher) UpdateForAddPolicies(sec string, ptype string, rules ...[]string) error

UpdateForAddPolicies triggers an update for AddPolicies.

func (*Watcher) UpdateForAddPolicy

func (w *Watcher) UpdateForAddPolicy(sec string, ptype string, params ...string) error

UpdateForAddPolicy triggers an update for AddPolicy.

func (*Watcher) UpdateForPolicy

func (w *Watcher) UpdateForPolicy(method UpdateType, sec string, ptype string, params []string, rules [][]string, fieldIndex int) error

UpdateForPolicy is a helper method for triggering policy updates.

func (*Watcher) UpdateForPolicyWithFieldIndex

func (w *Watcher) UpdateForPolicyWithFieldIndex(method UpdateType, sec string, ptype string, fieldIndex int, fieldValues ...string) error

UpdateForPolicyWithFieldIndex is a helper method for filtered policy updates.

func (*Watcher) UpdateForRemoveFilteredPolicy

func (w *Watcher) UpdateForRemoveFilteredPolicy(sec string, ptype string, fieldIndex int, fieldValues ...string) error

UpdateForRemoveFilteredPolicy triggers an update for RemoveFilteredPolicy.

func (*Watcher) UpdateForRemovePolicies

func (w *Watcher) UpdateForRemovePolicies(sec string, ptype string, rules ...[]string) error

UpdateForRemovePolicies triggers an update for RemovePolicies.

func (*Watcher) UpdateForRemovePolicy

func (w *Watcher) UpdateForRemovePolicy(sec string, ptype string, params ...string) error

UpdateForRemovePolicy triggers an update for RemovePolicy.

func (*Watcher) UpdateForSavePolicy

func (w *Watcher) UpdateForSavePolicy(sec string, ptype string, params ...string) error

UpdateForSavePolicy triggers an update for SavePolicy.

func (*Watcher) UpdateForUpdatePolicies

func (w *Watcher) UpdateForUpdatePolicies(sec string, ptype string, oldRules, newRules [][]string) error

UpdateForUpdatePolicies triggers an update for UpdatePolicies.

func (*Watcher) UpdateForUpdatePolicy

func (w *Watcher) UpdateForUpdatePolicy(sec string, ptype string, oldRule, newRule []string) error

UpdateForUpdatePolicy triggers an update for UpdatePolicy.

type WatcherOptions

type WatcherOptions struct {
	// LocalID is a unique identifier for this watcher instance.
	// If empty, a UUID will be generated.
	LocalID string

	// IgnoreSelf determines whether to ignore updates triggered by this watcher instance.
	IgnoreSelf bool

	// ResyncPeriod is the period for the informer to resync with the API server.
	// Default is 30 seconds.
	ResyncPeriod time.Duration

	// OptionalUpdateCallback is an optional callback function that can be set during initialization.
	OptionalUpdateCallback func(string)
}

WatcherOptions configures the Watcher behavior.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL