refactor: isolate manifest decoding and standardize local transport#2035

Merged
yottahmd merged 4 commits into main from
refactor/manifest-decoder-sock-writer
Apr 25, 2026

@yottahmd
Collaborator

@yottahmd yottahmd commented Apr 25, 2026

Summary

  • isolate manifest decoding into a dedicated decoder component and extend loader characterization coverage
  • standardize unix socket request handling on the net/http client and server stack
  • streamline DAG run status persistence with buffered JSON encoding and regression tests

Testing

  • go test ./internal/runtime ./internal/core/spec ./internal/cmn/sock ./internal/persis/filedagrun -count=1

Summary by CodeRabbit

  • Refactor

    • Socket client/server now use standard HTTP client/server internals for robust request handling, timeouts, idle handling, and graceful shutdown; server logs panics with stack traces.
    • Spec loader centralized around a manifest decoder for clearer decoding and validation.
    • JSON writer now uses a persistent encoder and buffered writer for efficient, consistent serialization.
  • Bug Fixes

    • Improved timeout detection/propagation and ensured handler panics result in 500 responses.
    • Guaranteed newline-delimited JSON output formatting.
  • Tests

    • Added/refactored tests for handler recovery, response headers, server timeouts, manifest decoding, client init, and writer format.

@coderabbitai

coderabbitai Bot commented Apr 25, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6987b12d-d583-4cc5-b1dc-8f8337f61259

📥 Commits

Reviewing files that changed from the base of the PR and between bbb6da0 and 75a311d.

📒 Files selected for processing (1)
  • internal/core/spec/loader.go

📝 Walkthrough

Walkthrough

Replaces manual UNIX-socket request/response handling with net/http-based client and server; centralizes YAML manifest decoding into an unexported manifestDecoder; and switches file dag-run persistence to a persistent json.Encoder with buffered I/O and unified flush+sync.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Socket communication: internal/cmn/sock/client.go, internal/cmn/sock/server.go, internal/cmn/sock/server_internal_test.go, internal/cmn/sock/server_test.go, internal/cmn/sock/client_internal_test.go | Client now holds a reusable *http.Client with a custom Transport dialing the unix socket; request path normalization and timeout wrapping updated. Server replaces manual accept/read loop with http.Server, adds httpHandler panic recovery (logs stack), sets ReadHeaderTimeout/IdleTimeout/BaseContext, and uses graceful Shutdown. Tests adapted to use httptest, validate header propagation, panic recovery, server config, and client initialization. |
| Manifest decoding: internal/core/spec/loader.go, internal/core/spec/manifest_decoder.go, internal/core/spec/loader_internal_test.go | Introduces unexported manifestDecoder and reorganizes loader helpers: centralized YAML/unmarshal and DAG assembly helpers, empty-input handling, alias validation (labels vs tags), mapstructure decode with union hook, and extraction of raw handler_on/defaults. Adds unit tests for decoding edge cases and decoder instance sharing. |
| File persistence (dag runs): internal/persis/filedagrun/writer.go, internal/persis/filedagrun/writer_test.go | Refactors persistent writes to use a long-lived json.Encoder over a bufio.Writer; Open initializes buffer+encoder, writes use encoder.Encode, and flush+sync consolidated into flushAndSyncLocked. Tests verify newline-delimited JSON format and trailing newline. |
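The file-persistence row can be illustrated with a minimal sketch, assuming a writer that keeps one json.Encoder over one bufio.Writer for the lifetime of the file. The names statusWriter, openStatusWriter and writeTwoRecords are hypothetical and are not taken from the PR; the real writer also holds a lock (flushAndSyncLocked), which is omitted here.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// statusWriter is a hypothetical stand-in for the dag-run writer:
// one long-lived encoder on top of one buffered writer.
type statusWriter struct {
	file *os.File
	buf  *bufio.Writer
	enc  *json.Encoder
}

func openStatusWriter(path string) (*statusWriter, error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o600)
	if err != nil {
		return nil, err
	}
	buf := bufio.NewWriter(f)
	return &statusWriter{file: f, buf: buf, enc: json.NewEncoder(buf)}, nil
}

// write encodes one record (Encode appends '\n'), then flushes the
// buffer and syncs the file so the line is durable before returning.
func (w *statusWriter) write(v any) error {
	if err := w.enc.Encode(v); err != nil {
		return err
	}
	if err := w.buf.Flush(); err != nil {
		return err
	}
	return w.file.Sync()
}

func (w *statusWriter) close() error {
	if err := w.buf.Flush(); err != nil {
		_ = w.file.Close()
		return err
	}
	return w.file.Close()
}

// writeTwoRecords writes two records to a temp file and returns its contents.
func writeTwoRecords() (string, error) {
	dir, err := os.MkdirTemp("", "ndjson")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "status.jsonl")
	w, err := openStatusWriter(path)
	if err != nil {
		return "", err
	}
	for _, s := range []string{"running", "succeeded"} {
		if err := w.write(map[string]string{"status": s}); err != nil {
			_ = w.close()
			return "", err
		}
	}
	if err := w.close(); err != nil {
		return "", err
	}
	data, err := os.ReadFile(path)
	return string(data), err
}

func main() {
	out, err := writeTwoRecords()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(out)
}
```

Because json.Encoder.Encode terminates every value with a newline, each write yields exactly one line, which is what a line-oriented reader of the status file needs.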

Sequence Diagram(s)

sequenceDiagram
    participant Client as HTTP Client
    participant Transport as Custom Transport
    participant Socket as Unix Socket
    participant Server as HTTP Server
    participant Handler as App Handler
    participant Writer as File Writer

    Client->>Transport: Do(http://unix{path})
    activate Transport
    Transport->>Socket: DialContext -> unix socket
    Socket->>Server: HTTP request over socket
    deactivate Transport

    activate Server
    Server->>Handler: ServeHTTP(w, r)
    activate Handler

    alt handler succeeds
        Handler->>Server: write headers & body
        Server->>Client: response
        Handler->>Writer: encoder.Encode(status)
        Writer-->>Handler: flush & sync
    else handler panics
        Handler->>Handler: recover()
        Handler->>Server: WriteHeader(500) + body
        Server->>Client: 500 Internal Server Error
    end

    deactivate Handler
    deactivate Server

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately summarizes the main refactoring work: isolating manifest decoding into a dedicated component and standardizing Unix socket transport using net/http, which align with the primary changes across loader.go, manifest_decoder.go, client.go, and server.go. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

@coderabbitai coderabbitai Bot left a comment

🧹 Nitpick comments (7)
internal/persis/filedagrun/writer_test.go (1)

111-127: Solid regression test for newline-delimited JSON output.

Reading the file back via os.ReadFile and asserting the trailing \n byte directly validates the new json.Encoder.Encode behavior. The require.NotEmpty guard before indexing data[len(data)-1] keeps the test safe.

Optional nit: you could additionally json.Unmarshal(bytes.TrimRight(data, "\n"), &decoded) to assert the line is well-formed JSON, which would catch any future regression that produces a stray newline mid-record. Not blocking.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/persis/filedagrun/writer_test.go` around lines 111 - 127, The test
"WritesNewlineDelimitedJSON" currently asserts the file ends with '\n'; add a
JSON validity check by trimming the trailing newline from the read bytes (use
bytes.TrimRight(data, "\n")) and then json.Unmarshal the trimmed bytes into a
temporary variable (e.g. map[string]interface{} or the expected struct) and
require no error to ensure the line is well-formed JSON; locate this logic after
os.ReadFile(writerPath) and before the final assertion, referencing the test
name, NewWriter, writer.write and writer.close to find the correct spot.
internal/core/spec/manifest_decoder.go (2)

75-82: Use errors.New instead of fmt.Errorf without format args.

fmt.Errorf is unnecessary here since there are no format directives. Some golangci-lint configurations (e.g., errorlint/gosimple-adjacent rules) flag this; even without a linter, errors.New is the idiomatic choice.

♻️ Proposed change
 func validateManifestAliases(input map[string]any) error {
 	if _, hasLabels := input["labels"]; hasLabels {
 		if _, hasTags := input["tags"]; hasTags {
-			return fmt.Errorf("labels and deprecated tags cannot both be set")
+			return errors.New("labels and deprecated tags cannot both be set")
 		}
 	}
 	return nil
 }

Note: fmt would still be used elsewhere if other helpers are added, otherwise drop the import.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/spec/manifest_decoder.go` around lines 75 - 82, In
validateManifestAliases replace the call to fmt.Errorf("labels and deprecated
tags cannot both be set") with errors.New("labels and deprecated tags cannot
both be set") to avoid using fmt.Errorf without formatting directives; ensure
the errors package is imported and remove fmt if it becomes unused (or leave fmt
if still needed elsewhere).

16-24: Optional: hoist the decoder/hook to package scope to avoid per-call allocation.

manifestDecoder is stateless (the decodeHook is a pure closure with no captured state), but newManifestDecoder() is called from both unmarshalData and decode in loader.go, which run on every DAG / base-config / sub-document load. Each call allocates a fresh manifestDecoder plus a fresh TypedUnionDecodeHook closure. Not a correctness issue, but you can share a single instance:

♻️ Proposed change
-type manifestDecoder struct {
-	decodeHook mapstructure.DecodeHookFunc
-}
-
-func newManifestDecoder() *manifestDecoder {
-	return &manifestDecoder{
-		decodeHook: TypedUnionDecodeHook(),
-	}
-}
+type manifestDecoder struct {
+	decodeHook mapstructure.DecodeHookFunc
+}
+
+var defaultManifestDecoder = &manifestDecoder{
+	decodeHook: TypedUnionDecodeHook(),
+}
+
+func newManifestDecoder() *manifestDecoder {
+	return defaultManifestDecoder
+}

This keeps the existing call sites unchanged. Skip if you anticipate per-decoder configuration knobs being added soon.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/spec/manifest_decoder.go` around lines 16 - 24, manifestDecoder
and its TypedUnionDecodeHook are stateless but are being allocated on every call
from unmarshalData and decode; instead create a single package-level instance
(e.g., package var defaultManifestDecoder = &manifestDecoder{decodeHook:
TypedUnionDecodeHook()}) and have newManifestDecoder() return that shared
instance (or eliminate newManifestDecoder and have callers use the package var)
so you avoid per-call allocations while keeping existing call sites unchanged;
reference manifestDecoder, newManifestDecoder, TypedUnionDecodeHook,
unmarshalData, and decode when updating the code.
internal/cmn/sock/server.go (3)

61-66: Consider explicit timeouts on the embedded http.Server.

http.Server is created without ReadHeaderTimeout, ReadTimeout, WriteTimeout, or IdleTimeout. Because handlers can run arbitrarily long (TestShutdownWaitsForActiveHandlers confirms this), a misbehaving or stuck handler will hold the connection — and a graceful Shutdown — indefinitely. For a local unix socket with trusted internal callers this is generally acceptable, but a small ReadHeaderTimeout is cheap protection against slowloris-style stalls if the socket file's permissions ever broaden. Optional.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmn/sock/server.go` around lines 61 - 66, The http.Server instance
created in httpServer (with Handler: srv.httpHandler(ctx)) lacks explicit
timeouts; set sensible timeouts on the server (e.g., ReadHeaderTimeout,
ReadTimeout, WriteTimeout, IdleTimeout) in the http.Server literal to protect
against slowloris or hung handlers while preserving
TestShutdownWaitsForActiveHandlers behavior — choose modest values such as a
short ReadHeaderTimeout (a few seconds) and longer Read/Write/Idle timeouts
(tens of seconds) so active handlers can still finish but stalled connections
are bounded.
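A minimal sketch of the timeout suggestion, assuming hypothetical constant values (the PR's actual defaultTimeout and idleTimeout values are not shown in this review). ReadTimeout and WriteTimeout are deliberately left at zero here so that long-running handlers are not cut off; only header reads and idle keep-alive connections are bounded.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// Hypothetical values chosen for illustration only.
const (
	readHeaderTimeout = 3 * time.Second
	idleTimeout       = 30 * time.Second
)

// newBoundedServer returns a server that limits how long a peer may take
// to send request headers and how long an idle connection is kept open.
func newBoundedServer(h http.Handler) *http.Server {
	return &http.Server{
		Handler:           h,
		ReadHeaderTimeout: readHeaderTimeout,
		IdleTimeout:       idleTimeout,
		// ReadTimeout and WriteTimeout stay zero (no limit) so a handler
		// that legitimately runs for a long time can still finish.
	}
}

// timeoutsInSeconds reports the configured limits as whole seconds.
func timeoutsInSeconds() (header, idle int) {
	srv := newBoundedServer(http.NotFoundHandler())
	return int(srv.ReadHeaderTimeout / time.Second), int(srv.IdleTimeout / time.Second)
}

func main() {
	header, idle := timeoutsInSeconds()
	fmt.Printf("ReadHeaderTimeout=%ds IdleTimeout=%ds\n", header, idle)
}
```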

122-135: Minor: the listener != nil branch in Shutdown is effectively dead.

install sets listener and httpServer together, and clear unsets them together, so httpServer == nil && listener != nil is not reachable through the current code paths. The standalone listener.Close() block can be folded away (calling httpServer.Shutdown already closes the listener for you). Keeping it as defensive code is fine; just flagging in case you want to drop it now that the lifecycle is symmetric.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmn/sock/server.go` around lines 122 - 135, The listener != nil
branch in Shutdown is effectively unreachable because install and clear manage
httpServer and listener together; remove the standalone listener.Close() block
(the httpServer.Shutdown(ctx) call already closes the listener) to avoid dead
code, or alternatively keep it but add a clear comment referencing Shutdown,
install and clear to explain it is defensive; update references to
isClosedServerError and logger.Error accordingly so only the httpServer.Shutdown
path remains for normal shutdown.

87-98: Simplify the post-Serve switch — the err == nil cases are unreachable.

http.Server.Serve always returns a non-nil error, so the err == nil conditions can never trigger. Flatten the logic to an explicit check for graceful shutdown:

♻️ Suggested simplification
-	err = httpServer.Serve(listener)
-	switch {
-	case err == nil && srv.quit.Load():
-		return ErrServerRequestedShutdown
-	case isClosedServerError(err) && srv.quit.Load():
-		return ErrServerRequestedShutdown
-	case err == nil:
-		return nil
-	default:
-		return err
-	}
+	err = httpServer.Serve(listener)
+	if isClosedServerError(err) && srv.quit.Load() {
+		return ErrServerRequestedShutdown
+	}
+	return err
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmn/sock/server.go` around lines 87 - 98, http.Server.Serve always
returns a non-nil error, so remove the unreachable err == nil branches and
simplify the switch: after calling httpServer.Serve(listener), check whether
isClosedServerError(err) is true and srv.quit.Load() is true, and in that case
return ErrServerRequestedShutdown; otherwise return err. Update the logic
referencing httpServer.Serve, isClosedServerError, srv.quit.Load, and
ErrServerRequestedShutdown accordingly.
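The claim that Serve never returns nil can be checked with a small sketch. It assumes a loopback TCP listener instead of the unix socket, uses errors.Is(err, http.ErrServerClosed) as a stand-in for the PR's isClosedServerError helper (whose body is not shown here), and introduces a hypothetical sentinel errRequestedShutdown in place of ErrServerRequestedShutdown.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

// errRequestedShutdown is a hypothetical stand-in for ErrServerRequestedShutdown.
var errRequestedShutdown = errors.New("server requested shutdown")

// serveUntilShutdown starts a server, shuts it down gracefully from another
// goroutine, and maps the resulting Serve error the way the review suggests.
func serveUntilShutdown() error {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	srv := &http.Server{
		Handler:           http.NotFoundHandler(),
		ReadHeaderTimeout: time.Second,
	}

	shutdownErr := make(chan error, 1)
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()
		shutdownErr <- srv.Shutdown(ctx)
	}()

	// Serve blocks until the server is shut down and always returns a
	// non-nil error; after Shutdown that error is http.ErrServerClosed.
	err = srv.Serve(ln)
	if sErr := <-shutdownErr; sErr != nil {
		return sErr
	}
	if errors.Is(err, http.ErrServerClosed) {
		return errRequestedShutdown
	}
	return err
}

func main() {
	fmt.Println(serveUntilShutdown())
}
```

The sketch omits the quit flag for brevity; in the PR the extra srv.quit.Load() check distinguishes a shutdown that the server itself requested from any other close.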
internal/cmn/sock/client.go (1)

60-82: Consider caching the http.Client / http.Transport on Client.

httpClient() allocates a fresh http.Transport per Request call, and defer client.CloseIdleConnections() immediately tears down any idle connection. This defeats keep-alive pooling and is wasteful even for occasional calls like status polling. Since Client is immutable after NewClient, the transport can be built once and reused.

Also, http.Client.Timeout already covers the full request (dial + write + headers + body), so the inner dialer Timeout: defaultTimeout is redundant.

♻️ Suggested refactor
 type Client struct {
-	addr string
+	addr   string
+	client *http.Client
 }

 func NewClient(addr string) *Client {
-	return &Client{addr: addr}
+	c := &Client{addr: addr}
+	c.client = &http.Client{
+		Transport: &http.Transport{
+			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
+				dialer := &net.Dialer{Timeout: defaultTimeout}
+				conn, err := dialer.DialContext(ctx, "unix", c.addr)
+				if err != nil {
+					return nil, wrapTimeout("dial unix socket", err)
+				}
+				return conn, nil
+			},
+			DisableCompression: true,
+		},
+		Timeout: defaultTimeout,
+	}
+	return c
 }

Then Request uses cl.client directly and drops httpClient() / CloseIdleConnections().

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmn/sock/client.go` around lines 60 - 82, The current httpClient()
builds a new http.Transport per call and Request immediately calls
client.CloseIdleConnections(), preventing connection reuse; also the dialer's
Timeout duplicates http.Client.Timeout. Modify the Client struct to cache a
single *http.Client (and its Transport) created once in NewClient, remove
httpClient() and the defer client.CloseIdleConnections() from Request so Request
reuses cl.client, and remove the redundant Timeout on net.Dialer (rely on
http.Client.Timeout) when constructing the transport.
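A minimal sketch of the cached-client idea, assuming a hypothetical newUnixClient constructor and a throwaway server for the round trip. The host part of the URL is only a placeholder, since the transport ignores the requested address and always dials the socket path; the timeout value is illustrative.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"time"
)

// requestTimeout is a hypothetical value; it bounds the whole request.
const requestTimeout = 3 * time.Second

// newUnixClient builds one reusable client whose transport always dials
// socketPath, so idle connections can be pooled across requests.
func newUnixClient(socketPath string) *http.Client {
	return &http.Client{
		Timeout: requestTimeout,
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				var d net.Dialer
				return d.DialContext(ctx, "unix", socketPath)
			},
			DisableCompression: true,
		},
	}
}

// roundTrip serves one request over a temporary unix socket and returns the body.
func roundTrip() (string, error) {
	dir, err := os.MkdirTemp("", "sock")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	socketPath := filepath.Join(dir, "demo.sock")

	ln, err := net.Listen("unix", socketPath)
	if err != nil {
		return "", err
	}
	srv := &http.Server{
		ReadHeaderTimeout: requestTimeout,
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprintf(w, "path=%s", r.URL.Path)
		}),
	}
	go func() { _ = srv.Serve(ln) }()
	defer srv.Close()

	client := newUnixClient(socketPath)
	defer client.CloseIdleConnections()

	resp, err := client.Get("http://unix/status")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	fmt.Println(roundTrip())
}
```

In this sketch CloseIdleConnections is called once when the demo is finished with the client, not after every request, which is the distinction the comment above draws.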
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@internal/cmn/sock/client.go`:
- Around line 60-82: The current httpClient() builds a new http.Transport per
call and Request immediately calls client.CloseIdleConnections(), preventing
connection reuse; also the dialer's Timeout duplicates http.Client.Timeout.
Modify the Client struct to cache a single *http.Client (and its Transport)
created once in NewClient, remove httpClient() and the defer
client.CloseIdleConnections() from Request so Request reuses cl.client, and
remove the redundant Timeout on net.Dialer (rely on http.Client.Timeout) when
constructing the transport.

In `@internal/cmn/sock/server.go`:
- Around line 61-66: The http.Server instance created in httpServer (with
Handler: srv.httpHandler(ctx)) lacks explicit timeouts; set sensible timeouts on
the server (e.g., ReadHeaderTimeout, ReadTimeout, WriteTimeout, IdleTimeout) in
the http.Server literal to protect against slowloris or hung handlers while
preserving TestShutdownWaitsForActiveHandlers behavior — choose modest values
such as a short ReadHeaderTimeout (a few seconds) and longer Read/Write/Idle
timeouts (tens of seconds) so active handlers can still finish but stalled
connections are bounded.
- Around line 122-135: The listener != nil branch in Shutdown is effectively
unreachable because install and clear manage httpServer and listener together;
remove the standalone listener.Close() block (the httpServer.Shutdown(ctx) call
already closes the listener) to avoid dead code, or alternatively keep it but
add a clear comment referencing Shutdown, install and clear to explain it is
defensive; update references to isClosedServerError and logger.Error accordingly
so only the httpServer.Shutdown path remains for normal shutdown.
- Around line 87-98: http.Server.Serve always returns a non-nil error, so remove
the unreachable err == nil branches and simplify the switch: after calling
httpServer.Serve(listener), check whether isClosedServerError(err) is true and
srv.quit.Load() is true, and in that case return ErrServerRequestedShutdown;
otherwise return err. Update the logic referencing httpServer.Serve,
isClosedServerError, srv.quit.Load, and ErrServerRequestedShutdown accordingly.

In `@internal/core/spec/manifest_decoder.go`:
- Around line 75-82: In validateManifestAliases replace the call to
fmt.Errorf("labels and deprecated tags cannot both be set") with
errors.New("labels and deprecated tags cannot both be set") to avoid using
fmt.Errorf without formatting directives; ensure the errors package is imported
and remove fmt if it becomes unused (or leave fmt if still needed elsewhere).
- Around line 16-24: manifestDecoder and its TypedUnionDecodeHook are stateless
but are being allocated on every call from unmarshalData and decode; instead
create a single package-level instance (e.g., package var defaultManifestDecoder
= &manifestDecoder{decodeHook: TypedUnionDecodeHook()}) and have
newManifestDecoder() return that shared instance (or eliminate
newManifestDecoder and have callers use the package var) so you avoid per-call
allocations while keeping existing call sites unchanged; reference
manifestDecoder, newManifestDecoder, TypedUnionDecodeHook, unmarshalData, and
decode when updating the code.

In `@internal/persis/filedagrun/writer_test.go`:
- Around line 111-127: The test "WritesNewlineDelimitedJSON" currently asserts
the file ends with '\n'; add a JSON validity check by trimming the trailing
newline from the read bytes (use bytes.TrimRight(data, "\n")) and then
json.Unmarshal the trimmed bytes into a temporary variable (e.g.
map[string]interface{} or the expected struct) and require no error to ensure
the line is well-formed JSON; locate this logic after os.ReadFile(writerPath)
and before the final assertion, referencing the test name, NewWriter,
writer.write and writer.close to find the correct spot.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 959b70c9-4293-4c1c-ad70-cc6351b1f227

📥 Commits

Reviewing files that changed from the base of the PR and between 4d2bc99 and 1be75c7.

📒 Files selected for processing (9)
  • internal/cmn/sock/client.go
  • internal/cmn/sock/server.go
  • internal/cmn/sock/server_internal_test.go
  • internal/cmn/sock/server_test.go
  • internal/core/spec/loader.go
  • internal/core/spec/loader_internal_test.go
  • internal/core/spec/manifest_decoder.go
  • internal/persis/filedagrun/writer.go
  • internal/persis/filedagrun/writer_test.go

@yottahmd
Collaborator Author

@coderabbitai review

@coderabbitai

coderabbitai Bot commented Apr 25, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai coderabbitai Bot left a comment

🧹 Nitpick comments (1)
internal/cmn/sock/server.go (1)

105-115: Include a stack trace when logging panics.

The recovery path logs the panic value but not where it came from, which makes triaging real production panics harder once you switch to a standard net/http pipeline. Capturing debug.Stack() keeps the original signal you'd otherwise have gotten from an uncaught panic.

♻️ Proposed change
-	"log/slog"
+	"log/slog"
+	"runtime/debug"
@@
 		defer func() {
 			if recovered := recover(); recovered != nil {
-				logger.Error(ctx, "Socket handler panicked", slog.Any("panic", recovered))
+				logger.Error(ctx, "Socket handler panicked",
+					slog.Any("panic", recovered),
+					slog.String("stack", string(debug.Stack())),
+				)
 				http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
 			}
 		}()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmn/sock/server.go` around lines 105 - 115, The panic recovery in
Server.httpHandler currently logs only the recovered value; modify the defer to
capture the goroutine stack (runtime/debug.Stack()) when a panic occurs and
include it in the error log alongside the panic value so you can trace where it
came from (update the logger.Error call in httpHandler to pass both the
recovered panic and the stack, e.g. slog.Any("panic", recovered) and
slog.Any("stack", string(stack))). Also add the runtime/debug import if missing
and keep srv.handlerFunc(w, r) unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@internal/cmn/sock/server.go`:
- Around line 105-115: The panic recovery in Server.httpHandler currently logs
only the recovered value; modify the defer to capture the goroutine stack
(runtime/debug.Stack()) when a panic occurs and include it in the error log
alongside the panic value so you can trace where it came from (update the
logger.Error call in httpHandler to pass both the recovered panic and the stack,
e.g. slog.Any("panic", recovered) and slog.Any("stack", string(stack))). Also
add the runtime/debug import if missing and keep srv.handlerFunc(w, r)
unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9a7f57f0-a0a2-4665-98a2-bc70d8ce64e6

📥 Commits

Reviewing files that changed from the base of the PR and between 1be75c7 and de2d93a.

📒 Files selected for processing (8)
  • internal/cmn/sock/client.go
  • internal/cmn/sock/client_internal_test.go
  • internal/cmn/sock/server.go
  • internal/cmn/sock/server_internal_test.go
  • internal/core/spec/loader_internal_test.go
  • internal/core/spec/manifest_decoder.go
  • internal/persis/filedagrun/writer.go
  • internal/persis/filedagrun/writer_test.go
✅ Files skipped from review due to trivial changes (1)
  • internal/cmn/sock/client.go
🚧 Files skipped from review as they are similar to previous changes (4)
  • internal/persis/filedagrun/writer_test.go
  • internal/core/spec/manifest_decoder.go
  • internal/core/spec/loader_internal_test.go
  • internal/cmn/sock/server_internal_test.go

@yottahmd
Collaborator Author

@coderabbitai review

@coderabbitai

coderabbitai Bot commented Apr 25, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai coderabbitai Bot left a comment

🧹 Nitpick comments (4)
internal/persis/filedagrun/writer_test.go (1)

115-135: LGTM — solid regression test for the NDJSON contract.

The subtest correctly validates both invariants required by parseStatusFileWithContext (line-oriented reads in internal/persis/filedagrun/attempt.go): the payload is valid JSON and the file ends with '\n'. Imports, lifecycle, and assertion choices align with the rest of the file.

One small optional refinement: since decoded is only used to confirm parseability, you could either assert a meaningful field on it (e.g. the dagRunID) to also guard against accidental schema drift, or replace the unmarshal with json.Valid to make intent explicit.

♻️ Optional: assert a field to guard against schema regressions
-		var decoded map[string]any
-		require.NoError(t, json.Unmarshal(bytes.TrimRight(data, "\n"), &decoded))
-
-		assert.Equal(t, byte('\n'), data[len(data)-1])
+		var decoded exec.DAGRunStatus
+		require.NoError(t, json.Unmarshal(bytes.TrimRight(data, "\n"), &decoded))
+		assert.Equal(t, dagRunID, decoded.DAGRunID)
+		assert.Equal(t, byte('\n'), data[len(data)-1])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/persis/filedagrun/writer_test.go` around lines 115 - 135, Test
currently only checks that NDJSON is valid JSON; strengthen it to guard against
schema drift by asserting a meaningful field equals the created dagRunID. In the
subtest WritesNewlineDelimitedJSON (variables: dagRunID, decoded), after
unmarshalling into decoded verify decoded["dagRunID"] (or the actual JSON key
produced by transform.NewStatusBuilder) equals dagRunID in addition to the
existing newline and parseability checks so the test fails if the payload schema
changes; adjust the JSON key if necessary to match the produced status payload.
internal/cmn/sock/server.go (2)

33-37: srv.listener tracking is now redundant.

After this refactor, the listener's lifecycle is fully owned by http.Server: httpServer.Shutdown(ctx) closes the registered listener for you, and Serve only uses listener as a local variable to pass into httpServer.Serve. The srv.listener field is no longer read by Shutdown and only exists to be assigned and re-cleared. You could drop the field (and the listener parameter on install/clear) and key the install/clear guard solely on httpServer, which would simplify the locking surface a bit.

Also applies to: 124-141, 143-165

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmn/sock/server.go` around lines 33 - 37, The srv.listener field is
now redundant because httpServer owns the listener lifecycle; remove the
listener field and the listener parameter from install/clear, and change all
code that set/cleared srv.listener to instead guard on srv.httpServer (with the
existing mu) so install returns error if srv.httpServer != nil and clear only
clears/sets httpServer=nil. Update logic in Serve and Shutdown to create a local
listener variable and pass it into srv.httpServer.Serve, and rely on
srv.httpServer.Shutdown(ctx) to close it; remove any reads of srv.listener and
the re-clear behavior that only touched that field.

105-121: Re-panic http.ErrAbortHandler to preserve standard library semantics.

net/http treats a panic with http.ErrAbortHandler as a signal to silently abort the response without logging. The current custom defer recover() intercepts this, logging it at error level and writing a 500 response to the client. If any handler uses that idiom, it will produce spurious errors and incorrect behavior.

Suggested adjustment
 		defer func() {
 			if recovered := recover(); recovered != nil {
+				if recovered == http.ErrAbortHandler {
+					panic(recovered)
+				}
 				logger.Error(
 					ctx,
 					"Socket handler panicked",
 					slog.Any("panic", recovered),
 					slog.String("stack", string(debug.Stack())),
 				)
 				http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
 			}
 		}()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmn/sock/server.go` around lines 105 - 121, The current panic
recovery in Server.httpHandler catches all panics (including
http.ErrAbortHandler) causing incorrect logging/500 responses; update the defer
to re-panic when recovered equals http.ErrAbortHandler so standard library
semantics are preserved. Specifically, inside the deferred func in httpHandler,
after obtaining recovered := recover(), if recovered == http.ErrAbortHandler
then call panic(recovered) (or return before logging/HttpError), otherwise
proceed to log the panic and write the 500 response; this change should be
applied around the existing logger.Error and http.Error calls and keeps
srv.handlerFunc(w, r) behavior intact.
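The recovery behaviour discussed above can be sketched as a standalone middleware. This is an illustration under assumptions, not the PR's httpHandler: recoverMiddleware and the two helper functions are hypothetical names, and the standard log/slog default logger stands in for the project's logger.

```go
package main

import (
	"fmt"
	"log/slog"
	"net/http"
	"net/http/httptest"
	"runtime/debug"
)

// recoverMiddleware turns handler panics into 500 responses and logs the
// stack, but re-panics http.ErrAbortHandler so net/http can abort quietly.
func recoverMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer func() {
			recovered := recover()
			if recovered == nil {
				return
			}
			if recovered == http.ErrAbortHandler {
				panic(recovered)
			}
			slog.Error("handler panicked",
				slog.Any("panic", recovered),
				slog.String("stack", string(debug.Stack())),
			)
			http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
		}()
		next.ServeHTTP(w, r)
	})
}

// statusAfterPanic reports the status code recorded when a handler panics.
func statusAfterPanic() int {
	h := recoverMiddleware(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
		panic("boom")
	}))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Code
}

// abortIsRepanicked reports whether http.ErrAbortHandler escapes the middleware.
func abortIsRepanicked() (repanicked bool) {
	h := recoverMiddleware(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
		panic(http.ErrAbortHandler)
	}))
	defer func() { repanicked = recover() == http.ErrAbortHandler }()
	h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "/", nil))
	return false
}

func main() {
	fmt.Println(statusAfterPanic(), abortIsRepanicked())
}
```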
internal/cmn/sock/server_internal_test.go (1)

50-63: Consider also asserting BaseContext.

Optional: since newHTTPServer is the single source of truth for http.Server configuration, this test could additionally assert that httpServer.BaseContext != nil and that invoking it returns the supplied context. That would lock down the BaseContext wiring against future regressions alongside the timeout values.

 	httpServer := srv.newHTTPServer(context.Background())
 	require.Equal(t, defaultTimeout, httpServer.ReadHeaderTimeout)
 	require.Equal(t, idleTimeout, httpServer.IdleTimeout)
+	require.NotNil(t, httpServer.BaseContext)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmn/sock/server_internal_test.go` around lines 50 - 63, Add
assertions in TestNewHTTPServerConfiguresTimeouts to verify Server.BaseContext
wiring: create a ctx := context.Background(), pass it into
srv.newHTTPServer(ctx), assert httpServer.BaseContext != nil, then invoke
httpServer.BaseContext(nil) and assert it returns the same ctx (use require.Same
or equivalent) so the BaseContext preserves the supplied context.
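A short sketch of what the BaseContext assertion would verify, assuming a hypothetical newServerWithBase constructor in place of the PR's newHTTPServer. It tags the supplied context with a value and checks that the context returned by BaseContext carries it.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"net/http"
)

// ctxKey is a private key type used to tag the root context.
type ctxKey struct{}

// newServerWithBase wires the supplied context in as the base context
// for every request served by the returned server.
func newServerWithBase(ctx context.Context) *http.Server {
	return &http.Server{
		BaseContext: func(net.Listener) context.Context { return ctx },
	}
}

// baseContextValue returns the tag found on the server's base context.
func baseContextValue() any {
	ctx := context.WithValue(context.Background(), ctxKey{}, "root")
	srv := newServerWithBase(ctx)
	// This BaseContext function ignores its listener, so nil is fine here.
	return srv.BaseContext(nil).Value(ctxKey{})
}

func main() {
	fmt.Println(baseContextValue())
}
```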
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@internal/cmn/sock/server_internal_test.go`:
- Around line 50-63: Add assertions in TestNewHTTPServerConfiguresTimeouts to
verify Server.BaseContext wiring: create a ctx := context.Background(), pass it
into srv.newHTTPServer(ctx), assert httpServer.BaseContext != nil, then invoke
httpServer.BaseContext(nil) and assert it returns the same ctx (use require.Same
or equivalent) so the BaseContext preserves the supplied context.

In `@internal/cmn/sock/server.go`:
- Around line 33-37: The srv.listener field is now redundant because httpServer
owns the listener lifecycle; remove the listener field and the listener
parameter from install/clear, and change all code that set/cleared srv.listener
to instead guard on srv.httpServer (with the existing mu) so install returns
error if srv.httpServer != nil and clear only clears/sets httpServer=nil. Update
logic in Serve and Shutdown to create a local listener variable and pass it into
srv.httpServer.Serve, and rely on srv.httpServer.Shutdown(ctx) to close it;
remove any reads of srv.listener and the re-clear behavior that only touched
that field.
- Around line 105-121: The current panic recovery in Server.httpHandler catches
all panics (including http.ErrAbortHandler) causing incorrect logging/500
responses; update the defer to re-panic when recovered equals
http.ErrAbortHandler so standard library semantics are preserved. Specifically,
inside the deferred func in httpHandler, after obtaining recovered := recover(),
if recovered == http.ErrAbortHandler then call panic(recovered) (or return
before logging/HttpError), otherwise proceed to log the panic and write the 500
response; this change should be applied around the existing logger.Error and
http.Error calls and keeps srv.handlerFunc(w, r) behavior intact.

In `@internal/persis/filedagrun/writer_test.go`:
- Around line 115-135: Test currently only checks that NDJSON is valid JSON;
strengthen it to guard against schema drift by asserting a meaningful field
equals the created dagRunID. In the subtest WritesNewlineDelimitedJSON
(variables: dagRunID, decoded), after unmarshalling into decoded verify
decoded["dagRunID"] (or the actual JSON key produced by
transform.NewStatusBuilder) equals dagRunID in addition to the existing newline
and parseability checks so the test fails if the payload schema changes; adjust
the JSON key if necessary to match the produced status payload.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 55369467-2d22-4266-a9ec-3abf7719328c

📥 Commits

Reviewing files that changed from the base of the PR and between de2d93a and bbb6da0.

📒 Files selected for processing (7)
  • internal/cmn/sock/client_internal_test.go
  • internal/cmn/sock/server.go
  • internal/cmn/sock/server_internal_test.go
  • internal/cmn/sock/server_test.go
  • internal/core/spec/loader_internal_test.go
  • internal/persis/filedagrun/writer.go
  • internal/persis/filedagrun/writer_test.go
🚧 Files skipped from review as they are similar to previous changes (3)
  • internal/cmn/sock/client_internal_test.go
  • internal/core/spec/loader_internal_test.go
  • internal/persis/filedagrun/writer.go

@yottahmd
Collaborator Author

@coderabbitai review

@coderabbitai

coderabbitai Bot commented Apr 25, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@yottahmd yottahmd merged commit fb6fbf6 into main Apr 25, 2026
10 checks passed
@yottahmd yottahmd deleted the refactor/manifest-decoder-sock-writer branch April 25, 2026 14:43