Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the 📝 WalkthroughWalkthroughThis PR introduces comprehensive Redis executor support to the dagu system, adding DAG-level and step-level Redis configuration with automatic inheritance, a full executor implementation supporting command, pipeline, and Lua script modes, connection pooling for worker deployments, distributed lock capabilities, and UI components to display executor-based commands. Changes
Sequence DiagramsequenceDiagram
participant DAG as DAG Config
participant StepInf as Step Inference
participant ExecFactory as Executor Factory
participant PoolMgr as Global Pool Manager
participant Client as Redis Client
participant Redis as Redis Server
DAG->>StepInf: Pass DAG with Redis config
StepInf->>StepInf: Infer step type as 'redis'
StepInf->>StepInf: Merge DAG Redis defaults into step config
ExecFactory->>ExecFactory: Create RedisExecutor with step config
ExecFactory->>PoolMgr: Request client (GetOrCreateClient)
alt Client cached
PoolMgr->>PoolMgr: Increment ref count
PoolMgr-->>ExecFactory: Return existing client
else Client not cached
PoolMgr->>Client: Create new Redis client
Client->>Redis: Ping (validate connection)
Redis-->>Client: Pong
PoolMgr->>PoolMgr: Store with ref count = 1
PoolMgr-->>ExecFactory: Return new client
end
ExecFactory->>ExecFactory: Execute (command/pipeline/script)
ExecFactory->>Client: EVAL/EXEC/GET/etc. (via command type)
Client->>Redis: Send command
Redis-->>Client: Return result
ExecFactory->>ExecFactory: Format result (JSON/CSV/raw)
ExecFactory->>ExecFactory: Emit ExecutionMetrics
ExecFactory->>PoolMgr: Release client (ReleaseClient)
PoolMgr->>PoolMgr: Decrement ref count (keep alive)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/core/spec/step.go (1)
1140-1154: Redis inference is too broad and can hijack normal steps.With DAG-level Redis defaults set, any step without explicit
typebecomesredis, including shell-command steps. That can break mixed DAGs. Consider inferring Redis only when the step has executor config present (e.g.,config:) or other Redis-specific indicators.🔧 Tighten Redis inference to config-bearing steps
@@ - } else if ctx.dag.Redis != nil { - result.ExecutorConfig.Type = "redis" + } else if ctx.dag.Redis != nil && len(result.ExecutorConfig.Config) > 0 { + result.ExecutorConfig.Type = "redis" } }
🤖 Fix all issues with AI agents
In `@go.mod`:
- Around line 84-85: The go-redis dependency must be bumped to a secure release:
update the module github.com/redis/go-redis/v9 to v9.7.3 or later in go.mod
(reference: module path "github.com/redis/go-redis/v9") and then run module
tooling to apply the change (e.g. go get github.com/redis/go-redis/[email protected] &&
go mod tidy) so go.sum is refreshed and the vulnerable version is removed.
In `@internal/runtime/builtin/redis/global_pool.go`:
- Around line 160-178: The hashConfig function currently builds an identifier
from connection fields but omits credentials and TLS/max-retry settings causing
different configs to collide; update hashConfig to incorporate Username and
Password, MaxRetries, and a deterministic fingerprint of TLS settings (e.g.
presence of TLSConfig, TLS InsecureSkipVerify and any CA/cert identifier) into
the identifier before hashing, then compute and return a short cryptographic
hash (sha256/sha1 truncated) of that full identifier to avoid exposing raw
secrets while ensuring unique keys per auth/TLS configuration; reference the
hashConfig function and the cfg fields Username, Password, MaxRetries and
TLS-related fields when making the change.
In `@internal/runtime/builtin/redis/lock.go`:
- Around line 43-49: The Acquire method of LockManager currently allows an empty
cfg.Lock which yields a shared key ("dagu:lock:") and causes cross-DAG
collisions; add a guard at the start of LockManager.Acquire that validates
m.cfg.Lock (e.g., check for zero-length or only-whitespace via
strings.TrimSpace) and return a clear error (and nil release func) when it's
empty, so callers must supply a non-empty lock name; update any places that
build the Redis key (e.g., where "dagu:lock:"+m.cfg.Lock is used) to assume a
validated cfg.Lock.
In `@internal/runtime/builtin/redis/pipeline.go`:
- Around line 121-223: The queueCommand method currently ignores the caller
context and uses context.Background(), which prevents cancellation/deadline
propagation; change the first parameter from _ to ctx in func (e
*PipelineExecutor) queueCommand(ctx context.Context, ...) and replace every
context.Background() call inside queueCommand (all Redis calls like pipe.Get,
Set, HSet, ZAdd, Publish, etc.) with ctx so pipeline operations honor the
caller's context and can be cancelled or timed out correctly.
In `@internal/runtime/builtin/redis/race_test.go`:
- Around line 28-57: The test spawns goroutines that call require.NoError (which
calls FailNow) inside each goroutine; replace these with non-fatal error
collection by creating an errors channel (e.g., errCh) before the loop, have
each goroutine send any error from executor.NewExecutor(ctx, step) or
exec.Run(ctx) into errCh instead of calling require.NoError, close errCh after
wg.Wait(), and then range over errCh in the main goroutine to assert failures
(using require.NoError or require.Len) so all errors are reported from the main
test goroutine; reference symbols: the goroutine body creating core.Step,
executor.NewExecutor, exec.SetStdout, exec.Run, and wg.Wait.
- Around line 77-103: Move the require.NoError call out of the goroutine: call
redisexec.ParseConfig(...) and assert NoError before spawning the goroutine,
then pass the prebuilt cfg into the goroutine; inside the goroutine capture
runtime errors from GetOrCreateClient, client.Ping(...).Result(), and
ReleaseClient by sending them to an error channel (or storing them in a slice
protected by a mutex), wait for wg.Wait(), then range over the collected errors
and call require.NoError for each to fail the test from the main goroutine;
reference functions: redisexec.ParseConfig, pm.GetOrCreateClient, client.Ping,
pm.ReleaseClient and the wg used for concurrency.
In `@internal/runtime/builtin/redis/script.go`:
- Around line 26-78: The Execute method currently always calls getScript which
fails for SHA-only configs; modify Execute (and related logic around ScriptSHA,
calculateSHA1, client.EvalSha, and client.Eval) so it does not require script
content when a ScriptSHA is provided: only call getScript (and calculateSHA1)
when Script or ScriptFile is present or when you need to fall back to EVAL;
attempt EvalSha using e.cfg.ScriptSHA as-is if provided, and if EvalSha returns
NOSCRIPT and no script content is available, return a clear error saying the
script body is required to fallback to EVAL; if EvalSha returns NOSCRIPT and
getScript successfully provides content, call client.Eval with that script as
the fallback.
🧹 Nitpick comments (9)
ui/src/features/dags/components/dag-details/DAGStepTableRow.tsx (1)
13-60: Consider centralizinggetExecutorCommandto avoid drift.
The helper is duplicated inNodeStatusTableRow.tsx; a shared util would keep behavior aligned across views.internal/runtime/builtin/redis/command.go (3)
35-43: Consider handling KeepTTL option for SET command.The
Configstruct has aKeepTTLfield, but the SET command implementation doesn't use it. WhenKeepTTLis true, the existing TTL should be preserved.♻️ Suggested enhancement
case "SET": ttl := time.Duration(h.cfg.TTL) * time.Second + if h.cfg.KeepTTL { + return h.client.SetArgs(ctx, h.cfg.Key, h.cfg.Value, redis.SetArgs{ + KeepTTL: true, + }).Result() + } if h.cfg.NX { return h.client.SetNX(ctx, h.cfg.Key, h.cfg.Value, ttl).Result() }
563-588: Consider adding a safeguard for SCAN on large datasets.The
executeScanmethod collects all matching keys into memory without any limit. On databases with millions of keys, this could cause excessive memory usage and long execution times.♻️ Suggested safeguard
func (h *CommandHandler) executeScan(ctx context.Context) ([]string, error) { var allKeys []string var cursor uint64 count := int64(h.cfg.Count) if count == 0 { count = 100 } match := h.cfg.Match if match == "" { match = "*" } + + const maxKeys = 100000 // Configurable limit for { + if len(allKeys) >= maxKeys { + return allKeys, fmt.Errorf("scan result limit reached (%d keys)", maxKeys) + } keys, nextCursor, err := h.client.Scan(ctx, cursor, match, count).Result() if err != nil { return nil, err } allKeys = append(allKeys, keys...) cursor = nextCursor if cursor == 0 { break } } return allKeys, nil }
602-620: Consider handling unsigned integer types intoInt64.The
toInt64function doesn't handleuint,uint32, oruint64types. Values unmarshaled from JSON or received from other sources could be unsigned, causing conversion errors.♻️ Suggested enhancement
func toInt64(v any) (int64, error) { switch val := v.(type) { case int: return int64(val), nil case int32: return int64(val), nil case int64: return val, nil + case uint: + return int64(val), nil + case uint32: + return int64(val), nil + case uint64: + return int64(val), nil case float64: return int64(val), nil case float32: return int64(val), nil case string: return strconv.ParseInt(val, 10, 64) default: return 0, fmt.Errorf("cannot convert %T to int64", v) } }internal/runtime/builtin/redis/redis_test.go (2)
274-290: Consider usingt.Cleanup()for more reliable test cleanup.The cleanup step ignores errors, which is fine, but using
t.Cleanup()would ensure cleanup runs even if the test fails or panics earlier.♻️ Suggested improvement
+ // Register cleanup first + t.Cleanup(func() { + delStep := core.Step{ + Name: "test-del", + ExecutorConfig: core.ExecutorConfig{ + Type: "redis", + Config: map[string]any{ + "host": host, + "command": "DEL", + "key": "test:integration:key", + }, + }, + } + delExec, _ := newRedisExecutor(ctx, delStep) + _ = delExec.Run(ctx) + }) + // SET a value setStep := core.Step{
420-427: Consider adding test cases forjsonlandcsvoutput formats.The test only covers
jsonandrawformats, but the implementation supportsjsonlandcsvas well. Adding these would improve coverage of the output formatting logic.internal/runtime/builtin/redis/config.go (1)
166-227: Validation is comprehensive but could validate TLS certificate pairs.The validation covers all critical fields. However, if
TLSCertis provided,TLSKeyshould also be required (and vice versa) to prevent misconfiguration.♻️ Suggested enhancement
+ // Validate TLS certificate pair + if (c.TLSCert != "" && c.TLSKey == "") || (c.TLSCert == "" && c.TLSKey != "") { + return fmt.Errorf("both tlsCert and tlsKey must be provided together") + } + // Must have a command, script, or pipeline if c.Command == "" && c.Script == "" && c.ScriptFile == "" && len(c.Pipeline) == 0 {internal/runtime/builtin/redis/result.go (1)
273-304: Consider removing unused specialized write methods or consolidating withwriteCSV.
WriteZSet,WriteHash, andWriteListduplicate logic already present inwriteCSV. If these methods aren't used externally, they could be removed to reduce maintenance burden.If these are intended for external use, consider having
writeCSVdelegate to these methods to avoid duplication.internal/runtime/builtin/redis/client.go (1)
145-149: Explicitly set a minimum TLS version.
LeavingMinVersionunset makes the negotiated protocol depend on defaults. Consider pinning TLS 1.2+ (or TLS 1.3 if your servers support it) to enforce a baseline.🔒 Suggested hardening
- tlsConfig := &tls.Config{ - InsecureSkipVerify: cfg.TLSSkipVerify, //nolint:gosec // User-controlled option - } + tlsConfig := &tls.Config{ + MinVersion: tls.VersionTLS12, + InsecureSkipVerify: cfg.TLSSkipVerify, //nolint:gosec // User-controlled option + }
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #1588 +/- ##
==========================================
- Coverage 64.70% 62.60% -2.10%
==========================================
Files 259 270 +11
Lines 28899 30255 +1356
==========================================
+ Hits 18698 18942 +244
- Misses 8523 9624 +1101
- Partials 1678 1689 +11
... and 6 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings.