Added support for SOCK_SEQPACKET #1221

Merged
xuyao0127 merged 4 commits into main from dev/karya0/seqpacket on Oct 30, 2025
karya0 (Member) commented Oct 10, 2025

Summary by CodeRabbit

  • New Features

    • Full support for UNIX-domain SOCK_SEQPACKET: restoration, reconnection, and frame-preserving buffer draining.
    • Improved routing and domain/type-aware diagnostics for incoming UNIX-domain connections.
    • Separate frame-based handling for seqpacket vs. stream sockets for more reliable processing.
  • Public API

    • Drain initialization and socket-typing interfaces updated to expose seqpacket-aware behavior.
  • Tests

    • Added end-to-end seqpacket test and integrated into automated suite.

karya0 requested review from gc00 and xuyao0127 on October 10, 2025 08:38

coderabbitai bot (Contributor) commented Oct 10, 2025

Walkthrough

Adds AF_UNIX SOCK_SEQPACKET support: new protected FD enum entry, connection rewirer state for a SEQ listener and pending list, frame-preserving kernel buffer drainer updates (seqpacket-specific reader and per-FD frames), socket type accessors, and a new seqpacket test.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Protected FD enum: include/protectedfds.h | Adds PROTECTED_RESTORE_UDS_SEQ_SOCK_FD to ProtectedFds between existing UDS and coord entries. |
| Connection rewirer (UDS SEQ): src/plugin/ipc/socket/connectionrewirer.h, src/plugin/ipc/socket/connectionrewirer.cpp | Adds _udsSeqRestoreAddr, _udsSeqRestoreAddrlen, _pendingUDSSeqIncoming; creates/binds/listens an AF_UNIX SOCK_SEQPACKET restore listener, records its addr/len, routes incoming SEQ connections to a pending list, processes pending SEQ reconnects, and closes the new protected FD during destroy. |
| Kernel buffer drainer (SEQ frames): src/plugin/ipc/socket/kernelbufferdrainer.h, src/plugin/ipc/socket/kernelbufferdrainer.cpp | Adds JSeqpacketReader, per-FD _isSeqpacket tracking and _drainedFrames; branches on baseType to preserve message boundaries for SOCK_SEQPACKET (MSG_PEEK+read), updates refill and timeout handling to send frames first and echo peer frames, and changes beginDrainOf signature to accept int baseType. |
| Socket connection API & usage: src/plugin/ipc/socket/socketconnection.h, src/plugin/ipc/socket/socketconnection.cpp | Adds sockType() and baseType() accessors; updates domain/type warning logic to use baseType(); forwards baseType() to KernelBufferDrainer::beginDrainOf. |
| Tests: test/autotest.py, test/seqpacket.c | Registers a new "seqpacket" test and adds test/seqpacket.c, a SOCK_SEQPACKET AF_UNIX client/server demo (startup sync, messaging loop, cleanup). |

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant App as App (client/server)
  participant SC as SocketConnection
  participant CR as ConnectionRewirer
  participant KBD as KernelBufferDrainer
  participant PFD as ProtectedFds

  rect rgba(230,240,255,0.5)
  note over CR,PFD: Restore initialization (UDS SEQ listener)
  CR->>PFD: Map `PROTECTED_RESTORE_UDS_SEQ_SOCK_FD`
  CR->>CR: create AF_UNIX SOCK_SEQPACKET listener\nstore _udsSeqRestoreAddr/_addrlen
  end

  rect rgba(240,255,230,0.5)
  note over SC,KBD: Begin drain (frame-based for SEQ)
  SC->>KBD: beginDrainOf(fd, id, baseType=SOCK_SEQPACKET)
  KBD->>KBD: Use JSeqpacketReader (MSG_PEEK + read full frame)\nstore per-FD frames
  KBD-->>SC: send drained frames first
  end

  rect rgba(255,245,230,0.5)
  note over CR,SC: Reconnect handling
  CR->>CR: registerIncoming: route SOCK_SEQPACKET to pending list
  CR->>SC: doReconnect: process _pendingUDSSeqIncoming
  CR->>PFD: Close `PROTECTED_RESTORE_UDS_SEQ_SOCK_FD`
  end

  App-->>SC: Normal I/O resumes (SEQ frames)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

A packet per nibble, neat little stacks,
I thump with delight as frames bounce back.
A listener tucked where paths interweave,
Seqpacket hops and sockets believe.
Tests run, lights green — rabbit-approved weave. 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 33.33% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title Check | ✅ Passed | The title succinctly describes the core enhancement of adding support for SOCK_SEQPACKET and matches the extensive changes introducing sequence-packet socket handling in multiple components. |

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/plugin/ipc/socket/connectionrewirer.cpp (1)

236-277: UDS stream/seq restore listeners bind to the SAME abstract name (collision)

str and strSeq are identical (the appended sun_path prints empty due to leading NUL), so the second bind will fail with EADDRINUSE. Also, no length check is performed.

Apply this diff to generate distinct, bounded names:

-    ostringstream o;
-    // We should use the original path along with the unique pid and timestamp.
-    // That will ensure that the socket is unique.
-    o << dmtcp_get_uniquepid_str() << "_" << dmtcp_get_coordinator_timestamp() << "_" << _udsRestoreAddr.sun_path;
-    string str = o.str();
+    // Derive a base unique suffix using uniquepid and coordinator timestamp.
+    std::string base = std::string(dmtcp_get_uniquepid_str()) + "_" +
+                       std::to_string(dmtcp_get_coordinator_timestamp());
+    std::string str = base + "_uds";
@@
-    strncpy(&_udsRestoreAddr.sun_path[1], str.c_str(), str.length());
-    _udsRestoreAddrlen = sizeof(sa_family_t) + str.length() + 1;
+    size_t maxlen = sizeof(_udsRestoreAddr.sun_path) - 1; // abstract: skip [0]
+    size_t copylen = std::min(str.size(), maxlen);
+    strncpy(&_udsRestoreAddr.sun_path[1], str.c_str(), copylen);
+    _udsRestoreAddrlen = sizeof(sa_family_t) + copylen + 1;
@@
-    // Use a different abstract path suffix to avoid collision
-    ostringstream o2;
-    // We should use the original path along with the unique pid and timestamp.
-    // That will ensure that the socket is unique.
-    o2 << dmtcp_get_uniquepid_str() << "_" << dmtcp_get_coordinator_timestamp() << "_" << _udsRestoreAddr.sun_path;
-    string strSeq = o2.str();
-    strncpy(&_udsSeqRestoreAddr.sun_path[1], strSeq.c_str(), strSeq.length());
-    _udsSeqRestoreAddrlen = sizeof(sa_family_t) + strSeq.length() + 1;
+    // Use a distinct name for SEQPACKET to avoid collision
+    std::string strSeq = base + "_udsseq";
+    size_t maxlen2 = sizeof(_udsSeqRestoreAddr.sun_path) - 1;
+    size_t copylen2 = std::min(strSeq.size(), maxlen2);
+    strncpy(&_udsSeqRestoreAddr.sun_path[1], strSeq.c_str(), copylen2);
+    _udsSeqRestoreAddrlen = sizeof(sa_family_t) + copylen2 + 1;
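The bounded-copy logic in the diff above can be isolated into one helper. This is a minimal sketch, assuming a hypothetical `makeAbstractAddr` function that is not part of the PR; it uses `offsetof(sockaddr_un, sun_path)` where the diff uses `sizeof(sa_family_t)`, which should yield the same value on Linux.

```cpp
#include <sys/socket.h>
#include <sys/un.h>
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical helper (not in the PR): fill `addr` with an abstract AF_UNIX
// name and return the address length to pass to bind() or connect().
socklen_t makeAbstractAddr(const std::string &name, sockaddr_un &addr) {
  std::memset(&addr, 0, sizeof(addr));
  addr.sun_family = AF_UNIX;
  // sun_path[0] stays '\0' to select the abstract namespace, so the name
  // may use at most the remaining bytes of sun_path.
  const size_t maxlen = sizeof(addr.sun_path) - 1;
  const size_t copylen = std::min(name.size(), maxlen);
  std::memcpy(&addr.sun_path[1], name.data(), copylen);
  return static_cast<socklen_t>(offsetof(sockaddr_un, sun_path) + 1 + copylen);
}
```

Calling it once with a `base + "_uds"` name and once with `base + "_udsseq"` gives two addresses that differ in both content and length, and an overlong name is truncated rather than overflowing `sun_path`.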
🧹 Nitpick comments (5)
src/plugin/ipc/socket/socketconnection.h (1)

64-66: Use a safer baseType mask

Mask out flags explicitly to derive the base type; 077 is brittle across platforms.

Apply this diff:

-    int baseType() const { return _sockType & 077; }
+    int baseType() const { return _sockType & ~(SOCK_CLOEXEC | SOCK_NONBLOCK); }

Optional: there is a duplicate include of sys/types.h on Lines 32-33; consider removing one. [As per coding guidelines]
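To illustrate the mask, here is a small sketch using a hypothetical free function `baseTypeOf` in place of the member accessor; it only assumes the Linux `SOCK_CLOEXEC` and `SOCK_NONBLOCK` flags from `<sys/socket.h>`.

```cpp
#include <sys/socket.h>
#include <cassert>

// Hypothetical free-function form of the suggested baseType() accessor:
// strip the creation flags so only the base socket type remains.
int baseTypeOf(int sockType) {
  return sockType & ~(SOCK_CLOEXEC | SOCK_NONBLOCK);
}
```

With this form, a socket created as `SOCK_SEQPACKET | SOCK_CLOEXEC` still compares equal to `SOCK_SEQPACKET`.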

src/plugin/ipc/socket/kernelbufferdrainer.cpp (4)

29-30: Unused include; mismatch with comment

sys/ioctl.h is not used. Either remove it or switch to using FIONREAD as the comment suggests.

Apply one of:

  • If keeping current approach, drop the include:
-#include <sys/ioctl.h>
  • If switching to FIONREAD, keep the include and update the code (see my other comment on Lines 53-57).

38-40: Stale comment

Comment says “using FIONREAD” but code uses recvmsg(MSG_PEEK|MSG_TRUNC). Update the comment for accuracy.

-// Reader for SOCK_SEQPACKET that preserves message boundaries by reading
-// exactly one frame per readOnce() using FIONREAD to determine size.
+// Reader for SOCK_SEQPACKET that preserves message boundaries by reading
+// exactly one frame per readOnce(). It peeks the next message size using
+// recvmsg(MSG_PEEK | MSG_TRUNC).
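For readers unfamiliar with the peek-then-read pattern the corrected comment describes, the following is a rough sketch, not the PR's JSeqpacketReader. The helper name `readOneFrame` is hypothetical, and it relies on the Linux behavior that `MSG_TRUNC` reports the real message length for UNIX sequenced-packet sockets.

```cpp
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper: read exactly one SOCK_SEQPACKET frame from fd.
// Returns false on error, EOF, or when nothing is queued. A zero-length
// frame is indistinguishable from EOF in this sketch.
bool readOneFrame(int fd, std::vector<char> &frame) {
  char probe;  // 1-byte buffer for the peek
  // MSG_PEEK leaves the message queued; MSG_TRUNC makes recv() return the
  // full message length even though the buffer is smaller.
  ssize_t need = ::recv(fd, &probe, 1, MSG_PEEK | MSG_TRUNC);
  if (need <= 0) {
    frame.clear();
    return false;
  }
  frame.resize(static_cast<size_t>(need));
  // Second call consumes the whole frame in one read.
  ssize_t got = ::recv(fd, frame.data(), frame.size(), 0);
  if (got < 0) {
    frame.clear();
    return false;
  }
  frame.resize(static_cast<size_t>(got));
  return true;
}
```

Two messages sent back to back on a `socketpair(AF_UNIX, SOCK_SEQPACKET, ...)` come out as two separate frames with their original lengths.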

262-279: Avoid assignment in condition; set map explicitly

Using assignment in the if condition is easy to misread.

-  if (_isSeqpacket[fd] = (baseType == SOCK_SEQPACKET)) {
+  const bool isSeq = (baseType == SOCK_SEQPACKET);
+  _isSeqpacket[fd] = isSeq;
+  if (isSeq) {
     addDataSocket(new JSeqpacketReader(fd));
   } else {
     addDataSocket(new jalib::JChunkReader(fd, 512));
   }

344-392: Frame replay protocol ignores cmsgs; lengths are fine for AF_UNIX

The REFILL path replays only length-prefixed payloads. If you later support cmsgs, this will need to switch to sendmsg/recvmsg with control data per frame.

Short-term: document that seqpacket drain doesn’t support SCM_RIGHTS/credentials and is disabled/guarded (see earlier comment).
Long-term: extend protocol to carry cmsgs and flags per frame and replay via sendmsg.
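As an illustration of what "length-prefixed payloads" means here, the sketch below encodes frames as a count followed by `[length][payload]` records. The layout, the `encodeFrames` name, and the use of host byte order are assumptions for illustration, not the PR's actual wire format; note that it carries payload bytes only, which is why cmsgs cannot survive it.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

using Frame = std::vector<char>;

// Convenience for building frames from string literals.
Frame toFrame(const std::string &s) { return Frame(s.begin(), s.end()); }

// Hypothetical encoder: [count:u32] then, per frame, [len:u32][payload].
// Host byte order is assumed because both peers run on the same machine.
std::vector<char> encodeFrames(const std::vector<Frame> &frames) {
  std::vector<char> out;
  auto putU32 = [&out](uint32_t v) {
    const char *p = reinterpret_cast<const char *>(&v);
    out.insert(out.end(), p, p + sizeof(v));
  };
  putU32(static_cast<uint32_t>(frames.size()));
  for (const Frame &f : frames) {
    putU32(static_cast<uint32_t>(f.size()));
    out.insert(out.end(), f.begin(), f.end());  // payload bytes only, no cmsgs
  }
  return out;
}
```

Carrying control data would need an extra per-frame section and a `sendmsg`-based replay, as the long-term note above says.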

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 28ebd1e and 8322364.

📒 Files selected for processing (9)
  • include/protectedfds.h (1 hunks)
  • src/plugin/ipc/socket/connectionrewirer.cpp (8 hunks)
  • src/plugin/ipc/socket/connectionrewirer.h (2 hunks)
  • src/plugin/ipc/socket/kernelbufferdrainer.cpp (7 hunks)
  • src/plugin/ipc/socket/kernelbufferdrainer.h (2 hunks)
  • src/plugin/ipc/socket/socketconnection.cpp (3 hunks)
  • src/plugin/ipc/socket/socketconnection.h (1 hunks)
  • test/autotest.py (1 hunks)
  • test/seqpacket.c (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
src/plugin/ipc/socket/kernelbufferdrainer.h (2)
src/plugin/ipc/socket/kernelbufferdrainer.cpp (2)
  • beginDrainOf (262-282)
  • beginDrainOf (263-263)
src/plugin/ipc/ssh/sshdrainer.cpp (2)
  • beginDrainOf (93-106)
  • beginDrainOf (94-94)
src/plugin/ipc/socket/connectionrewirer.cpp (5)
src/dmtcpplugin.cpp (4)
  • dmtcp_close_protected_fd (310-310)
  • dmtcp_get_uniquepid_str (233-233)
  • dmtcp_get_coordinator_timestamp (271-275)
  • dmtcp_get_coordinator_timestamp (271-272)
src/nosyscallsreal.c (4)
  • _real_close (188-192)
  • _real_socket (107-111)
  • _real_bind (121-125)
  • _real_listen (128-132)
src/dmtcpnohijackstubs.cpp (2)
  • dmtcp_get_uniquepid_str (65-72)
  • dmtcp_get_uniquepid_str (66-66)
src/util_misc.cpp (2)
  • changeFd (242-250)
  • changeFd (243-243)
src/plugin/ipc/socket/socketconnlist.cpp (2)
  • registerNSData (216-220)
  • registerNSData (217-217)
src/plugin/ipc/socket/kernelbufferdrainer.cpp (4)
src/plugin/ipc/socket/kernelbufferdrainer.h (1)
  • dmtcp (33-70)
src/plugin/ipc/socket/socketconnection.h (1)
  • dmtcp (40-158)
src/plugin/ipc/socket/connectionmessage.h (1)
  • dmtcp (33-84)
src/plugin/ipc/ssh/sshdrainer.cpp (2)
  • beginDrainOf (93-106)
  • beginDrainOf (94-94)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (10)
test/autotest.py (1)

1023-1024: Add seqpacket test: looks good

Placement after client-server is appropriate; 2-process expectation matches the test program.

src/plugin/ipc/socket/connectionrewirer.h (1)

70-72: UDS SEQ state additions look correct

Separate restore addr/len and pending list for SOCK_SEQPACKET is the right shape.

Also applies to: 82-82

src/plugin/ipc/socket/kernelbufferdrainer.h (2)

61-67: Data structures for SEQPACKET draining

Good separation between stream buffer and per-frame storage; ensure onData flattens frames for getDrainedData() consumers that expect a single vector.


43-43: beginDrainOf signature ripple check passed
All KernelBufferDrainer::beginDrainOf call sites now use the three-argument overload; SSHDrainer’s two-argument API remains unchanged.

src/plugin/ipc/socket/socketconnection.cpp (2)

562-571: Pass baseType to drainer: good

This wires SEQPACKET-aware draining. No concerns.


659-666: Domain/type warning checks at restart: good

Using baseType() for AF_UNIX (STREAM/SEQ) and AF_INET/6 (STREAM) is correct.

src/plugin/ipc/socket/connectionrewirer.cpp (2)

281-305: Route AF_UNIX by base type: good

Dynamic cast + baseType() split between stream and seqpacket is correct.


320-331: Register NS data for SEQPACKET: good

Including the SEQ restore address in NS ensures correct reconnect routing.

include/protectedfds.h (1)

33-33: Confirm PROTECTED_RESTORE_UDS_SEQ_SOCK_FD usage consistency

All open (changeFd, markNonBlocking) and close (_real_close) calls for the UDS_SEQ listener use the new enum; no residual ordinal assumptions detected.

src/plugin/ipc/socket/kernelbufferdrainer.cpp (1)

193-206: Drain completion heuristic may hang if peer writes after cookie

For SEQPACKET, you require the last frame to equal the cookie. If the peer writes more frames after the cookie, the cookie won’t be “last” and the drain won’t complete.

  • Confirm higher layers quiesce peers before drain so no app frames arrive after the cookie.
  • If not guaranteed, consider accepting completion when any frame equals the cookie and discarding later frames during refill, or gate drain behind a pause barrier.
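The difference between the two completion rules can be sketched as follows. The function names and the `Frame` alias are hypothetical, and the relaxed variant simply mirrors the suggestion above; whether dropping frames that arrive after the cookie is acceptable depends on the quiescing guarantees asked about in the first bullet.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

using Frame = std::vector<char>;

Frame toFrame(const std::string &s) { return Frame(s.begin(), s.end()); }

// Strict rule: the drain is complete only if the last frame is the cookie.
bool drainCompleteStrict(const std::vector<Frame> &frames, const Frame &cookie) {
  return !frames.empty() && frames.back() == cookie;
}

// Relaxed rule: complete as soon as any frame equals the cookie.
// The cookie and every frame after it are discarded from `frames`.
bool drainCompleteRelaxed(std::vector<Frame> &frames, const Frame &cookie) {
  auto it = std::find(frames.begin(), frames.end(), cookie);
  if (it == frames.end()) {
    return false;
  }
  frames.erase(it, frames.end());
  return true;
}
```

If a peer writes one more frame after the cookie, the strict check never succeeds, while the relaxed check completes and keeps only the frames drained before the cookie.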

Comment on lines +68 to +79
      memset(&readMsg, 0, sizeof(readMsg));
      readMsg.msg_iov = &iov;
      readMsg.msg_iovlen = 1;
      ssize_t got = ::recvmsg(fd, &readMsg, 0);
      if (got <= 0) {
        if (errno != EAGAIN && errno != EINTR) {
          _hadError = true;
        }
        _frame.clear();
        return false;
      }
      _frame.resize((size_t)got);

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Ancillary (cmsg) data is silently dropped; AF_UNIX/SEQPACKET often carries SCM_RIGHTS/credentials

recvmsg is called without a control buffer and refill replays only raw bytes. Any SCM_RIGHTS/SCM_CREDENTIALS in the drained messages will be lost, breaking app semantics after restore.

Minimum guard (detect and refuse draining when cmsgs exist):

-      struct msghdr readMsg;
-      memset(&readMsg, 0, sizeof(readMsg));
-      readMsg.msg_iov = &iov;
-      readMsg.msg_iovlen = 1;
-      ssize_t got = ::recvmsg(fd, &readMsg, 0);
+      struct msghdr readMsg;
+      memset(&readMsg, 0, sizeof(readMsg));
+      readMsg.msg_iov = &iov;
+      readMsg.msg_iovlen = 1;
+      // Provide a small control buffer to detect presence of cmsgs
+      char cbuf[256];
+      readMsg.msg_control = cbuf;
+      readMsg.msg_controllen = sizeof(cbuf);
+      ssize_t got = ::recvmsg(fd, &readMsg, 0);
+      if (got > 0) {
+        // If control data present or truncated, abort (not supported yet)
+        if ((readMsg.msg_flags & MSG_CTRUNC) || readMsg.msg_controllen > 0) {
+          _hadError = true;
+          _frame.clear();
+          errno = ENOTSUP;
+          return false;
+        }
+      }

Preferred full fix (follow-up): extend drained storage to preserve per-message cmsgs and replay via sendmsg. If that’s out of scope, explicitly mark such sockets as “protected” and skip draining. Please confirm which path you intend.


🏁 Script executed:

#!/bin/bash
# Print lines 60–90 around the recvmsg call for inspection
sed -n '60,90p' src/plugin/ipc/socket/kernelbufferdrainer.cpp

Length of output: 852


🏁 Script executed:

#!/bin/bash
# Search for references to KernelBufferDrainer in the codebase
rg -n "KernelBufferDrainer" -C2

Length of output: 6635


Handle ancillary data in recvmsg to avoid silent drop

Current code calls recvmsg without setting msg_control, so any SCM_RIGHTS/SCM_CREDENTIALS are dropped, breaking AF_UNIX/SEQPACKET semantics.

• Minimum guard: supply a small control buffer and on MSG_CTRUNC or non-zero msg_controllen, set _hadError, clear _frame, errno = ENOTSUP, and return false.
• Preferred full fix: extend drained storage to preserve per-message cmsgs and replay via sendmsg (or mark such sockets protected and skip draining).

   struct msghdr readMsg;
   memset(&readMsg, 0, sizeof(readMsg));
   readMsg.msg_iov = &iov;
   readMsg.msg_iovlen = 1;
+  char cbuf[256];
+  readMsg.msg_control = cbuf;
+  readMsg.msg_controllen = sizeof(cbuf);
   ssize_t got = ::recvmsg(fd, &readMsg, 0);
+  if (got > 0 && ((readMsg.msg_flags & MSG_CTRUNC) || readMsg.msg_controllen > 0)) {
+    _hadError = true;
+    _frame.clear();
+    errno = ENOTSUP;
+    return false;
+  }
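The minimum guard can be exercised outside the drainer. The sketch below is an assumption-laden stand-in: `recvRejectingCmsg` and `sendWithFd` are hypothetical names, and the receiver does not close descriptors delivered by SCM_RIGHTS before rejecting the message, which real code would need to do.

```cpp
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <cstring>

// Hypothetical guard: returns the payload length, or -1 with errno set to
// ENOTSUP when any successfully received message carries ancillary data.
ssize_t recvRejectingCmsg(int fd, char *buf, size_t len) {
  struct iovec iov;
  iov.iov_base = buf;
  iov.iov_len = len;
  struct msghdr msg;
  std::memset(&msg, 0, sizeof(msg));
  msg.msg_iov = &iov;
  msg.msg_iovlen = 1;
  alignas(struct cmsghdr) char cbuf[256];  // small buffer just to detect cmsgs
  msg.msg_control = cbuf;
  msg.msg_controllen = sizeof(cbuf);
  ssize_t got = ::recvmsg(fd, &msg, 0);
  if (got >= 0 && ((msg.msg_flags & MSG_CTRUNC) || msg.msg_controllen > 0)) {
    errno = ENOTSUP;  // control data present or truncated: not supported
    return -1;
  }
  return got;
}

// Helper for trying the guard: send `data` plus one descriptor via SCM_RIGHTS.
bool sendWithFd(int sock, const char *data, size_t len, int fdToPass) {
  struct iovec iov;
  iov.iov_base = const_cast<char *>(data);
  iov.iov_len = len;
  alignas(struct cmsghdr) char cbuf[CMSG_SPACE(sizeof(int))];
  std::memset(cbuf, 0, sizeof(cbuf));
  struct msghdr msg;
  std::memset(&msg, 0, sizeof(msg));
  msg.msg_iov = &iov;
  msg.msg_iovlen = 1;
  msg.msg_control = cbuf;
  msg.msg_controllen = sizeof(cbuf);
  struct cmsghdr *c = CMSG_FIRSTHDR(&msg);
  c->cmsg_level = SOL_SOCKET;
  c->cmsg_type = SCM_RIGHTS;
  c->cmsg_len = CMSG_LEN(sizeof(int));
  std::memcpy(CMSG_DATA(c), &fdToPass, sizeof(int));
  return ::sendmsg(sock, &msg, 0) == static_cast<ssize_t>(len);
}
```

A plain `send()` passes through the guard unchanged, while a message sent with `sendWithFd()` is refused with ENOTSUP instead of having its descriptor silently dropped.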
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-      memset(&readMsg, 0, sizeof(readMsg));
-      readMsg.msg_iov = &iov;
-      readMsg.msg_iovlen = 1;
-      ssize_t got = ::recvmsg(fd, &readMsg, 0);
-      if (got <= 0) {
-        if (errno != EAGAIN && errno != EINTR) {
-          _hadError = true;
-        }
-        _frame.clear();
-        return false;
-      }
-      _frame.resize((size_t)got);
+      memset(&readMsg, 0, sizeof(readMsg));
+      readMsg.msg_iov = &iov;
+      readMsg.msg_iovlen = 1;
+      char cbuf[256];
+      readMsg.msg_control = cbuf;
+      readMsg.msg_controllen = sizeof(cbuf);
+      ssize_t got = ::recvmsg(fd, &readMsg, 0);
+      if (got > 0 && ((readMsg.msg_flags & MSG_CTRUNC) || readMsg.msg_controllen > 0)) {
+        _hadError = true;
+        _frame.clear();
+        errno = ENOTSUP;
+        return false;
+      }
+      if (got <= 0) {
+        if (errno != EAGAIN && errno != EINTR) {
+          _hadError = true;
+        }
+        _frame.clear();
+        return false;
+      }
+      _frame.resize((size_t)got);
🤖 Prompt for AI Agents
In src/plugin/ipc/socket/kernelbufferdrainer.cpp around lines 68 to 79, recvmsg
is called without supplying a control buffer so any ancillary data
(SCM_RIGHTS/SCM_CREDENTIALS) is silently dropped; add a small stack control
buffer (e.g. a cmsghdr-sized array) and set readMsg.msg_control and
readMsg.msg_controllen before calling recvmsg, then after recvmsg check if
(readMsg.msg_flags & MSG_CTRUNC) || readMsg.msg_controllen != 0 and if so set
_hadError = true, _frame.clear(), set errno = ENOTSUP and return false as the
minimal guard; optionally note the preferred full fix is to capture and preserve
per-message cmsgs for replay via sendmsg or mark such sockets as protected and
avoid draining.

Comment on lines +1 to +175
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <assert.h>
#define SOCKET_NAME "/tmp/seqpacket_sock"
#define MAX_MSG_SIZE 256

// Use a pipe to signal the client to start.
int pipefd[2];
int start = 0;

void error(const char *msg) {
  perror(msg);
  exit(EXIT_FAILURE);
}

int server(void)
{
  struct sockaddr_un name;
  int connection_socket, data_socket;
  char buffer[MAX_MSG_SIZE];
  int ret;

  /* 1. Create socket */
  connection_socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
  if (connection_socket == -1) {
    error("socket");
  }

  /* Remove existing socket file if it exists */
  unlink(SOCKET_NAME);

  /* 2. Bind socket to a name (file path) */
  memset(&name, 0, sizeof(name));
  name.sun_family = AF_UNIX;
  strncpy(name.sun_path, SOCKET_NAME, sizeof(name.sun_path) - 1);

  ret = bind(connection_socket, (const struct sockaddr *) &name, sizeof(name));
  if (ret == -1) {
    error("bind");
  }

  close(pipefd[0]);
  assert(write(pipefd[1], &start, 1) == 1);

  printf("Server: Listening at %s...\n", SOCKET_NAME);

  /* 3. Prepare for accepting connections (backlog size 5) */
  ret = listen(connection_socket, 5);
  if (ret == -1) {
    error("listen");
  }

  /* 4. Accept connection and receive message */
  printf("Server: Waiting for connection...\n");

  /* Accept a connection */
  data_socket = accept(connection_socket, NULL, NULL);
  if (data_socket == -1) {
    error("accept");
  }

  printf("Server: Client connected. Receiving data...\n");

  /* Main loop to receive and send messages */
  for (;;) {
    /* Receive a message */
    memset(buffer, 0, sizeof(buffer));
    ret = recv(data_socket, buffer, MAX_MSG_SIZE, 0);
    if (ret == -1) {
      error("Server: recv");
    } else if (ret == 0) {
      printf("Server: Client closed connection.\n");
      break;
    } else {
      printf("Server: Received message (%d bytes): '%s'\n", ret, buffer);
    }

    // sleep 1 second
    sleep(1);
    /* Send a response message */
    const char *response = "Server received message.";
    ret = send(data_socket, response, strlen(response) + 1, 0);
    if (ret == -1) {
      error("Server: send");
    } else {
      printf("Server: Sent response: '%s'\n", response);
    }
  }

  /* Close the connection socket (unreachable in this infinite loop) */
  close(connection_socket);
  unlink(SOCKET_NAME);
  return 0;
}

int client(void)
{
  struct sockaddr_un name;
  int data_socket;
  char buffer[MAX_MSG_SIZE];
  int ret;

  /* 1. Create socket */
  data_socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
  if (data_socket == -1) {
    error("socket");
  }

  /* 2. Connect to server socket */
  memset(&name, 0, sizeof(name));
  name.sun_family = AF_UNIX;
  strncpy(name.sun_path, SOCKET_NAME, sizeof(name.sun_path) - 1);

  // Wait for the server to start.
  assert(read(pipefd[0], &start, 1) == 1);
  close(pipefd[0]);
  close(pipefd[1]);

  printf("Client: Connecting to server at %s...\n", SOCKET_NAME);
  ret = connect(data_socket, (const struct sockaddr *) &name, sizeof(name));
  if (ret == -1) {
    error("Client: connect");
  }

  printf("Client: Connected. Sending messages...\n");

  /* 3. Send and receive messages in a loop */
  int count = 0;
  while (1) {
    char msg[MAX_MSG_SIZE];
    sprintf(msg, "This is message %d.", count);
    ret = send(data_socket, msg, strlen(msg) + 1, 0);
    if (ret == -1) {
      error("Client: send");
    }
    printf("Client: Sent message (%d bytes).\n", ret);

    /* 5. Wait for server response */
    memset(buffer, 0, sizeof(buffer));
    ret = recv(data_socket, buffer, MAX_MSG_SIZE, 0);
    if (ret == -1) {
      error("Client: recv response");
    } else if (ret == 0) {
      printf("Client: Server closed connection.\n");
    } else {
      printf("Client: Received response (%d bytes): '%s'\n", ret, buffer);
    }
    count++;
  }

  /* 6. Close the socket */
  close(data_socket);
  printf("Client: Connection closed.\n");

  return 0;
}

int main(void)
{
  assert(pipe(pipefd) == 0);
  pid_t pid = fork();
  assert(pid != -1);
  if (pid == 0) {
    printf("Server: Starting server\n");
    server();
  } else {
    printf("Client: Starting client\n");
    client();
  }
  return 0;
}
coderabbitai bot (Contributor) commented Oct 10, 2025

⚠️ Potential issue | 🟠 Major

Avoid address collisions across parallel runs

Using a fixed "/tmp/seqpacket_sock" will fail when tests run in parallel (bind EADDRINUSE). Make the path unique per test instance (child PID) and compute it in both processes.

Apply this diff:

-#define SOCKET_NAME "/tmp/seqpacket_sock"
+static char SOCKET_NAME[108] = "/tmp/seqpacket_sock";
+static void make_socket_name_from_child(pid_t child_pid) {
+    snprintf(SOCKET_NAME, sizeof(SOCKET_NAME), "/tmp/seqpacket_sock_%ld", (long)child_pid);
+}
@@
-int server(void)
+int server(void)
 {
@@
-    /* Remove existing socket file if it exists */
-    unlink(SOCKET_NAME);
+    /* Remove existing socket file if it exists */
+    unlink(SOCKET_NAME);
@@
-int client(void)
+int client(void)
 {
@@
-int main(void)
+int main(void)
 {
     assert(pipe(pipefd) == 0);
     pid_t pid = fork();
     assert(pid != -1);
     if (pid == 0) {
+        /* Child is server: compute name from its own PID */
+        make_socket_name_from_child(getpid());
         printf("Server: Starting server\n");
         server();
     } else {
+        /* Parent is client: compute name from child's PID */
+        make_socket_name_from_child(pid);
         printf("Client: Starting client\n");
         client();
     }
     return 0;
 }

Optional: consider abstract UDS (sun_path[0] = '\0') to avoid FS cleanup entirely.

karya0 (Member, Author) replied:

@coderabbitai generate a committable diff

coderabbitai bot (Contributor) replied:

@karya0, here's the committable diff:

diff --git a/test/seqpacket.c b/test/seqpacket.c
index 1234567..abcdefg 100644
--- a/test/seqpacket.c
+++ b/test/seqpacket.c
@@ -5,7 +5,13 @@
 #include <sys/un.h>
 #include <unistd.h>
 #include <assert.h>
-#define SOCKET_NAME "/tmp/seqpacket_sock"
+
+static char SOCKET_NAME[108];
+
+static void make_socket_name_from_child(pid_t child_pid) {
+    snprintf(SOCKET_NAME, sizeof(SOCKET_NAME), "/tmp/seqpacket_sock_%ld", (long)child_pid);
+}
+
 #define MAX_MSG_SIZE 256
 
 // Use a pipe to signal the client to start.
@@ -160,10 +166,14 @@ int main(void)
     assert(pipe(pipefd) == 0);
     pid_t pid = fork();
     assert(pid != -1);
     if (pid == 0) {
+        /* Child is server: compute name from its own PID */
+        make_socket_name_from_child(getpid());
         printf("Server: Starting server\n");
         server();
     } else {
+        /* Parent is client: compute name from child's PID */
+        make_socket_name_from_child(pid);
         printf("Client: Starting client\n");
         client();
     }

This change ensures that:

  • Each test run uses a unique socket path based on the server process PID
  • Parallel test executions won't collide
  • Both server and client compute the same path using the child (server) PID

karya0 (Member, Author) replied:

@coderabbitai please generate a committable suggestion

coderabbitai bot (Contributor) replied:

@karya0, I've already provided a committable diff in my previous response that you can apply to your branch. The diff shows all the necessary changes to make the socket path unique per test instance.

However, I cannot generate GitHub suggestion blocks (the kind that can be committed with a single click through the GitHub UI) as they are not part of my response format.

To apply the changes, you can:

  1. Copy the diff content from my previous message
  2. Save it to a file (e.g., fix.patch)
  3. Apply it using git:
    git apply fix.patch

Or simply make the changes manually by:

  • Replacing the #define SOCKET_NAME with the static char declaration and helper function
  • Adding the calls to make_socket_name_from_child() in the main() function for both parent and child processes

The changes are straightforward and will prevent socket path collisions when tests run in parallel.

karya0 and others added 2 commits October 10, 2025 02:02
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/plugin/ipc/socket/kernelbufferdrainer.cpp (1)

68-83: Ancillary data handling still missing (previously flagged).

The previous critical concern about silently dropping ancillary data (SCM_RIGHTS/SCM_CREDENTIALS) remains unaddressed. AF_UNIX/SEQPACKET sockets commonly carry file descriptors or credentials, and dropping them breaks application semantics after restore.

As previously suggested:

  • Minimum guard: Supply a control buffer, and if MSG_CTRUNC is set or msg_controllen > 0, set _hadError = true, clear _frame, set errno = ENOTSUP, and return false.
  • Preferred fix: Extend drained storage to preserve per-message cmsgs and replay via sendmsg (or mark such sockets as protected and skip draining).

Apply this diff for the minimum guard:

   struct msghdr readMsg;
   memset(&readMsg, 0, sizeof(readMsg));
   readMsg.msg_iov = &iov;
   readMsg.msg_iovlen = 1;
+  char cbuf[256];
+  readMsg.msg_control = cbuf;
+  readMsg.msg_controllen = sizeof(cbuf);
   ssize_t got = ::recvmsg(fd, &readMsg, 0);
+  if (got > 0 && ((readMsg.msg_flags & MSG_CTRUNC) || readMsg.msg_controllen > 0)) {
+    _hadError = true;
+    _frame.clear();
+    errno = ENOTSUP;
+    return false;
+  }
🧹 Nitpick comments (2)
src/plugin/ipc/socket/kernelbufferdrainer.cpp (2)

278-282: Consider clarifying assignment-in-condition.

Line 278 uses assignment within the condition (if (_isSeqpacket[fd] = ...)). While functionally correct, splitting into separate statements improves readability:

- if (_isSeqpacket[fd] = (baseType == SOCK_SEQPACKET)) {
+ _isSeqpacket[fd] = (baseType == SOCK_SEQPACKET);
+ if (_isSeqpacket[fd]) {

289-396: Consider refactoring for clarity and adding bounds validation.

The refill logic uses four separate loops with skip conditions, which is correct but complex. Consider the following improvements:

  1. Complexity: Extract stream and seqpacket refill into separate helper methods to improve readability.

  2. Bounds validation (lines 362-367, 384-394):

    • Frames use uint32_t for length. Consider validating that frame sizes fit within UINT32_MAX before casting.
    • When receiving peer frames, validate len32 is reasonable before allocating buffers to prevent potential DoS from malicious/corrupted peers.

Example bounds check for receiving frames:

   for (int c = 0; c < count; ++c) {
     uint32_t len32 = 0;
     sock.readAll((char *)&len32, sizeof(len32));
+    // Sanity check: reject unreasonably large frames
+    const uint32_t MAX_FRAME_SIZE = 1024 * 1024 * 16; // 16MB limit
+    JASSERT(len32 <= MAX_FRAME_SIZE) (len32) (MAX_FRAME_SIZE)
+      .Text("Peer sent excessively large frame");
     if (len32 > 0) {
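The same bounds check can be shown on an in-memory buffer. This is a sketch under assumptions: `decodeFrames`, `kMaxFrameSize`, and the `[len:u32][payload]` layout in host byte order are illustrative, and it returns false where the diff above would trip a JASSERT.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

using Frame = std::vector<char>;

// Mirrors the 16MB limit proposed in the diff above.
const uint32_t kMaxFrameSize = 16u * 1024u * 1024u;

// Hypothetical decoder for `count` records of [len:u32][payload].
// Returns false on a truncated buffer or an oversized length field.
bool decodeFrames(const std::vector<char> &in, uint32_t count,
                  std::vector<Frame> &out) {
  size_t pos = 0;
  out.clear();
  for (uint32_t c = 0; c < count; ++c) {
    uint32_t len32 = 0;
    if (in.size() - pos < sizeof(len32)) return false;  // no room for length
    std::memcpy(&len32, in.data() + pos, sizeof(len32));
    pos += sizeof(len32);
    if (len32 > kMaxFrameSize) return false;  // reject before allocating
    if (in.size() - pos < len32) return false;  // truncated payload
    out.emplace_back(in.data() + pos, in.data() + pos + len32);
    pos += len32;
  }
  return true;
}
```

Checking the length before allocating keeps a corrupted or hostile length field from triggering a multi-gigabyte allocation.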
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8322364 and dce8449.

📒 Files selected for processing (1)
  • src/plugin/ipc/socket/kernelbufferdrainer.cpp (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/plugin/ipc/socket/kernelbufferdrainer.cpp (1)
src/plugin/ipc/ssh/sshdrainer.cpp (2)
  • beginDrainOf (93-106)
  • beginDrainOf (94-94)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (2)
src/plugin/ipc/socket/kernelbufferdrainer.cpp (2)

53-60: LGTM! Portability issue resolved.

The previous concern about using zero-length iovec with MSG_PEEK|MSG_TRUNC has been addressed. The code now provides a 1-byte buffer, ensuring portable behavior across different Unix implementations.


143-163: LGTM!

The dual-path handling for seqpacket vs. stream sockets is correct. Seqpacket frames are stored individually in _drainedFrames, preserving message boundaries, while stream data is appended to _drainedData.
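A stripped-down model of that dual path, for illustration only: the `DrainStore` struct and its member names are hypothetical stand-ins for the drainer's per-FD `_isSeqpacket`, `_drainedData`, and `_drainedFrames` state, and the real onData logic may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <vector>

// Hypothetical stand-in for the drainer's per-FD bookkeeping.
struct DrainStore {
  std::map<int, bool> isSeqpacket;
  std::map<int, std::vector<char>> drainedData;                 // stream bytes
  std::map<int, std::vector<std::vector<char>>> drainedFrames;  // one entry per frame

  void onData(int fd, const char *buf, size_t len) {
    if (isSeqpacket[fd]) {
      // Seqpacket: each read is one message, so keep it as its own frame.
      drainedFrames[fd].emplace_back(buf, buf + len);
    } else {
      // Stream: boundaries carry no meaning, so append to one buffer.
      std::vector<char> &d = drainedData[fd];
      d.insert(d.end(), buf, buf + len);
    }
  }
};
```

Feeding the same two reads to a seqpacket FD and a stream FD yields two frames for the former and a single five-byte buffer for the latter.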

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (2)
src/plugin/ipc/socket/socketconnection.cpp (2)

437-440: Past review comment remains unaddressed.

The previous review recommended using baseType() instead of directly comparing _sockType to handle sockets created with SOCK_CLOEXEC or SOCK_NONBLOCK flags.

Based on past review comments, apply this diff:

   if (!(_sockDomain == AF_INET || _sockDomain == AF_INET6) ||
-      _sockType != SOCK_STREAM) {
+      baseType() != SOCK_STREAM) {
     return;
   }
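
Why the raw comparison fails can be shown with a small sketch. On Linux, `SOCK_CLOEXEC` and `SOCK_NONBLOCK` are OR-ed into the `type` argument of `socket()`, so a stored type value may not compare equal to `SOCK_STREAM` even for a stream socket. The free function `baseTypeOf` below is a hypothetical stand-in; how `baseType()` is actually implemented in `socketconnection.cpp` is not shown in this review, so the masking is an assumption.

```cpp
#include <sys/socket.h>

// Hypothetical helper: strips the Linux-specific creation flags so that only
// the socket type proper (SOCK_STREAM, SOCK_SEQPACKET, ...) remains.
inline int baseTypeOf(int sockType)
{
  return sockType & ~(SOCK_CLOEXEC | SOCK_NONBLOCK);
}
```

With this, a type recorded as `SOCK_STREAM | SOCK_CLOEXEC` still passes a `baseTypeOf(type) == SOCK_STREAM` check, whereas `type == SOCK_STREAM` would be false and the socket would be skipped.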

484-486: Past review comment remains unaddressed.

The previous review recommended using baseType() instead of directly comparing _sockType to handle sockets created with SOCK_CLOEXEC or SOCK_NONBLOCK flags.

Based on past review comments, apply this diff:

   if (!(_sockDomain == AF_INET || _sockDomain == AF_INET6) ||
-      _sockType != SOCK_STREAM) {
+      baseType() != SOCK_STREAM) {
     return;
   }
🧹 Nitpick comments (1)
src/plugin/ipc/socket/socketconnection.cpp (1)

659-667: Consider flattening the unnecessary block scope.

The domain-specific socket type validation using baseType() is correct and consistent with the constructor implementation.

The extra block scope at line 659 adds nesting without providing value. Consider flattening it:

-    {
-      if (_sockDomain == AF_UNIX) {
-        JWARNING(baseType() == SOCK_STREAM || baseType() == SOCK_SEQPACKET)
-        (id()) (_sockDomain) (_sockType) (_sockProtocol);
-      } else if (_sockDomain == AF_INET || _sockDomain == AF_INET6) {
-        JWARNING(baseType() == SOCK_STREAM)
-        (id()) (_sockDomain) (_sockType) (_sockProtocol);
-      }
-    }
+    if (_sockDomain == AF_UNIX) {
+      JWARNING(baseType() == SOCK_STREAM || baseType() == SOCK_SEQPACKET)
+      (id()) (_sockDomain) (_sockType) (_sockProtocol);
+    } else if (_sockDomain == AF_INET || _sockDomain == AF_INET6) {
+      JWARNING(baseType() == SOCK_STREAM)
+      (id()) (_sockDomain) (_sockType) (_sockProtocol);
+    }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dce8449 and 6b9f88a.

📒 Files selected for processing (1)
  • src/plugin/ipc/socket/socketconnection.cpp (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (2)
src/plugin/ipc/socket/socketconnection.cpp (2)

239-247: LGTM!

The domain-specific socket type validation using baseType() is correctly implemented. AF_UNIX now properly supports both SOCK_STREAM and SOCK_SEQPACKET, while AF_INET/AF_INET6 remain restricted to SOCK_STREAM.


571-571: LGTM!

Correctly passes baseType() to beginDrainOf, aligning with the updated KernelBufferDrainer API for SOCK_SEQPACKET support.

@xuyao0127
Collaborator

xuyao0127 commented Oct 10, 2025

EDIT: I realized that I ran only the client under DMTCP, not both the server and the client. This is not an issue with the PR.

@karya0 Hi Kapil, I used Gene's SOCK_SEQPACKET program to test this PR. I am seeing this assertion failure in the resume event. Here is the JASSERT message:

[2025-10-10T16:14:19.537, 40000, 40002, Error] at connectionmessage.h:69 in assertValid; REASON='JASSERT(strcmp(sign, HANDSHAKE_SIGNATURE_MSG) == 0) failed; (sign = "");Message: read invalid message, signature mismatch. (External socket?)    client: Terminating...
    Backtrace:
        1 _ZN16jassert_internal7JAssertD2Ev in /home/yao/dmtcp/lib/dmtcp/libdmtcp.so 0x7ffff7ebc350
        2 _ZN5dmtcp7ConnMsg11assertValidENS0_7MsgTypeE in /home/yao/dmtcp/lib/dmtcp/libdmtcp_ipc.so 0x7ffff7f830ba
        3 _ZN5dmtcp19KernelBufferDrainer16refillAllSocketsEv in /home/yao/dmtcp/lib/dmtcp/libdmtcp_ipc.so 0x7ffff7f7f8e0
        4 _Z30dmtcp_SocketConnList_EventHook11eDmtcpEventP17_DmtcpEventData_t in /home/yao/dmtcp/lib/dmtcp/libdmtcp_ipc.so 0x7ffff7f8e85c
        5 _ZN5dmtcp13PluginManager9eventHookE11eDmtcpEventP17_DmtcpEventData_t in /home/yao/dmtcp/lib/dmtcp/libdmtcp.so 0x7ffff7e7f3c9
        6 _ZN5dmtcp11DmtcpWorker14postCheckpointEv in /home/yao/dmtcp/lib/dmtcp/libdmtcp.so 0x7ffff7e74570
        7  in /home/yao/dmtcp/lib/dmtcp/libdmtcp.so 0x7ffff7e89f16
        8  in /home/yao/dmtcp/lib/dmtcp/libdmtcp.so 0x7ffff7e8c25c
        9  in /lib/x86_64-linux-gnu/libc.so.6 0x7ffff7c9caa4
        10  in /lib/x86_64-linux-gnu/libc.so.6 0x7ffff7d29c6c

Here's the backtrace. I omitted frames 0-12 because they only print JASSERT information:

#13 0x00007ffff7f830ba in dmtcp::ConnMsg::assertValid (
    this=this@entry=0x7ffff09fe0d0, t=t@entry=dmtcp::ConnMsg::REFILL)
    at ipc/socket/connectionmessage.h:69
#14 0x00007ffff7f7f8e0 in dmtcp::KernelBufferDrainer::refillAllSockets (
    this=0x7ffff680ec08) at ipc/socket/kernelbufferdrainer.cpp:381
#15 0x00007ffff7f8e85c in dmtcp::SocketConnList::refill (isRestart=false,
    this=0x7ffff6805408) at ipc/socket/socketconnlist.cpp:233
#16 dmtcp::SocketConnList::resumeRefill () at ipc/socket/socketconnlist.h:37
#17 dmtcp_SocketConnList_EventHook (event=<optimized out>, data=0x0)
    at ipc/socket/socketconnlist.cpp:64
#18 0x00007ffff7e7f3c9 in dmtcp::PluginManager::eventHook (
    event=event@entry=DMTCP_EVENT_RESUME, data=data@entry=0x0)
    at pluginmanager.cpp:135
#19 0x00007ffff7e74570 in dmtcp::DmtcpWorker::postCheckpoint ()
    at dmtcpworker.cpp:505
#20 0x00007ffff7e89f16 in checkpointhread (dummy=<optimized out>)
    at threadlist.cpp:433
#21 0x00007ffff7e8c25c in thread_start (arg=0x7ffff6825008) at threadwrappers.cpp:77
#22 0x00007ffff7c9caa4 in start_thread (arg=<optimized out>)
    at ./nptl/pthread_create.c:447
#23 0x00007ffff7d29c6c in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:78

@gc00
Contributor

gc00 commented Oct 11, 2025

@karya0 had written elsewhere:

I have a weird reconnect issue on restart that's taking longer than expected. If I can't resolve it in the next few hours, I'll create a branch and push it as is to see if @xuyao0127 or @gc00 can pick it up.

@xuyao0127, did you have a chance to look at this issue?

@xuyao0127
Collaborator

@gc00 Where is this comment? I couldn't find it in this PR.

Collaborator

@xuyao0127 xuyao0127 left a comment


LGTM

@xuyao0127 xuyao0127 merged commit 88d7de4 into main Oct 30, 2025
2 checks passed
3 participants