Skip to content

feat: add Redis integration support#1588

Merged
yottahmd merged 15 commits intomainfrom
redis-step
Jan 18, 2026
Merged

feat: add Redis integration support#1588
yottahmd merged 15 commits intomainfrom
redis-step

Conversation

@yottahmd
Copy link
Copy Markdown
Collaborator

@yottahmd yottahmd commented Jan 18, 2026

Summary by CodeRabbit

  • New Features
    • Added Redis executor supporting command execution, pipelines, and Lua scripts.
    • Introduced DAG-level Redis configuration that steps inherit by default.
    • Added distributed lock functionality for Redis operations.
    • Enhanced UI to display executor-derived commands in DAG details.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Jan 18, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough

Walkthrough

This PR introduces comprehensive Redis executor support to the dagu system, adding DAG-level and step-level Redis configuration with automatic inheritance, a full executor implementation supporting command, pipeline, and Lua script modes, connection pooling for worker deployments, distributed lock capabilities, and UI components to display executor-based commands.

Changes

Cohort / File(s) Summary
Dependency Management
go.mod
Added indirect dependencies for go-rendezvous and redis/go-redis/v9 to support Redis client connectivity.
DAG Core Configuration
internal/core/dag.go, internal/core/spec/dag.go
Added RedisConfig type with connection parameters (URL, host, port, credentials, TLS, sentinel/cluster modes) and integrated into DAG struct. Introduced buildRedis function to transform DAG-level Redis config during build phase.
Step Executor Inference & Merging
internal/core/spec/step.go
Implemented Redis executor type inference when DAG-level Redis config exists; added mergeRedisConfig to inherit DAG defaults into step configs with proper zero-value detection for strings, ints, bools, and slices.
DAG Specification Tests
internal/core/spec/builder_test.go, internal/core/spec/dag_test.go
Comprehensive test coverage for Redis inheritance behavior, step-level config overrides, type inference, and config-to-core transformation via buildRedis.
Redis Configuration System
internal/runtime/builtin/redis/config.go, internal/runtime/builtin/redis/config_schema.go, internal/runtime/builtin/redis/config_test.go
Full configuration module with Config struct supporting all Redis connection modes (standalone, sentinel, cluster), command execution options (command, pipeline, script), validation logic, and JSON schema registration. Helper functions for context-based pool manager access.
Redis Client Management
internal/runtime/builtin/redis/client.go, internal/runtime/builtin/redis/connection.go, internal/runtime/builtin/redis/global_pool.go
Client factory supporting URL-based and mode-specific instantiation with TLS support; retry logic with exponential backoff; global pool manager with reference counting, lazy client creation, and atomic lifecycle management for worker deployments.
Redis Command Execution
internal/runtime/builtin/redis/command.go, internal/runtime/builtin/redis/pipeline.go, internal/runtime/builtin/redis/script.go
CommandHandler dispatches 40+ Redis commands across data structures (strings, hashes, lists, sets, sorted sets, pub/sub, streams); PipelineExecutor orchestrates transactional pipelines with WATCH support; ScriptExecutor handles Lua script execution with EVALSHA/EVAL fallback and built-in script library (rate limiting, atomic increment, CAS, locking).
Result Formatting & Locking
internal/runtime/builtin/redis/result.go, internal/runtime/builtin/redis/lock.go
ResultWriter formats Redis results to JSON/JSONL/raw/CSV with nil-value handling; LockManager provides distributed locking with acquire/release, TTL management, and Lua-based safe unlock semantics.
Executor Implementation & Registration
internal/runtime/builtin/redis/redis.go, internal/runtime/builtin/redis/builtin.go
Main RedisExecutor implementing the Executor interface with per-step or pool-backed client modes, command/pipeline/script dispatch, metric collection, timeout handling, and lock acquisition; registered as "redis" builtin during initialization.
Test Coverage
internal/runtime/builtin/redis/redis_test.go, internal/runtime/builtin/redis/race_test.go
Extensive integration tests covering Redis commands, output formats, timeouts, Kill/Close behavior, and global pool lifecycle; race-condition tests for concurrent executor/pool access.
UI Command Display
ui/src/features/dags/components/dag-details/DAGStepTableRow.tsx, ui/src/features/dags/components/dag-details/NodeStatusTableRow.tsx
Helper function getExecutorCommand to extract displayable commands from executor configs when step.commands is absent; fallback rendering with Code icon and truncation for executor-based step display.

