streamsql

package module
v0.10.4
Published: Nov 15, 2025 License: Apache-2.0 Imports: 8 Imported by: 1

README

StreamSQL

StreamSQL is a lightweight, SQL-based stream processing engine for the IoT edge, enabling efficient processing and analysis of unbounded data streams.

📖 Documentation | Similar to: Apache Flink

Features

  • Lightweight
    • Pure in-memory operations
    • No dependencies
  • Data processing with SQL syntax
    • Nested field access: Support dot notation syntax (device.info.name) for accessing nested structured data
  • Data analysis
    • Built-in window types: sliding, tumbling, counting, and session windows
    • Built-in aggregate functions: MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
    • Support for group-by aggregation
    • Support for filtering conditions
  • High extensibility
    • Flexible mechanism for extending built-in functions with custom ones
    • Integration with the RuleGo ecosystem to extend input and output sources using RuleGo components
  • Integration with RuleGo
    • Use RuleGo's rich, flexible input, output, and processing components for data source access and integration with third-party systems

Installation

go get github.com/rulego/streamsql

Usage

StreamSQL supports two main processing modes for different business scenarios:

Non-Aggregation Mode - Real-time Data Transformation and Filtering

Suitable for scenarios requiring real-time response and low latency, where each data record is processed and output immediately.

Typical Use Cases:

  • Data Cleaning: Clean and standardize dirty data from IoT devices
  • Real-time Alerting: Monitor key metrics and alert immediately when thresholds are exceeded
  • Data Enrichment: Add calculated fields and business labels to raw data
  • Format Conversion: Convert data to formats required by downstream systems
  • Data Routing: Route data to different processing channels based on content
package main

import (
	"fmt"
	"time"
	"github.com/rulego/streamsql"
)

func main() {
	// Create StreamSQL instance
	ssql := streamsql.New()
	defer ssql.Stop()

	// Non-aggregation SQL: Real-time data transformation and filtering
	// Feature: each input record is processed immediately; no waiting for windows
	rsql := `SELECT deviceId, 
	                UPPER(deviceType) as device_type,
	                temperature * 1.8 + 32 as temp_fahrenheit,
	                CASE WHEN temperature > 30 THEN 'hot'
	                     WHEN temperature < 15 THEN 'cold'
	                     ELSE 'normal' END as temp_category,
	                CONCAT(location, '-', deviceId) as full_identifier,
	                NOW() as processed_time
	         FROM stream 
	         WHERE temperature > 0 AND STARTSWITH(deviceId, 'sensor')`

	err := ssql.Execute(rsql)
	if err != nil {
		panic(err)
	}

	// Handle real-time transformation results
	ssql.AddSink(func(results []map[string]interface{}) {
		fmt.Printf("Real-time result: %+v\n", results)
	})

	// Simulate sensor data input
	sensorData := []map[string]interface{}{
		{
			"deviceId":     "sensor001",
			"deviceType":   "temperature", 
			"temperature":  25.0,
			"location":     "warehouse-A",
		},
		{
			"deviceId":     "sensor002",
			"deviceType":   "humidity",
			"temperature":  32.5,
			"location":     "warehouse-B", 
		},
		{
			"deviceId":     "pump001",  // Will be filtered out
			"deviceType":   "actuator",
			"temperature":  20.0,
			"location":     "factory",
		},
	}

	// Process data one by one, each will output results immediately
	for _, data := range sensorData {
		ssql.Emit(data)
		// result, err := ssql.EmitSync(data) // synchronously obtain the processed result
		time.Sleep(100 * time.Millisecond) // Simulate real-time data arrival
	}

	time.Sleep(500 * time.Millisecond) // Wait for processing completion
}
Aggregation Mode - Windowed Statistical Analysis

Suitable for scenarios requiring statistical analysis and batch processing, collecting data over a period of time for aggregated computation.

Typical Use Cases:

  • Monitoring Dashboard: Display real-time statistical charts of device operational status
  • Performance Analysis: Analyze key metrics like QPS, latency, etc.
  • Anomaly Detection: Detect data anomalies based on statistical models
  • Report Generation: Generate various business reports periodically
  • Trend Analysis: Analyze data trends and patterns
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/rulego/streamsql"
)

// StreamSQL Usage Example
// This example demonstrates the complete workflow of StreamSQL: from instance creation to data processing and result handling
func main() {
	// Step 1: Create StreamSQL Instance
	// StreamSQL is the core component of the stream SQL processing engine, managing the entire stream processing lifecycle
	ssql := streamsql.New()
	defer ssql.Stop()
	// Step 2: Define Stream SQL Query Statement
	// This SQL statement showcases StreamSQL's core capabilities:
	// - SELECT: Choose output fields and aggregation functions
	// - FROM stream: Specify the data source as stream data
	// - WHERE: Filter condition, excluding device3 data
	// - GROUP BY: Group by deviceId, combined with tumbling window for aggregation
	// - TumblingWindow('5s'): 5-second tumbling window, triggers computation every 5 seconds
	// - avg(), min(): Aggregation functions for calculating average and minimum values
	// - window_start(), window_end(): Window functions to get window start and end times
	rsql := "SELECT deviceId,avg(temperature) as avg_temp,min(humidity) as min_humidity ," +
		"window_start() as start,window_end() as end FROM  stream  where deviceId!='device3' group by deviceId,TumblingWindow('5s')"
	
	// Step 3: Execute SQL Statement and Start Stream Analysis Task
	// The Execute method parses SQL, builds execution plan, initializes window manager and aggregators
	err := ssql.Execute(rsql)
	if err != nil {
		panic(err)
	}
	
	// Step 4: Setup Test Environment and Concurrency Control
	var wg sync.WaitGroup
	wg.Add(1)
	// Set 30-second test timeout to prevent infinite running
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	
	// Step 5: Start Data Producer Goroutine
	// Simulate real-time data stream, continuously feeding data into StreamSQL
	go func() {
		defer wg.Done()
		// Create ticker to trigger data generation every second
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// Generate 10 random test data points per second, simulating high-frequency data stream
				// This data density tests StreamSQL's real-time processing capability
				for i := 0; i < 10; i++ {
					// Construct device data containing deviceId, temperature, and humidity
					randomData := map[string]interface{}{
						"deviceId":    fmt.Sprintf("device%d", rand.Intn(2)+1), // Randomly select device1 or device2
						"temperature": 20.0 + rand.Float64()*10,                // Temperature range: 20-30 degrees
						"humidity":    50.0 + rand.Float64()*20,                // Humidity range: 50-70%
					}
					// Add data to stream, triggering StreamSQL's real-time processing
					// Emit distributes data to corresponding windows and aggregators
					ssql.Emit(randomData)
				}

			case <-ctx.Done():
				// Timeout or cancellation signal, stop data generation
				return
			}
		}
	}()

	// Step 6: Setup Result Processing Pipeline
	resultChan := make(chan interface{})
	// Add computation result callback function (Sink)
	// When a window triggers computation, results are output through this callback
	ssql.AddSink(func(results []map[string]interface{}) {
		resultChan <- results
	})
	
	// Step 7: Start Result Consumer Goroutine
	// Count received results for effect verification
	resultCount := 0
	go func() {
		for result := range resultChan {
			// Print results when window computation is triggered (every 5 seconds)
			// This demonstrates StreamSQL's window-based aggregation results
			fmt.Printf("Window Result [%s]: %v\n", time.Now().Format("15:04:05.000"), result)
			resultCount++
		}
	}()
	
	// Step 8: Wait for Processing Completion
	// Wait for data producer goroutine to finish (30-second timeout or manual cancellation)
	wg.Wait()
	
	// Step 9: Display Final Statistics
	// Show total number of window results received during the test period
	fmt.Printf("\nTotal window results received: %d\n", resultCount)
	fmt.Println("StreamSQL processing completed successfully!")
}
Nested Field Access

StreamSQL supports querying nested structured data using dot notation (.) syntax to access nested fields:

// Nested field access example
package main

import (
	"fmt"
	"time"
	"github.com/rulego/streamsql"
)

func main() {
	ssql := streamsql.New()
	defer ssql.Stop()

	// SQL query using nested fields - supports dot notation syntax for accessing nested structures
	rsql := `SELECT device.info.name as device_name, 
	                device.location,
	                AVG(sensor.temperature) as avg_temp,
	                COUNT(*) as sensor_count,
	                window_start() as start,
	                window_end() as end
	         FROM stream 
	         WHERE device.info.type = 'temperature'
	         GROUP BY device.location, TumblingWindow('5s')`

	err := ssql.Execute(rsql)
	if err != nil {
		panic(err)
	}

	// Handle aggregation results
	ssql.AddSink(func(results []map[string]interface{}) {
		fmt.Printf("Aggregation result: %+v\n", results)
	})

	// Add nested structured data
	nestedData := map[string]interface{}{
		"device": map[string]interface{}{
			"info": map[string]interface{}{
				"name": "temperature-sensor-001",
				"type": "temperature",
			},
			"location": "smart-greenhouse-A",
		},
		"sensor": map[string]interface{}{
			"temperature": 25.5,
			"humidity":    60.2,
		},
		"timestamp": time.Now().Unix(),
	}

	ssql.Emit(nestedData)

	// Give the 5-second tumbling window time to trigger before main exits
	time.Sleep(6 * time.Second)
}

Functions

StreamSQL supports a variety of function types, including mathematical, string, conversion, aggregate, analytical, and window functions. See the Documentation for the full list.

Concepts

Processing Modes

StreamSQL supports two main processing modes:

Aggregation Mode (Windowed Processing)

Used when the SQL query contains aggregate functions (SUM, AVG, COUNT, etc.) or GROUP BY clauses. Data is collected in windows and aggregated results are output when windows are triggered.

Non-Aggregation Mode (Real-time Processing)

Used for immediate data transformation and filtering without aggregation operations. Each input record is processed and output immediately, providing ultra-low latency for real-time scenarios like data cleaning, enrichment, and filtering.
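
Which mode a query runs in is decided by the engine from the SQL text itself. A minimal sketch (the queries are illustrative) that checks the chosen mode via the IsAggregationQuery method documented in the API section below:

ssql := streamsql.New()
defer ssql.Stop()

// AVG + GROUP BY + window => the engine selects aggregation mode
err := ssql.Execute("SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5s')")
if err != nil {
	panic(err)
}
fmt.Println(ssql.IsAggregationQuery()) // true

// A pure filter/transform such as:
//   SELECT deviceId, temperature FROM stream WHERE temperature > 30
// would run in non-aggregation mode (on a separate instance).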

Windows

Since stream data is unbounded, it cannot be processed as a whole. Windows provide a mechanism for dividing unbounded data into a series of bounded segments for computation. StreamSQL includes the following window types (a compact declaration sketch follows the list):

  • Sliding Window

    • Definition: A time-based window that slides forward at fixed time intervals. For example, it slides every 10 seconds.
    • Characteristics: The size of the window is fixed, but the starting point of the window is continuously updated over time. It is suitable for real-time statistical analysis of data within continuous time periods.
    • Application Scenario: In intelligent transportation systems, vehicle traffic over the past 1 minute is recomputed every 10 seconds.
  • Tumbling Window

    • Definition: A time-based window that does not overlap and is completely independent. For example, a window is generated every 1 minute.
    • Characteristics: The size of the window is fixed, and the windows do not overlap with each other. It is suitable for overall analysis of data within fixed time periods.
    • Application Scenario: In smart agriculture monitoring systems, farmland temperature and humidity are aggregated once per hour.
  • Count Window

    • Definition: A window based on the number of data records, where the window size is determined by the number of data records. For example, a window is generated every 100 data records.
    • Characteristics: The size of the window is not related to time but is divided based on the volume of data. It is suitable for segmenting data based on the amount of data.
    • Application Scenario: In industrial IoT, an aggregation calculation is performed every time 100 device status data records are processed.
  • Session Window

    • Definition: A dynamic window based on data activity. When the interval between data exceeds a specified timeout, the current session ends and triggers the window.
    • Characteristics: Window size changes dynamically, automatically dividing sessions based on data arrival intervals. When data arrives continuously, the session continues; when the data interval exceeds the timeout, the session ends and triggers the window.
    • Application Scenario: In user behavior analysis, a session stays open while the user keeps operating; once the user is idle for more than 5 minutes, the session closes and the operations within it are counted.
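
A compact sketch of the corresponding declarations, matching the Window Functions examples later in this document:

TumblingWindow('5s')          // 5-second non-overlapping windows
SlidingWindow('30s', '10s')   // 30-second window size, slides every 10 seconds
CountingWindow(100)           // one window per 100 records
SessionWindow('5m')           // session closes after 5 minutes of inactivity
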
Stream
  • Definition: A continuous sequence of data that is generated in an unbounded manner, typically from sensors, log systems, user behaviors, etc.
  • Characteristics: Stream data is real-time, dynamic, and unbounded, requiring timely processing and analysis.
  • Application Scenario: Real-time data streams generated by IoT devices, such as temperature sensor data and device status data.
Time Semantics

StreamSQL supports two time concepts that determine how windows are divided and triggered:

Event Time
  • Definition: Event time refers to the actual time when data was generated, usually recorded in a field within the data itself (such as event_time, timestamp, order_time, etc.).
  • Characteristics:
    • Windows are divided based on timestamp field values in the data
    • Even if data arrives late, it can be correctly counted into the corresponding window based on event time
    • Uses Watermark mechanism to handle out-of-order and late data
    • Results are accurate but may have delays (need to wait for late data)
  • Use Cases:
    • Scenarios requiring precise temporal analysis
    • Scenarios where data may arrive out of order or delayed
    • Historical data replay and analysis
  • Configuration: Use WITH (TIMESTAMP='field_name', TIMEUNIT='ms') to specify the event time field
  • Example:
    SELECT deviceId, COUNT(*) as cnt
    FROM stream
    GROUP BY deviceId, TumblingWindow('1m')
    WITH (TIMESTAMP='eventTime', TIMEUNIT='ms')
    
Processing Time
  • Definition: Processing time refers to the time when data arrives at the StreamSQL processing system, i.e., the current time when the system receives the data.
  • Characteristics:
    • Windows are divided based on the time data arrives at the system (time.Now())
    • Regardless of the time field value in the data, it is counted into the current window based on arrival time
    • Uses system clock (Timer) to trigger windows
    • Low latency but results may be inaccurate (cannot handle out-of-order and late data)
  • Use Cases:
    • Real-time monitoring and alerting scenarios
    • Scenarios with high latency requirements and relatively low accuracy requirements
    • Scenarios where data arrives in order and delay is controllable
  • Configuration: Default when WITH (TIMESTAMP=...) is not specified
  • Example:
    SELECT deviceId, COUNT(*) as cnt
    FROM stream
    GROUP BY deviceId, TumblingWindow('1m')
    -- No WITH clause specified, defaults to processing time
    
Event Time vs Processing Time Comparison

| Feature | Event Time | Processing Time |
|---|---|---|
| Time source | Timestamp field in data | System current time |
| Window division | Based on event timestamp | Based on data arrival time |
| Late data handling | Supported (watermark mechanism) | Not supported |
| Out-of-order handling | Supported (watermark mechanism) | Not supported |
| Result accuracy | Accurate | May be inaccurate |
| Processing latency | Higher (waits for late data) | Low (real-time trigger) |
| Configuration | WITH (TIMESTAMP='field') | Default (no WITH clause) |
| Use cases | Precise temporal analysis, historical replay | Real-time monitoring, low latency |
Window Time
  • Window Start Time

    • Event Time Windows: The starting time point of the window, aligned to window boundaries based on event time (e.g., aligned to minute or hour boundaries).
    • Processing Time Windows: The starting time point of the window, based on the time data arrives at the system.
    • Example: For an event-time-based tumbling window TumblingWindow('5m'), the window start time aligns to multiples of 5 minutes (e.g., 10:00, 10:05, 10:10).
  • Window End Time

    • Event Time Windows: The ending time point of the window, usually the window start time plus the window duration. Windows trigger when watermark >= window_end.
    • Processing Time Windows: The ending time point of the window, based on the time data arrives at the system plus the window duration. Windows trigger when the system clock reaches the end time.
    • Example: For a tumbling window with a duration of 1 minute, if the window start time is 10:00, then the window end time is 10:01.
Watermark Mechanism (Event Time Windows Only)
  • Definition: Watermark indicates "events with timestamps less than this time should not arrive anymore", used to determine when windows can trigger.
  • Calculation Formula: Watermark = max(event_time) - MaxOutOfOrderness
  • Window Trigger Condition: Windows trigger when watermark >= window_end
  • Configuration Parameters:
    • MAXOUTOFORDERNESS: Maximum allowed out-of-order time for tolerating data disorder (default: 0, no out-of-order allowed)
    • ALLOWEDLATENESS: Time window can accept late data after triggering (default: 0, no late data accepted)
    • IDLETIMEOUT: Timeout for advancing Watermark based on processing time when data source is idle (default: 0, disabled)
  • Example:
    SELECT deviceId, COUNT(*) as cnt
    FROM stream
    GROUP BY deviceId, TumblingWindow('5m')
    WITH (
        TIMESTAMP='eventTime',
        TIMEUNIT='ms',
        MAXOUTOFORDERNESS='5s',  -- Tolerate 5 seconds of out-of-order
        ALLOWEDLATENESS='2s',     -- Accept 2 seconds of late data after window triggers
        IDLETIMEOUT='5s'          -- Advance watermark based on processing time after 5s of no data
    )
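
A worked example of the trigger arithmetic under this configuration: for the [10:00:00, 10:05:00) window, an event stamped 10:00:07 advances the watermark to 10:00:07 - 5s = 10:00:02. The window fires once watermark >= 10:05:00, which requires an event stamped at least 10:05:05. With ALLOWEDLATENESS='2s', data belonging to the already-fired window can still update its result until the watermark passes 10:05:02 (window end plus allowed lateness), following the usual watermark semantics.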
    

Contribution Guidelines

Pull requests and issues are welcome. Please ensure that the code conforms to Go standards and include relevant test cases.

License

Apache License 2.0

Documentation

Overview

Package streamsql is a lightweight, SQL-based IoT edge stream processing engine.

StreamSQL provides efficient unbounded data stream processing and analysis capabilities, supporting multiple window types, aggregate functions, custom functions, and seamless integration with the RuleGo ecosystem.

Core Features

  • Lightweight design - Pure in-memory operations, no external dependencies
  • SQL syntax support - Process stream data using familiar SQL syntax
  • Multiple window types - Sliding, tumbling, counting, and session windows
  • Event time and processing time - Support both time semantics for accurate stream processing
  • Watermark mechanism - Handle out-of-order and late-arriving data with configurable tolerance
  • Rich aggregate functions - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
  • Plugin-based custom functions - Runtime dynamic registration, supports 8 function types
  • RuleGo ecosystem integration - Extend input/output sources using RuleGo components

Getting Started

Basic stream data processing:

package main

import (
	"fmt"
	"math/rand"
	"time"
	"github.com/rulego/streamsql"
)

func main() {
	// Create StreamSQL instance
	ssql := streamsql.New()
	defer ssql.Stop()

	// Define SQL query - Calculate temperature average by device ID every 5 seconds
	sql := `SELECT deviceId,
		AVG(temperature) as avg_temp,
		MIN(humidity) as min_humidity,
		window_start() as start,
		window_end() as end
	FROM stream
	WHERE deviceId != 'device3'
	GROUP BY deviceId, TumblingWindow('5s')`

	// Execute SQL, create stream processing task
	err := ssql.Execute(sql)
	if err != nil {
		panic(err)
	}

	// Add result processing callback
	ssql.AddSink(func(result []map[string]interface{}) {
		fmt.Printf("Aggregation result: %v\n", result)
	})

	// Simulate sending stream data
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()

		for range ticker.C {
			// Generate random device data
			data := map[string]interface{}{
				"deviceId":    fmt.Sprintf("device%d", rand.Intn(3)+1),
				"temperature": 20.0 + rand.Float64()*10,
				"humidity":    50.0 + rand.Float64()*20,
			}
			ssql.Emit(data)
		}
	}()

	// Run for 30 seconds
	time.Sleep(30 * time.Second)
}

Window Functions

StreamSQL supports multiple window types:

// Tumbling window - Independent window every 5 seconds
SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')

// Sliding window - 30-second window size, slides every 10 seconds
SELECT MAX(temperature) FROM stream GROUP BY SlidingWindow('30s', '10s')

// Counting window - One window per 100 records
SELECT COUNT(*) FROM stream GROUP BY CountingWindow(100)

// Session window - Automatically closes session after 5-minute timeout
SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')

Event Time vs Processing Time

StreamSQL supports two time semantics for window processing:

Processing Time (Default)

Processing time uses the system clock when data arrives. Windows are triggered based on data arrival time:

// Processing time window (default)
SELECT COUNT(*) FROM stream GROUP BY TumblingWindow('5m')
// Windows are triggered every 5 minutes based on when data arrives

Event Time

Event time uses timestamps embedded in the data itself. Windows are triggered based on event timestamps, allowing correct handling of out-of-order and late-arriving data:

// Event time window - Use 'order_time' field as event timestamp
SELECT COUNT(*) as order_count
FROM stream
GROUP BY TumblingWindow('5m')
WITH (TIMESTAMP='order_time')

// Event time with integer timestamp (Unix milliseconds)
SELECT AVG(temperature) FROM stream
GROUP BY TumblingWindow('1m')
WITH (TIMESTAMP='event_time', TIMEUNIT='ms')

Watermark and Late Data Handling

Event time windows use a watermark mechanism to handle out-of-order and late data:

// Configure max out-of-orderness (tolerate 5 seconds of out-of-order data)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
	TIMESTAMP='order_time',
	MAXOUTOFORDERNESS='5s'  // Watermark = max(event_time) - 5s
)

// Configure allowed lateness (accept late data for 2 seconds after window closes)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
	TIMESTAMP='order_time',
	ALLOWEDLATENESS='2s'  // Window stays open for 2s after trigger
)

// Combine both configurations
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
	TIMESTAMP='order_time',
	MAXOUTOFORDERNESS='5s',  // Tolerate 5s out-of-order before trigger
	ALLOWEDLATENESS='2s'     // Accept 2s late data after trigger
)

// Configure idle source mechanism (advance watermark based on processing time when data source is idle)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (
	TIMESTAMP='order_time',
	IDLETIMEOUT='5s'  // If no data arrives within 5s, watermark advances based on processing time
)

Key concepts:

  • MaxOutOfOrderness: Affects watermark calculation, delaying window triggering to tolerate out-of-order data
  • AllowedLateness: Keeps the window open after triggering to accept late data and update results
  • IdleTimeout: When the data source is idle (no data arrives within the timeout), the watermark advances based on processing time so that windows can still close
  • Watermark: Indicates that no events with a timestamp less than the watermark are expected

Custom Functions

StreamSQL supports plugin-based custom functions with runtime dynamic registration:

// Register a temperature conversion function (import "github.com/rulego/streamsql/functions")
functions.RegisterCustomFunction(
	"fahrenheit_to_celsius",
	functions.TypeConversion,
	"Temperature conversion",
	"Fahrenheit to Celsius",
	1, 1,
	func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
		f, _ := functions.ConvertToFloat64(args[0])
		return (f - 32) * 5 / 9, nil
	},
)

// Use immediately in SQL
sql := `SELECT deviceId,
	AVG(fahrenheit_to_celsius(temperature)) as avg_celsius
FROM stream GROUP BY deviceId, TumblingWindow('5s')`

Supported custom function types:

  • TypeMath - Mathematical calculation functions
  • TypeString - String processing functions
  • TypeConversion - Type conversion functions
  • TypeDateTime - Date and time functions
  • TypeAggregation - Aggregate functions
  • TypeAnalytical - Analytical functions
  • TypeWindow - Window functions
  • TypeCustom - General custom functions

Log Configuration

StreamSQL provides flexible log configuration options:

// Set log level
ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG))

// Output to file
logFile, _ := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
ssql := streamsql.New(streamsql.WithLogOutput(logFile, logger.INFO))

// Disable logging (production environment)
ssql := streamsql.New(streamsql.WithDiscardLog())

RuleGo Integration

StreamSQL provides deep integration with the RuleGo rule engine through two dedicated components for stream data processing:

  • streamTransform (x/streamTransform) - Stream transformer, handles non-aggregation SQL queries
  • streamAggregator (x/streamAggregator) - Stream aggregator, handles aggregation SQL queries

Basic integration example:

package main

import (
	"github.com/rulego/rulego"
	"github.com/rulego/rulego/api/types"
	// Register StreamSQL components
	_ "github.com/rulego/rulego-components/external/streamsql"
)

func main() {
	// Rule chain configuration
	ruleChainJson := `{
		"ruleChain": {"id": "rule01"},
		"metadata": {
			"nodes": [{
				"id": "transform1",
				"type": "x/streamTransform",
				"configuration": {
					"sql": "SELECT deviceId, temperature * 1.8 + 32 as temp_f FROM stream WHERE temperature > 20"
				}
			}, {
				"id": "aggregator1",
				"type": "x/streamAggregator",
				"configuration": {
					"sql": "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('5s')"
				}
			}],
			"connections": [{
				"fromId": "transform1",
				"toId": "aggregator1",
				"type": "Success"
			}]
		}
	}`

	// Create rule engine
	ruleEngine, _ := rulego.New("rule01", []byte(ruleChainJson))

	// Send data
	data := `{"deviceId":"sensor01","temperature":25.5}`
	msg := types.NewMsg(0, "TELEMETRY", types.JSON, types.NewMetadata(), data)
	ruleEngine.OnMsg(msg)
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(*Streamsql)

Option defines the configuration option type for StreamSQL

func WithBufferSizes

func WithBufferSizes(dataChannelSize, resultChannelSize, windowOutputSize int) Option

WithBufferSizes sets custom buffer sizes

func WithCustomPerformance

func WithCustomPerformance(config types.PerformanceConfig) Option

WithCustomPerformance uses custom performance configuration

func WithDiscardLog

func WithDiscardLog() Option

WithDiscardLog disables log output

func WithHighPerformance

func WithHighPerformance() Option

WithHighPerformance uses the high-performance configuration. Suitable for scenarios requiring maximum throughput.

func WithLogLevel

func WithLogLevel(level logger.Level) Option

WithLogLevel sets the log level

func WithLowLatency

func WithLowLatency() Option

WithLowLatency uses the low-latency configuration. Suitable for real-time interactive applications, minimizing latency.

func WithMonitoring

func WithMonitoring(updateInterval time.Duration, enableDetailedStats bool) Option

WithMonitoring enables detailed monitoring

func WithOverflowStrategy

func WithOverflowStrategy(strategy string, blockTimeout time.Duration) Option

WithOverflowStrategy sets the overflow strategy

func WithWorkerConfig

func WithWorkerConfig(sinkPoolSize, sinkWorkerCount, maxRetryRoutines int) Option

WithWorkerConfig sets the worker pool configuration

func WithZeroDataLoss

func WithZeroDataLoss() Option

WithZeroDataLoss uses the zero-data-loss configuration. Suitable for critical business data, ensuring no data loss.
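
The options compose. A small sketch combining several of them (buffer sizes and the monitoring interval are illustrative values, not recommendations):

ssql := streamsql.New(
	streamsql.WithBufferSizes(1024, 512, 256),     // data, result, and window-output channel sizes
	streamsql.WithMonitoring(5*time.Second, true), // detailed stats, refreshed every 5 seconds
	streamsql.WithLogLevel(logger.INFO),
)
defer ssql.Stop()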

type Streamsql

type Streamsql struct {
	// contains filtered or unexported fields
}

Streamsql is the main interface for the StreamSQL streaming engine. It encapsulates core functionality including SQL parsing, stream processing, and window management.

Usage example:

ssql := streamsql.New()
err := ssql.Execute("SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')")
ssql.Emit(map[string]interface{}{"temperature": 25.5})

func New

func New(options ...Option) *Streamsql

New creates a new StreamSQL instance. Supports configuration through optional Option parameters.

Parameters:

  • options: Variable configuration options for customizing StreamSQL behavior

Returns:

  • *Streamsql: Newly created StreamSQL instance

Examples:

// Create default instance
ssql := streamsql.New()

// Create high performance instance
ssql := streamsql.New(streamsql.WithHighPerformance())

// Create zero data loss instance
ssql := streamsql.New(streamsql.WithZeroDataLoss())

func (*Streamsql) AddSink

func (s *Streamsql) AddSink(sink func([]map[string]interface{}))

AddSink adds a result-processing callback function. It is a convenience wrapper around Stream().AddSink() for cleaner API calls.

Parameters:

  • sink: Result processing function, receives []map[string]interface{} type result data

Examples:

// Directly add result processing
ssql.AddSink(func(results []map[string]interface{}) {
    fmt.Printf("Processing results: %v\n", results)
})

// Add multiple processors
ssql.AddSink(func(results []map[string]interface{}) {
    // Save to database
    saveToDatabase(results)
})
ssql.AddSink(func(results []map[string]interface{}) {
    // Send to message queue
    sendToQueue(results)
})

func (*Streamsql) Emit

func (s *Streamsql) Emit(data map[string]interface{})

Emit adds data to the stream processing pipeline. Accepts type-safe map[string]interface{} format data.

Parameters:

  • data: Data to add, must be map[string]interface{} type

Examples:

// Add device data
ssql.Emit(map[string]interface{}{
    "deviceId": "sensor001",
    "temperature": 25.5,
    "humidity": 60.0,
    "timestamp": time.Now(),
})

// Add user behavior data
ssql.Emit(map[string]interface{}{
    "userId": "user123",
    "action": "click",
    "page": "/home",
})

func (*Streamsql) EmitSync

func (s *Streamsql) EmitSync(data map[string]interface{}) (map[string]interface{}, error)

EmitSync processes data synchronously, returning the result immediately. Only applicable to non-aggregation queries; aggregation queries return an error. Accepts type-safe map[string]interface{} format data.

Parameters:

  • data: Data to process, must be map[string]interface{} type

Returns:

  • map[string]interface{}: Processed result data, returns nil if filter conditions don't match
  • error: Processing error

Examples:

result, err := ssql.EmitSync(map[string]interface{}{
    "deviceId": "sensor001",
    "temperature": 25.5,
})
if err != nil {
    log.Printf("processing error: %v", err)
} else if result != nil {
    // Use processed result immediately (result is map[string]interface{} type)
    fmt.Printf("Processing result: %v\n", result)
}

func (*Streamsql) Execute

func (s *Streamsql) Execute(sql string) error

Execute parses and executes SQL queries, creating corresponding stream processing pipelines. This is the core method of StreamSQL, responsible for converting SQL into actual stream processing logic.

Supported SQL syntax:

  • SELECT clause: Select fields and aggregate functions
  • FROM clause: Specify data source (usually 'stream')
  • WHERE clause: Data filtering conditions
  • GROUP BY clause: Grouping fields and window functions
  • HAVING clause: Aggregate result filtering
  • LIMIT clause: Limit result count
  • DISTINCT: Result deduplication

Window functions:

  • TumblingWindow('5s'): Tumbling window
  • SlidingWindow('30s', '10s'): Sliding window
  • CountingWindow(100): Counting window
  • SessionWindow('5m'): Session window

Parameters:

  • sql: SQL query statement to execute

Returns:

  • error: Returns error if SQL parsing or execution fails

Examples:

// Basic aggregation query
err := ssql.Execute("SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5s')")

// Query with filtering conditions
err := ssql.Execute("SELECT * FROM stream WHERE temperature > 30")

// Complex window aggregation
err := ssql.Execute(`
    SELECT deviceId,
           AVG(temperature) as avg_temp,
           MAX(humidity) as max_humidity
    FROM stream
    WHERE deviceId != 'test'
    GROUP BY deviceId, SlidingWindow('1m', '30s')
    HAVING avg_temp > 25
    LIMIT 100
`)

func (*Streamsql) GetDetailedStats

func (s *Streamsql) GetDetailedStats() map[string]interface{}

GetDetailedStats returns detailed performance statistics

func (*Streamsql) GetStats

func (s *Streamsql) GetStats() map[string]int64

GetStats returns stream processing statistics
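
The key sets of both statistics maps are not documented here, so a sketch simply iterates whatever counters come back:

// Basic counters
for name, v := range ssql.GetStats() {
	fmt.Printf("%s = %d\n", name, v)
}

// Detailed statistics carry interface{} values; print them generically
for name, v := range ssql.GetDetailedStats() {
	fmt.Printf("%s = %v\n", name, v)
}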

func (*Streamsql) IsAggregationQuery

func (s *Streamsql) IsAggregationQuery() bool

IsAggregationQuery checks if the current query is an aggregation query

func (*Streamsql) PrintTable added in v0.10.1

func (s *Streamsql) PrintTable()

PrintTable prints results to console in table format, similar to database output. Displays column names first, then data rows.

Supported data formats:

  • []map[string]interface{}: Multiple rows
  • map[string]interface{}: Single row
  • Other types: Direct print

Example:

// Print results in table format
ssql.PrintTable()

// Output format:
// +--------+----------+
// | device | max_temp |
// +--------+----------+
// | aa     | 30.0     |
// | bb     | 22.0     |
// +--------+----------+

func (*Streamsql) Stop

func (s *Streamsql) Stop()

Stop stops the stream processor and releases related resources. After calling this method, the stream processor will stop receiving and processing new data.

Recommended to call this method for cleanup before application exit:

defer ssql.Stop()

Note: a StreamSQL instance cannot be restarted after stopping; create a new instance instead.

func (*Streamsql) Stream

func (s *Streamsql) Stream() *stream.Stream

Stream returns the underlying stream processor instance. Provides access to lower-level stream processing functionality.

Returns:

  • *stream.Stream: Underlying stream processor instance, returns nil if SQL not executed

Common use cases:

  • Add result processing callbacks
  • Get result channel
  • Manually control stream processing lifecycle

Examples:

// Add result processing callback
ssql.Stream().AddSink(func(results []map[string]interface{}) {
    fmt.Printf("Processing results: %v\n", results)
})

// Get result channel
resultChan := ssql.Stream().GetResultsChan()
go func() {
    for result := range resultChan {
        // Process result
    }
}()

func (*Streamsql) ToChannel

func (s *Streamsql) ToChannel() <-chan []map[string]interface{}

ToChannel converts query results to channel output. Returns a read-only channel for receiving query results.

Notes:

  • The consumer must continuously read from the channel to prevent stream processing from blocking
  • The channel transmits batched result data
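
A usage sketch based on the notes above, draining the channel in a dedicated goroutine:

resultChan := ssql.ToChannel()
go func() {
	for batch := range resultChan {
		// each receive is one batch of result rows
		fmt.Printf("batch of %d rows: %v\n", len(batch), batch)
	}
}()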

Directories

| Path | Synopsis |
|---|---|
| aggregator | Package aggregator provides data aggregation functionality for StreamSQL. |
| condition | Package condition provides condition evaluation functionality for StreamSQL. |
| examples/non-aggregation | (command) |
| expr | Package expr provides expression parsing and evaluation capabilities for StreamSQL. |
| functions | Package functions provides a comprehensive function registry and execution framework for StreamSQL. |
| logger | Package logger provides logging functionality for StreamSQL. |
| rsql | Package rsql provides SQL parsing and analysis capabilities for StreamSQL. |
| stream | Package stream provides the core stream processing engine for StreamSQL. |
| types | Package types provides core type definitions and data structures for StreamSQL. |
| utils | |
| window | Package window provides windowing functionality for StreamSQL stream processing. |
