refactor: isolate manifest decoding and standardize local transport#2035
refactor: isolate manifest decoding and standardize local transport#2035
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughReplaces manual UNIX-socket request/response handling with net/http-based client and server; centralizes YAML manifest decoding into an unexported manifestDecoder; and switches file dag-run persistence to a persistent json.Encoder with buffered I/O and unified flush+sync. Changes
Sequence Diagram(s)sequenceDiagram
participant Client as HTTP Client
participant Transport as Custom Transport
participant Socket as Unix Socket
participant Server as HTTP Server
participant Handler as App Handler
participant Writer as File Writer
Client->>Transport: Do(http://unix{path})
activate Transport
Transport->>Socket: DialContext -> unix socket
Socket->>Server: HTTP request over socket
deactivate Transport
activate Server
Server->>Handler: ServeHTTP(w, r)
activate Handler
alt handler succeeds
Handler->>Server: write headers & body
Server->>Client: response
Handler->>Writer: encoder.Encode(status)
Writer-->>Handler: flush & sync
else handler panics
Handler->>Handler: recover()
Handler->>Server: WriteHeader(500) + body
Server->>Client: 500 Internal Server Error
end
deactivate Handler
deactivate Server
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (7)
internal/persis/filedagrun/writer_test.go (1)
111-127: Solid regression test for newline-delimited JSON output.Reading the file back via
os.ReadFileand asserting the trailing\nbyte directly validates the newjson.Encoder.Encodebehavior. Therequire.NotEmptyguard before indexingdata[len(data)-1]keeps the test safe.Optional nit: you could additionally
json.Unmarshal(bytes.TrimRight(data, "\n"), &decoded)to assert the line is well-formed JSON, which would catch any future regression that produces a stray newline mid-record. Not blocking.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/persis/filedagrun/writer_test.go` around lines 111 - 127, The test "WritesNewlineDelimitedJSON" currently asserts the file ends with '\n'; add a JSON validity check by trimming the trailing newline from the read bytes (use bytes.TrimRight(data, "\n")) and then json.Unmarshal the trimmed bytes into a temporary variable (e.g. map[string]interface{} or the expected struct) and require no error to ensure the line is well-formed JSON; locate this logic after os.ReadFile(writerPath) and before the final assertion, referencing the test name, NewWriter, writer.write and writer.close to find the correct spot.internal/core/spec/manifest_decoder.go (2)
75-82: Useerrors.Newinstead offmt.Errorfwithout format args.
fmt.Errorfis unnecessary here since there are no format directives. Somegolangci-lintconfigurations (e.g.,errorlint/gosimple-adjacent rules) flag this; even without a linter,errors.Newis the idiomatic choice.♻️ Proposed change
func validateManifestAliases(input map[string]any) error { if _, hasLabels := input["labels"]; hasLabels { if _, hasTags := input["tags"]; hasTags { - return fmt.Errorf("labels and deprecated tags cannot both be set") + return errors.New("labels and deprecated tags cannot both be set") } } return nil }Note:
fmtwould still be used elsewhere if other helpers are added, otherwise drop the import.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/core/spec/manifest_decoder.go` around lines 75 - 82, In validateManifestAliases replace the call to fmt.Errorf("labels and deprecated tags cannot both be set") with errors.New("labels and deprecated tags cannot both be set") to avoid using fmt.Errorf without formatting directives; ensure the errors package is imported and remove fmt if it becomes unused (or leave fmt if still needed elsewhere).
16-24: Optional: hoist the decoder/hook to package scope to avoid per-call allocation.
manifestDecoderis stateless (thedecodeHookis a pure closure with no captured state), butnewManifestDecoder()is called from bothunmarshalDataanddecodeinloader.go, which run on every DAG / base-config / sub-document load. Each call allocates a freshmanifestDecoderplus a freshTypedUnionDecodeHookclosure. Not a correctness issue, but you can share a single instance:♻️ Proposed change
-type manifestDecoder struct { - decodeHook mapstructure.DecodeHookFunc -} - -func newManifestDecoder() *manifestDecoder { - return &manifestDecoder{ - decodeHook: TypedUnionDecodeHook(), - } -} +type manifestDecoder struct { + decodeHook mapstructure.DecodeHookFunc +} + +var defaultManifestDecoder = &manifestDecoder{ + decodeHook: TypedUnionDecodeHook(), +} + +func newManifestDecoder() *manifestDecoder { + return defaultManifestDecoder +}This keeps the existing call sites unchanged. Skip if you anticipate per-decoder configuration knobs being added soon.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/core/spec/manifest_decoder.go` around lines 16 - 24, manifestDecoder and its TypedUnionDecodeHook are stateless but are being allocated on every call from unmarshalData and decode; instead create a single package-level instance (e.g., package var defaultManifestDecoder = &manifestDecoder{decodeHook: TypedUnionDecodeHook()}) and have newManifestDecoder() return that shared instance (or eliminate newManifestDecoder and have callers use the package var) so you avoid per-call allocations while keeping existing call sites unchanged; reference manifestDecoder, newManifestDecoder, TypedUnionDecodeHook, unmarshalData, and decode when updating the code.internal/cmn/sock/server.go (3)
61-66: Consider explicit timeouts on the embeddedhttp.Server.
http.Serveris created withoutReadHeaderTimeout,ReadTimeout,WriteTimeout, orIdleTimeout. Because handlers can run arbitrarily long (TestShutdownWaitsForActiveHandlersconfirms this), a misbehaving or stuck handler will hold the connection — and a gracefulShutdown— indefinitely. For a local unix socket with trusted internal callers this is generally acceptable, but a smallReadHeaderTimeoutis cheap protection against slowloris-style stalls if the socket file's permissions ever broaden. Optional.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/sock/server.go` around lines 61 - 66, The http.Server instance created in httpServer (with Handler: srv.httpHandler(ctx)) lacks explicit timeouts; set sensible timeouts on the server (e.g., ReadHeaderTimeout, ReadTimeout, WriteTimeout, IdleTimeout) in the http.Server literal to protect against slowloris or hung handlers while preserving TestShutdownWaitsForActiveHandlers behavior — choose modest values such as a short ReadHeaderTimeout (a few seconds) and longer Read/Write/Idle timeouts (tens of seconds) so active handlers can still finish but stalled connections are bounded.
122-135: Minor: thelistener != nilbranch inShutdownis effectively dead.
installsetslistenerandhttpServertogether, andclearunsets them together, sohttpServer == nil && listener != nilis not reachable through the current code paths. The standalonelistener.Close()block can be folded away (callinghttpServer.Shutdownalready closes the listener for you). Keeping it as defensive code is fine; just flagging in case you want to drop it now that the lifecycle is symmetric.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/sock/server.go` around lines 122 - 135, The listener != nil branch in Shutdown is effectively unreachable because install and clear manage httpServer and listener together; remove the standalone listener.Close() block (the httpServer.Shutdown(ctx) call already closes the listener) to avoid dead code, or alternatively keep it but add a clear comment referencing Shutdown, install and clear to explain it is defensive; update references to isClosedServerError and logger.Error accordingly so only the httpServer.Shutdown path remains for normal shutdown.
87-98: Simplify the post-Serveswitch — theerr == nilcases are unreachable.
http.Server.Servealways returns a non-nil error, so theerr == nilconditions can never trigger. Flatten the logic to an explicit check for graceful shutdown:♻️ Suggested simplification
- err = httpServer.Serve(listener) - switch { - case err == nil && srv.quit.Load(): - return ErrServerRequestedShutdown - case isClosedServerError(err) && srv.quit.Load(): - return ErrServerRequestedShutdown - case err == nil: - return nil - default: - return err - } + err = httpServer.Serve(listener) + if isClosedServerError(err) && srv.quit.Load() { + return ErrServerRequestedShutdown + } + return err🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/sock/server.go` around lines 87 - 98, http.Server.Serve always returns a non-nil error, so remove the unreachable err == nil branches and simplify the switch: after calling httpServer.Serve(listener), check if srv.quit.Load() is true or if isClosedServerError(err) is true and in that case return ErrServerRequestedShutdown; otherwise return err. Update the logic referencing httpServer.Serve, isClosedServerError, srv.quit.Load, and ErrServerRequestedShutdown accordingly.internal/cmn/sock/client.go (1)
60-82: Consider caching thehttp.Client/http.TransportonClient.
httpClient()allocates a freshhttp.TransportperRequestcall, anddefer client.CloseIdleConnections()immediately tears down any idle connection. This defeats keep-alive pooling and is wasteful even for occasional calls like status polling. SinceClientis immutable afterNewClient, the transport can be built once and reused.Also,
http.Client.Timeoutalready covers the full request (dial + write + headers + body), so the inner dialerTimeout: defaultTimeoutis redundant.♻️ Suggested refactor
type Client struct { - addr string + addr string + client *http.Client } func NewClient(addr string) *Client { - return &Client{addr: addr} + c := &Client{addr: addr} + c.client = &http.Client{ + Transport: &http.Transport{ + DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { + dialer := &net.Dialer{Timeout: defaultTimeout} + conn, err := dialer.DialContext(ctx, "unix", c.addr) + if err != nil { + return nil, wrapTimeout("dial unix socket", err) + } + return conn, nil + }, + DisableCompression: true, + }, + Timeout: defaultTimeout, + } + return c }Then
Requestusescl.clientdirectly and dropshttpClient()/CloseIdleConnections().🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/sock/client.go` around lines 60 - 82, The current httpClient() builds a new http.Transport per call and Request immediately calls client.CloseIdleConnections(), preventing connection reuse; also the dialer's Timeout duplicates http.Client.Timeout. Modify the Client struct to cache a single *http.Client (and its Transport) created once in NewClient, remove httpClient() and the defer client.CloseIdleConnections() from Request so Request reuses cl.client, and remove the redundant Timeout on net.Dialer (rely on http.Client.Timeout) when constructing the transport.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@internal/cmn/sock/client.go`:
- Around line 60-82: The current httpClient() builds a new http.Transport per
call and Request immediately calls client.CloseIdleConnections(), preventing
connection reuse; also the dialer's Timeout duplicates http.Client.Timeout.
Modify the Client struct to cache a single *http.Client (and its Transport)
created once in NewClient, remove httpClient() and the defer
client.CloseIdleConnections() from Request so Request reuses cl.client, and
remove the redundant Timeout on net.Dialer (rely on http.Client.Timeout) when
constructing the transport.
In `@internal/cmn/sock/server.go`:
- Around line 61-66: The http.Server instance created in httpServer (with
Handler: srv.httpHandler(ctx)) lacks explicit timeouts; set sensible timeouts on
the server (e.g., ReadHeaderTimeout, ReadTimeout, WriteTimeout, IdleTimeout) in
the http.Server literal to protect against slowloris or hung handlers while
preserving TestShutdownWaitsForActiveHandlers behavior — choose modest values
such as a short ReadHeaderTimeout (a few seconds) and longer Read/Write/Idle
timeouts (tens of seconds) so active handlers can still finish but stalled
connections are bounded.
- Around line 122-135: The listener != nil branch in Shutdown is effectively
unreachable because install and clear manage httpServer and listener together;
remove the standalone listener.Close() block (the httpServer.Shutdown(ctx) call
already closes the listener) to avoid dead code, or alternatively keep it but
add a clear comment referencing Shutdown, install and clear to explain it is
defensive; update references to isClosedServerError and logger.Error accordingly
so only the httpServer.Shutdown path remains for normal shutdown.
- Around line 87-98: http.Server.Serve always returns a non-nil error, so remove
the unreachable err == nil branches and simplify the switch: after calling
httpServer.Serve(listener), check if srv.quit.Load() is true or if
isClosedServerError(err) is true and in that case return
ErrServerRequestedShutdown; otherwise return err. Update the logic referencing
httpServer.Serve, isClosedServerError, srv.quit.Load, and
ErrServerRequestedShutdown accordingly.
In `@internal/core/spec/manifest_decoder.go`:
- Around line 75-82: In validateManifestAliases replace the call to
fmt.Errorf("labels and deprecated tags cannot both be set") with
errors.New("labels and deprecated tags cannot both be set") to avoid using
fmt.Errorf without formatting directives; ensure the errors package is imported
and remove fmt if it becomes unused (or leave fmt if still needed elsewhere).
- Around line 16-24: manifestDecoder and its TypedUnionDecodeHook are stateless
but are being allocated on every call from unmarshalData and decode; instead
create a single package-level instance (e.g., package var defaultManifestDecoder
= &manifestDecoder{decodeHook: TypedUnionDecodeHook()}) and have
newManifestDecoder() return that shared instance (or eliminate
newManifestDecoder and have callers use the package var) so you avoid per-call
allocations while keeping existing call sites unchanged; reference
manifestDecoder, newManifestDecoder, TypedUnionDecodeHook, unmarshalData, and
decode when updating the code.
In `@internal/persis/filedagrun/writer_test.go`:
- Around line 111-127: The test "WritesNewlineDelimitedJSON" currently asserts
the file ends with '\n'; add a JSON validity check by trimming the trailing
newline from the read bytes (use bytes.TrimRight(data, "\n")) and then
json.Unmarshal the trimmed bytes into a temporary variable (e.g.
map[string]interface{} or the expected struct) and require no error to ensure
the line is well-formed JSON; locate this logic after os.ReadFile(writerPath)
and before the final assertion, referencing the test name, NewWriter,
writer.write and writer.close to find the correct spot.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 959b70c9-4293-4c1c-ad70-cc6351b1f227
📒 Files selected for processing (9)
internal/cmn/sock/client.gointernal/cmn/sock/server.gointernal/cmn/sock/server_internal_test.gointernal/cmn/sock/server_test.gointernal/core/spec/loader.gointernal/core/spec/loader_internal_test.gointernal/core/spec/manifest_decoder.gointernal/persis/filedagrun/writer.gointernal/persis/filedagrun/writer_test.go
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
🧹 Nitpick comments (1)
internal/cmn/sock/server.go (1)
105-115: Include a stack trace when logging panics.The recovery path logs the panic value but not where it came from, which makes triaging real production panics harder once you switch to a standard
net/httppipeline. Capturingdebug.Stack()keeps the original signal you'd otherwise have gotten from an uncaught panic.♻️ Proposed change
- "log/slog" + "log/slog" + "runtime/debug" @@ defer func() { if recovered := recover(); recovered != nil { - logger.Error(ctx, "Socket handler panicked", slog.Any("panic", recovered)) + logger.Error(ctx, "Socket handler panicked", + slog.Any("panic", recovered), + slog.String("stack", string(debug.Stack())), + ) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) } }()🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/sock/server.go` around lines 105 - 115, The panic recovery in Server.httpHandler currently logs only the recovered value; modify the defer to capture the goroutine stack (runtime/debug.Stack()) when a panic occurs and include it in the error log alongside the panic value so you can trace where it came from (update the logger.Error call in httpHandler to pass both the recovered panic and the stack, e.g. slog.Any("panic", recovered) and slog.Any("stack", string(stack))). Also add the runtime/debug import if missing and keep srv.handlerFunc(w, r) unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@internal/cmn/sock/server.go`:
- Around line 105-115: The panic recovery in Server.httpHandler currently logs
only the recovered value; modify the defer to capture the goroutine stack
(runtime/debug.Stack()) when a panic occurs and include it in the error log
alongside the panic value so you can trace where it came from (update the
logger.Error call in httpHandler to pass both the recovered panic and the stack,
e.g. slog.Any("panic", recovered) and slog.Any("stack", string(stack))). Also
add the runtime/debug import if missing and keep srv.handlerFunc(w, r)
unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 9a7f57f0-a0a2-4665-98a2-bc70d8ce64e6
📒 Files selected for processing (8)
internal/cmn/sock/client.gointernal/cmn/sock/client_internal_test.gointernal/cmn/sock/server.gointernal/cmn/sock/server_internal_test.gointernal/core/spec/loader_internal_test.gointernal/core/spec/manifest_decoder.gointernal/persis/filedagrun/writer.gointernal/persis/filedagrun/writer_test.go
✅ Files skipped from review due to trivial changes (1)
- internal/cmn/sock/client.go
🚧 Files skipped from review as they are similar to previous changes (4)
- internal/persis/filedagrun/writer_test.go
- internal/core/spec/manifest_decoder.go
- internal/core/spec/loader_internal_test.go
- internal/cmn/sock/server_internal_test.go
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
🧹 Nitpick comments (4)
internal/persis/filedagrun/writer_test.go (1)
115-135: LGTM — solid regression test for the NDJSON contract.The subtest correctly validates both invariants required by
parseStatusFileWithContext(line-oriented reads ininternal/persis/filedagrun/attempt.go): the payload is valid JSON and the file ends with'\n'. Imports, lifecycle, and assertion choices align with the rest of the file.One small optional refinement: since
decodedis only used to confirm parseability, you could either assert a meaningful field on it (e.g. thedagRunID) to also guard against accidental schema drift, or replace the unmarshal withjson.Validto make intent explicit.♻️ Optional: assert a field to guard against schema regressions
- var decoded map[string]any - require.NoError(t, json.Unmarshal(bytes.TrimRight(data, "\n"), &decoded)) - - assert.Equal(t, byte('\n'), data[len(data)-1]) + var decoded exec.DAGRunStatus + require.NoError(t, json.Unmarshal(bytes.TrimRight(data, "\n"), &decoded)) + assert.Equal(t, dagRunID, decoded.DAGRunID) + assert.Equal(t, byte('\n'), data[len(data)-1])🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/persis/filedagrun/writer_test.go` around lines 115 - 135, Test currently only checks that NDJSON is valid JSON; strengthen it to guard against schema drift by asserting a meaningful field equals the created dagRunID. In the subtest WritesNewlineDelimitedJSON (variables: dagRunID, decoded), after unmarshalling into decoded verify decoded["dagRunID"] (or the actual JSON key produced by transform.NewStatusBuilder) equals dagRunID in addition to the existing newline and parseability checks so the test fails if the payload schema changes; adjust the JSON key if necessary to match the produced status payload.internal/cmn/sock/server.go (2)
33-37:srv.listenertracking is now redundant.After this refactor, the listener's lifecycle is fully owned by
http.Server—httpServer.Shutdown(ctx)closes the registered listener for you, andServeonly useslisteneras a local variable to pass intohttpServer.Serve. Thesrv.listenerfield is no longer read byShutdownand only exists to be assigned and re-cleared. You could drop the field (and the listener parameter oninstall/clear) and key the install/clear guard solely onhttpServer, which would simplify the locking surface a bit.Also applies to: 124-141, 143-165
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/sock/server.go` around lines 33 - 37, The srv.listener field is now redundant because httpServer owns the listener lifecycle; remove the listener field and the listener parameter from install/clear, and change all code that set/cleared srv.listener to instead guard on srv.httpServer (with the existing mu) so install returns error if srv.httpServer != nil and clear only clears/sets httpServer=nil. Update logic in Serve and Shutdown to create a local listener variable and pass it into srv.httpServer.Serve, and rely on srv.httpServer.Shutdown(ctx) to close it; remove any reads of srv.listener and the re-clear behavior that only touched that field.
105-121: Re-panichttp.ErrAbortHandlerto preserve standard library semantics.
net/httptreats a panic withhttp.ErrAbortHandleras a signal to silently abort the response without logging. The current customdefer recover()intercepts this, logging it at error level and writing a 500 response to the client. If any handler uses that idiom, it will produce spurious errors and incorrect behavior.Suggested adjustment
defer func() { if recovered := recover(); recovered != nil { + if recovered == http.ErrAbortHandler { + panic(recovered) + } logger.Error( ctx, "Socket handler panicked", slog.Any("panic", recovered), slog.String("stack", string(debug.Stack())), ) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) } }()🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/sock/server.go` around lines 105 - 121, The current panic recovery in Server.httpHandler catches all panics (including http.ErrAbortHandler) causing incorrect logging/500 responses; update the defer to re-panic when recovered equals http.ErrAbortHandler so standard library semantics are preserved. Specifically, inside the deferred func in httpHandler, after obtaining recovered := recover(), if recovered == http.ErrAbortHandler then call panic(recovered) (or return before logging/HttpError), otherwise proceed to log the panic and write the 500 response; this change should be applied around the existing logger.Error and http.Error calls and keeps srv.handlerFunc(w, r) behavior intact.internal/cmn/sock/server_internal_test.go (1)
50-63: Consider also assertingBaseContext.Optional: since
newHTTPServeris the single source of truth forhttp.Serverconfiguration, this test could additionally assert thathttpServer.BaseContext != niland that invoking it returns the supplied context. That would lock down theBaseContextwiring against future regressions alongside the timeout values.httpServer := srv.newHTTPServer(context.Background()) require.Equal(t, defaultTimeout, httpServer.ReadHeaderTimeout) require.Equal(t, idleTimeout, httpServer.IdleTimeout) + require.NotNil(t, httpServer.BaseContext)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/sock/server_internal_test.go` around lines 50 - 63, Add assertions in TestNewHTTPServerConfiguresTimeouts to verify Server.BaseContext wiring: create a ctx := context.Background(), pass it into srv.newHTTPServer(ctx), assert httpServer.BaseContext != nil, then invoke httpServer.BaseContext(nil) and assert it returns the same ctx (use require.Same or equivalent) so the BaseContext preserves the supplied context.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@internal/cmn/sock/server_internal_test.go`:
- Around line 50-63: Add assertions in TestNewHTTPServerConfiguresTimeouts to
verify Server.BaseContext wiring: create a ctx := context.Background(), pass it
into srv.newHTTPServer(ctx), assert httpServer.BaseContext != nil, then invoke
httpServer.BaseContext(nil) and assert it returns the same ctx (use require.Same
or equivalent) so the BaseContext preserves the supplied context.
In `@internal/cmn/sock/server.go`:
- Around line 33-37: The srv.listener field is now redundant because httpServer
owns the listener lifecycle; remove the listener field and the listener
parameter from install/clear, and change all code that set/cleared srv.listener
to instead guard on srv.httpServer (with the existing mu) so install returns
error if srv.httpServer != nil and clear only clears/sets httpServer=nil. Update
logic in Serve and Shutdown to create a local listener variable and pass it into
srv.httpServer.Serve, and rely on srv.httpServer.Shutdown(ctx) to close it;
remove any reads of srv.listener and the re-clear behavior that only touched
that field.
- Around line 105-121: The current panic recovery in Server.httpHandler catches
all panics (including http.ErrAbortHandler) causing incorrect logging/500
responses; update the defer to re-panic when recovered equals
http.ErrAbortHandler so standard library semantics are preserved. Specifically,
inside the deferred func in httpHandler, after obtaining recovered := recover(),
if recovered == http.ErrAbortHandler then call panic(recovered) (or return
before logging/HttpError), otherwise proceed to log the panic and write the 500
response; this change should be applied around the existing logger.Error and
http.Error calls and keeps srv.handlerFunc(w, r) behavior intact.
In `@internal/persis/filedagrun/writer_test.go`:
- Around line 115-135: Test currently only checks that NDJSON is valid JSON;
strengthen it to guard against schema drift by asserting a meaningful field
equals the created dagRunID. In the subtest WritesNewlineDelimitedJSON
(variables: dagRunID, decoded), after unmarshalling into decoded verify
decoded["dagRunID"] (or the actual JSON key produced by
transform.NewStatusBuilder) equals dagRunID in addition to the existing newline
and parseability checks so the test fails if the payload schema changes; adjust
the JSON key if necessary to match the produced status payload.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 55369467-2d22-4266-a9ec-3abf7719328c
📒 Files selected for processing (7)
internal/cmn/sock/client_internal_test.gointernal/cmn/sock/server.gointernal/cmn/sock/server_internal_test.gointernal/cmn/sock/server_test.gointernal/core/spec/loader_internal_test.gointernal/persis/filedagrun/writer.gointernal/persis/filedagrun/writer_test.go
🚧 Files skipped from review as they are similar to previous changes (3)
- internal/cmn/sock/client_internal_test.go
- internal/core/spec/loader_internal_test.go
- internal/persis/filedagrun/writer.go
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
Summary
Testing
Summary by CodeRabbit
Refactor
Bug Fixes
Tests