Sequence Diagram

sequenceDiagram
    participant DAG as DAG Config
    participant StepInf as Step Inference
    participant ExecFactory as Executor Factory
    participant PoolMgr as Global Pool Manager
    participant Client as Redis Client
    participant Redis as Redis Server

    DAG->>StepInf: Pass DAG with Redis config
    StepInf->>StepInf: Infer step type as 'redis'
    StepInf->>StepInf: Merge DAG Redis defaults into step config
    
    ExecFactory->>ExecFactory: Create RedisExecutor with step config
    ExecFactory->>PoolMgr: Request client (GetOrCreateClient)
    
    alt Client cached
        PoolMgr->>PoolMgr: Increment ref count
        PoolMgr-->>ExecFactory: Return existing client
    else Client not cached
        PoolMgr->>Client: Create new Redis client
        Client->>Redis: Ping (validate connection)
        Redis-->>Client: Pong
        PoolMgr->>PoolMgr: Store with ref count = 1
        PoolMgr-->>ExecFactory: Return new client
    end
    
    ExecFactory->>ExecFactory: Execute (command/pipeline/script)
    ExecFactory->>Client: EVAL/EXEC/GET/etc. (via command type)
    Client->>Redis: Send command
    Redis-->>Client: Return result
    
    ExecFactory->>ExecFactory: Format result (JSON/CSV/raw)
    ExecFactory->>ExecFactory: Emit ExecutionMetrics
    
    ExecFactory->>PoolMgr: Release client (ReleaseClient)
    PoolMgr->>PoolMgr: Decrement ref count (keep alive)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 55.91% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and concisely summarizes the main change: adding Redis integration support to the codebase, which aligns with the comprehensive Redis implementation across multiple files.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@yottahmd yottahmd changed the title feat: redis step feat: support Redis integration Jan 18, 2026
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/core/spec/step.go (1)

1140-1154: Redis inference is too broad and can hijack normal steps.

With DAG-level Redis defaults set, any step without explicit type becomes redis, including shell-command steps. That can break mixed DAGs. Consider inferring Redis only when the step has executor config present (e.g., config:) or other Redis-specific indicators.

🔧 Tighten Redis inference to config-bearing steps
@@
-		} else if ctx.dag.Redis != nil {
-			result.ExecutorConfig.Type = "redis"
+		} else if ctx.dag.Redis != nil && len(result.ExecutorConfig.Config) > 0 {
+			result.ExecutorConfig.Type = "redis"
 		}
 	}
🤖 Fix all issues with AI agents
In `@go.mod`:
- Around line 84-85: The go-redis dependency must be bumped to a secure release:
update the module github.com/redis/go-redis/v9 to v9.7.3 or later in go.mod
(reference: module path "github.com/redis/go-redis/v9") and then run module
tooling to apply the change (e.g. go get github.com/redis/go-redis/[email protected] &&
go mod tidy) so go.sum is refreshed and the vulnerable version is removed.

In `@internal/runtime/builtin/redis/global_pool.go`:
- Around line 160-178: The hashConfig function currently builds an identifier
from connection fields but omits credentials and TLS/max-retry settings causing
different configs to collide; update hashConfig to incorporate Username and
Password, MaxRetries, and a deterministic fingerprint of TLS settings (e.g.
presence of TLSConfig, TLS InsecureSkipVerify and any CA/cert identifier) into
the identifier before hashing, then compute and return a short cryptographic
hash (sha256/sha1 truncated) of that full identifier to avoid exposing raw
secrets while ensuring unique keys per auth/TLS configuration; reference the
hashConfig function and the cfg fields Username, Password, MaxRetries and
TLS-related fields when making the change.

In `@internal/runtime/builtin/redis/lock.go`:
- Around line 43-49: The Acquire method of LockManager currently allows an empty
cfg.Lock which yields a shared key ("dagu:lock:") and causes cross-DAG
collisions; add a guard at the start of LockManager.Acquire that validates
m.cfg.Lock (e.g., check for zero-length or only-whitespace via
strings.TrimSpace) and return a clear error (and nil release func) when it's
empty, so callers must supply a non-empty lock name; update any places that
build the Redis key (e.g., where "dagu:lock:"+m.cfg.Lock is used) to assume a
validated cfg.Lock.

In `@internal/runtime/builtin/redis/pipeline.go`:
- Around line 121-223: The queueCommand method currently ignores the caller
context and uses context.Background(), which prevents cancellation/deadline
propagation; change the first parameter from _ to ctx in func (e
*PipelineExecutor) queueCommand(ctx context.Context, ...) and replace every
context.Background() call inside queueCommand (all Redis calls like pipe.Get,
Set, HSet, ZAdd, Publish, etc.) with ctx so pipeline operations honor the
caller's context and can be cancelled or timed out correctly.

In `@internal/runtime/builtin/redis/race_test.go`:
- Around line 28-57: The test spawns goroutines that call require.NoError (which
calls FailNow) inside each goroutine; replace these with non-fatal error
collection by creating an errors channel (e.g., errCh) before the loop, have
each goroutine send any error from executor.NewExecutor(ctx, step) or
exec.Run(ctx) into errCh instead of calling require.NoError, close errCh after
wg.Wait(), and then range over errCh in the main goroutine to assert failures
(using require.NoError or require.Len) so all errors are reported from the main
test goroutine; reference symbols: the goroutine body creating core.Step,
executor.NewExecutor, exec.SetStdout, exec.Run, and wg.Wait.
- Around line 77-103: Move the require.NoError call out of the goroutine: call
redisexec.ParseConfig(...) and assert NoError before spawning the goroutine,
then pass the prebuilt cfg into the goroutine; inside the goroutine capture
runtime errors from GetOrCreateClient, client.Ping(...).Result(), and
ReleaseClient by sending them to an error channel (or storing them in a slice
protected by a mutex), wait for wg.Wait(), then range over the collected errors
and call require.NoError for each to fail the test from the main goroutine;
reference functions: redisexec.ParseConfig, pm.GetOrCreateClient, client.Ping,
pm.ReleaseClient and the wg used for concurrency.

In `@internal/runtime/builtin/redis/script.go`:
- Around line 26-78: The Execute method currently always calls getScript which
fails for SHA-only configs; modify Execute (and related logic around ScriptSHA,
calculateSHA1, client.EvalSha, and client.Eval) so it does not require script
content when a ScriptSHA is provided: only call getScript (and calculateSHA1)
when Script or ScriptFile is present or when you need to fall back to EVAL;
attempt EvalSha using e.cfg.ScriptSHA as-is if provided, and if EvalSha returns
NOSCRIPT and no script content is available, return a clear error saying the
script body is required to fallback to EVAL; if EvalSha returns NOSCRIPT and
getScript successfully provides content, call client.Eval with that script as
the fallback.
🧹 Nitpick comments (9)
ui/src/features/dags/components/dag-details/DAGStepTableRow.tsx (1)

13-60: Consider centralizing getExecutorCommand to avoid drift.
The helper is duplicated in NodeStatusTableRow.tsx; a shared util would keep behavior aligned across views.

internal/runtime/builtin/redis/command.go (3)

35-43: Consider handling KeepTTL option for SET command.

The Config struct has a KeepTTL field, but the SET command implementation doesn't use it. When KeepTTL is true, the existing TTL should be preserved.

♻️ Suggested enhancement
 	case "SET":
 		ttl := time.Duration(h.cfg.TTL) * time.Second
+		if h.cfg.KeepTTL {
+			return h.client.SetArgs(ctx, h.cfg.Key, h.cfg.Value, redis.SetArgs{
+				KeepTTL: true,
+			}).Result()
+		}
 		if h.cfg.NX {
 			return h.client.SetNX(ctx, h.cfg.Key, h.cfg.Value, ttl).Result()
 		}

563-588: Consider adding a safeguard for SCAN on large datasets.

The executeScan method collects all matching keys into memory without any limit. On databases with millions of keys, this could cause excessive memory usage and long execution times.

♻️ Suggested safeguard
 func (h *CommandHandler) executeScan(ctx context.Context) ([]string, error) {
 	var allKeys []string
 	var cursor uint64
 	count := int64(h.cfg.Count)
 	if count == 0 {
 		count = 100
 	}
 	match := h.cfg.Match
 	if match == "" {
 		match = "*"
 	}
+	
+	const maxKeys = 100000 // Configurable limit
 
 	for {
+		if len(allKeys) >= maxKeys {
+			return allKeys, fmt.Errorf("scan result limit reached (%d keys)", maxKeys)
+		}
 		keys, nextCursor, err := h.client.Scan(ctx, cursor, match, count).Result()
 		if err != nil {
 			return nil, err
 		}
 		allKeys = append(allKeys, keys...)
 		cursor = nextCursor
 		if cursor == 0 {
 			break
 		}
 	}
 	return allKeys, nil
 }

602-620: Consider handling unsigned integer types in toInt64.

The toInt64 function doesn't handle uint, uint32, or uint64 types. Values unmarshaled from JSON or received from other sources could be unsigned, causing conversion errors.

♻️ Suggested enhancement
 func toInt64(v any) (int64, error) {
 	switch val := v.(type) {
 	case int:
 		return int64(val), nil
 	case int32:
 		return int64(val), nil
 	case int64:
 		return val, nil
+	case uint:
+		return int64(val), nil
+	case uint32:
+		return int64(val), nil
+	case uint64:
+		return int64(val), nil
 	case float64:
 		return int64(val), nil
 	case float32:
 		return int64(val), nil
 	case string:
 		return strconv.ParseInt(val, 10, 64)
 	default:
 		return 0, fmt.Errorf("cannot convert %T to int64", v)
 	}
 }
internal/runtime/builtin/redis/redis_test.go (2)

274-290: Consider using t.Cleanup() for more reliable test cleanup.

The cleanup step ignores errors, which is fine, but using t.Cleanup() would ensure cleanup runs even if the test fails or panics earlier.

♻️ Suggested improvement
+	// Register cleanup first
+	t.Cleanup(func() {
+		delStep := core.Step{
+			Name: "test-del",
+			ExecutorConfig: core.ExecutorConfig{
+				Type: "redis",
+				Config: map[string]any{
+					"host":    host,
+					"command": "DEL",
+					"key":     "test:integration:key",
+				},
+			},
+		}
+		delExec, _ := newRedisExecutor(ctx, delStep)
+		_ = delExec.Run(ctx)
+	})
+
 	// SET a value
 	setStep := core.Step{

420-427: Consider adding test cases for jsonl and csv output formats.

The test only covers json and raw formats, but the implementation supports jsonl and csv as well. Adding these would improve coverage of the output formatting logic.

internal/runtime/builtin/redis/config.go (1)

166-227: Validation is comprehensive but could validate TLS certificate pairs.

The validation covers all critical fields. However, if TLSCert is provided, TLSKey should also be required (and vice versa) to prevent misconfiguration.

♻️ Suggested enhancement
+	// Validate TLS certificate pair
+	if (c.TLSCert != "" && c.TLSKey == "") || (c.TLSCert == "" && c.TLSKey != "") {
+		return fmt.Errorf("both tlsCert and tlsKey must be provided together")
+	}
+
 	// Must have a command, script, or pipeline
 	if c.Command == "" && c.Script == "" && c.ScriptFile == "" && len(c.Pipeline) == 0 {
internal/runtime/builtin/redis/result.go (1)

273-304: Consider removing unused specialized write methods or consolidating with writeCSV.

WriteZSet, WriteHash, and WriteList duplicate logic already present in writeCSV. If these methods aren't used externally, they could be removed to reduce maintenance burden.

If these are intended for external use, consider having writeCSV delegate to these methods to avoid duplication.

internal/runtime/builtin/redis/client.go (1)

145-149: Explicitly set a minimum TLS version.
Leaving MinVersion unset makes the negotiated protocol depend on defaults. Consider pinning TLS 1.2+ (or TLS 1.3 if your servers support it) to enforce a baseline.

🔒 Suggested hardening
-	tlsConfig := &tls.Config{
-		InsecureSkipVerify: cfg.TLSSkipVerify, //nolint:gosec // User-controlled option
-	}
+	tlsConfig := &tls.Config{
+		MinVersion:         tls.VersionTLS12,
+		InsecureSkipVerify: cfg.TLSSkipVerify, //nolint:gosec // User-controlled option
+	}

Comment thread go.mod
Comment thread internal/runtime/builtin/redis/global_pool.go
Comment thread internal/runtime/builtin/redis/lock.go
Comment thread internal/runtime/builtin/redis/pipeline.go
Comment thread internal/runtime/builtin/redis/race_test.go
Comment thread internal/runtime/builtin/redis/race_test.go
Comment thread internal/runtime/builtin/redis/script.go
@yottahmd yottahmd changed the title feat: support Redis integration feat: add Redis integration support Jan 18, 2026
@yottahmd yottahmd merged commit fd2fad6 into main Jan 18, 2026
6 checks passed
@yottahmd yottahmd deleted the redis-step branch January 18, 2026 15:34
@codecov
Copy link
Copy Markdown

codecov Bot commented Jan 18, 2026

Codecov Report

❌ Patch coverage is 17.18289% with 1123 lines in your changes missing coverage. Please review.
✅ Project coverage is 62.60%. Comparing base (815fff9) to head (21f2c32).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
internal/runtime/builtin/redis/command.go 0.00% 408 Missing ⚠️
internal/runtime/builtin/redis/pipeline.go 0.00% 158 Missing ⚠️
internal/runtime/builtin/redis/redis.go 7.97% 127 Missing ⚠️
internal/runtime/builtin/redis/result.go 39.57% 108 Missing and 5 partials ⚠️
internal/runtime/builtin/redis/client.go 0.00% 94 Missing ⚠️
internal/runtime/builtin/redis/lock.go 0.00% 67 Missing ⚠️
internal/runtime/builtin/redis/script.go 11.26% 63 Missing ⚠️
internal/runtime/builtin/redis/global_pool.go 35.36% 47 Missing and 6 partials ⚠️
internal/runtime/builtin/redis/connection.go 0.00% 30 Missing ⚠️
internal/runtime/builtin/redis/config.go 90.47% 3 Missing and 3 partials ⚠️
... and 1 more
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1588      +/-   ##
==========================================
- Coverage   64.70%   62.60%   -2.10%     
==========================================
  Files         259      270      +11     
  Lines       28899    30255    +1356     
==========================================
+ Hits        18698    18942     +244     
- Misses       8523     9624    +1101     
- Partials     1678     1689      +11     
Files with missing lines Coverage Δ
internal/core/dag.go 69.71% <ø> (ø)
internal/core/spec/dag.go 87.79% <100.00%> (+0.29%) ⬆️
internal/runtime/builtin/redis/config_schema.go 100.00% <100.00%> (ø)
internal/core/spec/step.go 85.48% <88.57%> (+0.14%) ⬆️
internal/runtime/builtin/redis/config.go 90.47% <90.47%> (ø)
internal/runtime/builtin/redis/connection.go 0.00% <0.00%> (ø)
internal/runtime/builtin/redis/global_pool.go 35.36% <35.36%> (ø)
internal/runtime/builtin/redis/script.go 11.26% <11.26%> (ø)
internal/runtime/builtin/redis/lock.go 0.00% <0.00%> (ø)
internal/runtime/builtin/redis/client.go 0.00% <0.00%> (ø)
... and 4 more

... and 6 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 815fff9...21f2c32. Read the comment docs.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant