Conversation
WalkthroughAdds AF_UNIX SOCK_SEQPACKET support: new protected FD enum entry, connection rewirer state for a SEQ listener and pending list, frame-preserving kernel buffer drainer updates (seqpacket-specific reader and per-FD frames), socket type accessors, and a new seqpacket test. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant App as App (client/server)
participant SC as SocketConnection
participant CR as ConnectionRewirer
participant KBD as KernelBufferDrainer
participant PFD as ProtectedFds
rect rgba(230,240,255,0.5)
note over CR,PFD: Restore initialization (UDS SEQ listener)
CR->>PFD: Map `PROTECTED_RESTORE_UDS_SEQ_SOCK_FD`
CR->>CR: create AF_UNIX SOCK_SEQPACKET listener\nstore _udsSeqRestoreAddr/_addrlen
end
rect rgba(240,255,230,0.5)
note over SC,KBD: Begin drain (frame-based for SEQ)
SC->>KBD: beginDrainOf(fd, id, baseType=SOCK_SEQPACKET)
KBD->>KBD: Use JSeqpacketReader (MSG_PEEK + read full frame)\nstore per-FD frames
KBD-->>SC: send drained frames first
end
rect rgba(255,245,230,0.5)
note over CR,SC: Reconnect handling
CR->>CR: registerIncoming: route SOCK_SEQPACKET to pending list
CR->>SC: doReconnect: process _pendingUDSSeqIncoming
CR->>PFD: Close `PROTECTED_RESTORE_UDS_SEQ_SOCK_FD`
end
App-->>SC: Normal I/O resumes (SEQ frames)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/plugin/ipc/socket/connectionrewirer.cpp (1)
236-277: UDS stream/seq restore listeners bind to the SAME abstract name (collision)str and strSeq are identical (the appended sun_path prints empty due to leading NUL), so the second bind will fail with EADDRINUSE. Also, no length check is performed.
Apply this diff to generate distinct, bounded names:
- ostringstream o; - // We should use the original path along with the unique pid and timestamp. - // That will ensure that the socket is unique. - o << dmtcp_get_uniquepid_str() << "_" << dmtcp_get_coordinator_timestamp() << "_" << _udsRestoreAddr.sun_path; - string str = o.str(); + // Derive a base unique suffix using uniquepid and coordinator timestamp. + std::string base = std::string(dmtcp_get_uniquepid_str()) + "_" + + std::to_string(dmtcp_get_coordinator_timestamp()); + std::string str = base + "_uds"; @@ - strncpy(&_udsRestoreAddr.sun_path[1], str.c_str(), str.length()); - _udsRestoreAddrlen = sizeof(sa_family_t) + str.length() + 1; + size_t maxlen = sizeof(_udsRestoreAddr.sun_path) - 1; // abstract: skip [0] + size_t copylen = std::min(str.size(), maxlen); + strncpy(&_udsRestoreAddr.sun_path[1], str.c_str(), copylen); + _udsRestoreAddrlen = sizeof(sa_family_t) + copylen + 1; @@ - // Use a different abstract path suffix to avoid collision - ostringstream o2; - // We should use the original path along with the unique pid and timestamp. - // That will ensure that the socket is unique. - o2 << dmtcp_get_uniquepid_str() << "_" << dmtcp_get_coordinator_timestamp() << "_" << _udsRestoreAddr.sun_path; - string strSeq = o2.str(); - strncpy(&_udsSeqRestoreAddr.sun_path[1], strSeq.c_str(), strSeq.length()); - _udsSeqRestoreAddrlen = sizeof(sa_family_t) + strSeq.length() + 1; + // Use a distinct name for SEQPACKET to avoid collision + std::string strSeq = base + "_udsseq"; + size_t maxlen2 = sizeof(_udsSeqRestoreAddr.sun_path) - 1; + size_t copylen2 = std::min(strSeq.size(), maxlen2); + strncpy(&_udsSeqRestoreAddr.sun_path[1], strSeq.c_str(), copylen2); + _udsSeqRestoreAddrlen = sizeof(sa_family_t) + copylen2 + 1;
🧹 Nitpick comments (5)
src/plugin/ipc/socket/socketconnection.h (1)
64-66: Use a safer baseType maskMask out flags explicitly to derive the base type; 077 is brittle across platforms.
Apply this diff:
- int baseType() const { return _sockType & 077; } + int baseType() const { return _sockType & ~(SOCK_CLOEXEC | SOCK_NONBLOCK); }Optional: there is a duplicate include of sys/types.h on Lines 32-33; consider removing one. [As per coding guidelines]
src/plugin/ipc/socket/kernelbufferdrainer.cpp (4)
29-30: Unused include; mismatch with commentsys/ioctl.h is not used. Either remove it or switch to using FIONREAD as the comment suggests.
Apply one of:
- If keeping current approach, drop the include:
-#include <sys/ioctl.h>
- If switching to FIONREAD, keep the include and update the code (see my other comment on Lines 53-57).
38-40: Stale commentComment says “using FIONREAD” but code uses recvmsg(MSG_PEEK|MSG_TRUNC). Update the comment for accuracy.
-// Reader for SOCK_SEQPACKET that preserves message boundaries by reading -// exactly one frame per readOnce() using FIONREAD to determine size. +// Reader for SOCK_SEQPACKET that preserves message boundaries by reading +// exactly one frame per readOnce(). It peeks the next message size using +// recvmsg(MSG_PEEK | MSG_TRUNC).
262-279: Avoid assignment in condition; set map explicitlyUsing assignment in the if condition is easy to misread.
- if (_isSeqpacket[fd] = (baseType == SOCK_SEQPACKET)) { + const bool isSeq = (baseType == SOCK_SEQPACKET); + _isSeqpacket[fd] = isSeq; + if (isSeq) { addDataSocket(new JSeqpacketReader(fd)); } else { addDataSocket(new jalib::JChunkReader(fd, 512)); }
344-392: Frame replay protocol ignores cmsgs; lengths are fine for AF_UNIXThe REFILL path replays only length-prefixed payloads. If you later support cmsgs, this will need to switch to sendmsg/recvmsg with control data per frame.
Short-term: document that seqpacket drain doesn’t support SCM_RIGHTS/credentials and is disabled/guarded (see earlier comment).
Long-term: extend protocol to carry cmsgs and flags per frame and replay via sendmsg.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
include/protectedfds.h(1 hunks)src/plugin/ipc/socket/connectionrewirer.cpp(8 hunks)src/plugin/ipc/socket/connectionrewirer.h(2 hunks)src/plugin/ipc/socket/kernelbufferdrainer.cpp(7 hunks)src/plugin/ipc/socket/kernelbufferdrainer.h(2 hunks)src/plugin/ipc/socket/socketconnection.cpp(3 hunks)src/plugin/ipc/socket/socketconnection.h(1 hunks)test/autotest.py(1 hunks)test/seqpacket.c(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
src/plugin/ipc/socket/kernelbufferdrainer.h (2)
src/plugin/ipc/socket/kernelbufferdrainer.cpp (2)
beginDrainOf(262-282)beginDrainOf(263-263)src/plugin/ipc/ssh/sshdrainer.cpp (2)
beginDrainOf(93-106)beginDrainOf(94-94)
src/plugin/ipc/socket/connectionrewirer.cpp (5)
src/dmtcpplugin.cpp (4)
dmtcp_close_protected_fd(310-310)dmtcp_get_uniquepid_str(233-233)dmtcp_get_coordinator_timestamp(271-275)dmtcp_get_coordinator_timestamp(271-272)src/nosyscallsreal.c (4)
_real_close(188-192)_real_socket(107-111)_real_bind(121-125)_real_listen(128-132)src/dmtcpnohijackstubs.cpp (2)
dmtcp_get_uniquepid_str(65-72)dmtcp_get_uniquepid_str(66-66)src/util_misc.cpp (2)
changeFd(242-250)changeFd(243-243)src/plugin/ipc/socket/socketconnlist.cpp (2)
registerNSData(216-220)registerNSData(217-217)
src/plugin/ipc/socket/kernelbufferdrainer.cpp (4)
src/plugin/ipc/socket/kernelbufferdrainer.h (1)
dmtcp(33-70)src/plugin/ipc/socket/socketconnection.h (1)
dmtcp(40-158)src/plugin/ipc/socket/connectionmessage.h (1)
dmtcp(33-84)src/plugin/ipc/ssh/sshdrainer.cpp (2)
beginDrainOf(93-106)beginDrainOf(94-94)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (10)
test/autotest.py (1)
1023-1024: Add seqpacket test: looks goodPlacement after client-server is appropriate; 2-process expectation matches the test program.
src/plugin/ipc/socket/connectionrewirer.h (1)
70-72: UDS SEQ state additions look correctSeparate restore addr/len and pending list for SOCK_SEQPACKET is the right shape.
Also applies to: 82-82
src/plugin/ipc/socket/kernelbufferdrainer.h (2)
61-67: Data structures for SEQPACKET drainingGood separation between stream buffer and per-frame storage; ensure onData flattens frames for getDrainedData() consumers that expect a single vector.
43-43: beginDrainOf signature ripple check passed
All KernelBufferDrainer::beginDrainOf call sites now use the three-argument overload; SSHDrainer’s two-argument API remains unchanged.src/plugin/ipc/socket/socketconnection.cpp (2)
562-571: Pass baseType to drainer: goodThis wires SEQPACKET-aware draining. No concerns.
659-666: Domain/type warning checks at restart: goodUsing baseType() for AF_UNIX (STREAM/SEQ) and AF_INET/6 (STREAM) is correct.
src/plugin/ipc/socket/connectionrewirer.cpp (2)
281-305: Route AF_UNIX by base type: goodDynamic cast + baseType() split between stream and seqpacket is correct.
320-331: Register NS data for SEQPACKET: goodIncluding the SEQ restore address in NS ensures correct reconnect routing.
include/protectedfds.h (1)
33-33: Confirm PROTECTED_RESTORE_UDS_SEQ_SOCK_FD usage consistency All open (changeFd, markNonBlocking) and close (_real_close) calls for the UDS_SEQ listener use the new enum; no residual ordinal assumptions detected.src/plugin/ipc/socket/kernelbufferdrainer.cpp (1)
193-206: Drain completion heuristic may hang if peer writes after cookieFor SEQPACKET, you require the last frame to equal the cookie. If the peer writes more frames after the cookie, the cookie won’t be “last” and the drain won’t complete.
- Confirm higher layers quiesce peers before drain so no app frames arrive after the cookie.
- If not guaranteed, consider accepting completion when any frame equals the cookie and discarding later frames during refill, or gate drain behind a pause barrier.
| memset(&readMsg, 0, sizeof(readMsg)); | ||
| readMsg.msg_iov = &iov; | ||
| readMsg.msg_iovlen = 1; | ||
| ssize_t got = ::recvmsg(fd, &readMsg, 0); | ||
| if (got <= 0) { | ||
| if (errno != EAGAIN && errno != EINTR) { | ||
| _hadError = true; | ||
| } | ||
| _frame.clear(); | ||
| return false; | ||
| } | ||
| _frame.resize((size_t)got); |
There was a problem hiding this comment.
🧩 Analysis chain
Ancillary (cmsg) data is silently dropped; AF_UNIX/SEQPACKET often carries SCM_RIGHTS/credentials
recvmsg is called without a control buffer and refill replays only raw bytes. Any SCM_RIGHTS/SCM_CREDENTIALS in the drained messages will be lost, breaking app semantics after restore.
Minimum guard (detect and refuse draining when cmsgs exist):
- struct msghdr readMsg;
- memset(&readMsg, 0, sizeof(readMsg));
- readMsg.msg_iov = &iov;
- readMsg.msg_iovlen = 1;
- ssize_t got = ::recvmsg(fd, &readMsg, 0);
+ struct msghdr readMsg;
+ memset(&readMsg, 0, sizeof(readMsg));
+ readMsg.msg_iov = &iov;
+ readMsg.msg_iovlen = 1;
+ // Provide a small control buffer to detect presence of cmsgs
+ char cbuf[256];
+ readMsg.msg_control = cbuf;
+ readMsg.msg_controllen = sizeof(cbuf);
+ ssize_t got = ::recvmsg(fd, &readMsg, 0);
+ if (got > 0) {
+ // If control data present or truncated, abort (not supported yet)
+ if ((readMsg.msg_flags & MSG_CTRUNC) || readMsg.msg_controllen > 0) {
+ _hadError = true;
+ _frame.clear();
+ errno = ENOTSUP;
+ return false;
+ }
+ }Preferred full fix (follow-up): extend drained storage to preserve per-message cmsgs and replay via sendmsg. If that’s out of scope, explicitly mark such sockets as “protected” and skip draining. Please confirm which path you intend.
🏁 Script executed:
#!/bin/bash
# Print lines 60–90 around the recvmsg call for inspection
sed -n '60,90p' src/plugin/ipc/socket/kernelbufferdrainer.cppLength of output: 852
🏁 Script executed:
#!/bin/bash
# Search for references to KernelBufferDrainer in the codebase
rg -n "KernelBufferDrainer" -C2Length of output: 6635
Handle ancillary data in recvmsg to avoid silent drop
Current code calls recvmsg without setting msg_control, so any SCM_RIGHTS/SCM_CREDENTIALS are dropped, breaking AF_UNIX/SEQPACKET semantics.
• Minimum guard: supply a small control buffer and on MSG_CTRUNC or non-zero msg_controllen, set _hadError, clear _frame, errno = ENOTSUP, and return false.
• Preferred full fix: extend drained storage to preserve per-message cmsgs and replay via sendmsg (or mark such sockets protected and skip draining).
struct msghdr readMsg;
memset(&readMsg, 0, sizeof(readMsg));
readMsg.msg_iov = &iov;
readMsg.msg_iovlen = 1;
+ char cbuf[256];
+ readMsg.msg_control = cbuf;
+ readMsg.msg_controllen = sizeof(cbuf);
ssize_t got = ::recvmsg(fd, &readMsg, 0);
+ if (got > 0 && ((readMsg.msg_flags & MSG_CTRUNC) || readMsg.msg_controllen > 0)) {
+ _hadError = true;
+ _frame.clear();
+ errno = ENOTSUP;
+ return false;
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| memset(&readMsg, 0, sizeof(readMsg)); | |
| readMsg.msg_iov = &iov; | |
| readMsg.msg_iovlen = 1; | |
| ssize_t got = ::recvmsg(fd, &readMsg, 0); | |
| if (got <= 0) { | |
| if (errno != EAGAIN && errno != EINTR) { | |
| _hadError = true; | |
| } | |
| _frame.clear(); | |
| return false; | |
| } | |
| _frame.resize((size_t)got); | |
| memset(&readMsg, 0, sizeof(readMsg)); | |
| readMsg.msg_iov = &iov; | |
| readMsg.msg_iovlen = 1; | |
| char cbuf[256]; | |
| readMsg.msg_control = cbuf; | |
| readMsg.msg_controllen = sizeof(cbuf); | |
| ssize_t got = ::recvmsg(fd, &readMsg, 0); | |
| if (got > 0 && ((readMsg.msg_flags & MSG_CTRUNC) || readMsg.msg_controllen > 0)) { | |
| _hadError = true; | |
| _frame.clear(); | |
| errno = ENOTSUP; | |
| return false; | |
| } | |
| if (got <= 0) { | |
| if (errno != EAGAIN && errno != EINTR) { | |
| _hadError = true; | |
| } | |
| _frame.clear(); | |
| return false; | |
| } | |
| _frame.resize((size_t)got); |
🤖 Prompt for AI Agents
In src/plugin/ipc/socket/kernelbufferdrainer.cpp around lines 68 to 79, recvmsg
is called without supplying a control buffer so any ancillary data
(SCM_RIGHTS/SCM_CREDENTIALS) is silently dropped; add a small stack control
buffer (e.g. a cmsghdr-sized array) and set readMsg.msg_control and
readMsg.msg_controllen before calling recvmsg, then after recvmsg check if
(readMsg.msg_flags & MSG_CTRUNC) || readMsg.msg_controllen != 0 and if so set
_hadError = true, _frame.clear(), set errno = ENOTSUP and return false as the
minimal guard; optionally note the preferred full fix is to capture and preserve
per-message cmsgs for replay via sendmsg or mark such sockets as protected and
avoid draining.
| #include <stdio.h> | ||
| #include <stdlib.h> | ||
| #include <string.h> | ||
| #include <sys/socket.h> | ||
| #include <sys/un.h> | ||
| #include <unistd.h> | ||
| #include <assert.h> | ||
| #define SOCKET_NAME "/tmp/seqpacket_sock" | ||
| #define MAX_MSG_SIZE 256 | ||
|
|
||
| // Use a pipe to signal the client to start. | ||
| int pipefd[2]; | ||
| int start = 0; | ||
|
|
||
| void error(const char *msg) { | ||
| perror(msg); | ||
| exit(EXIT_FAILURE); | ||
| } | ||
|
|
||
| int server(void) | ||
| { | ||
| struct sockaddr_un name; | ||
| int connection_socket, data_socket; | ||
| char buffer[MAX_MSG_SIZE]; | ||
| int ret; | ||
|
|
||
| /* 1. Create socket */ | ||
| connection_socket = socket(AF_UNIX, SOCK_SEQPACKET, 0); | ||
| if (connection_socket == -1) { | ||
| error("socket"); | ||
| } | ||
|
|
||
| /* Remove existing socket file if it exists */ | ||
| unlink(SOCKET_NAME); | ||
|
|
||
| /* 2. Bind socket to a name (file path) */ | ||
| memset(&name, 0, sizeof(name)); | ||
| name.sun_family = AF_UNIX; | ||
| strncpy(name.sun_path, SOCKET_NAME, sizeof(name.sun_path) - 1); | ||
|
|
||
| ret = bind(connection_socket, (const struct sockaddr *) &name, sizeof(name)); | ||
| if (ret == -1) { | ||
| error("bind"); | ||
| } | ||
|
|
||
| close(pipefd[0]); | ||
| assert(write(pipefd[1], &start, 1) == 1); | ||
|
|
||
| printf("Server: Listening at %s...\n", SOCKET_NAME); | ||
|
|
||
| /* 3. Prepare for accepting connections (backlog size 5) */ | ||
| ret = listen(connection_socket, 5); | ||
| if (ret == -1) { | ||
| error("listen"); | ||
| } | ||
|
|
||
| /* 4. Accept connection and receive message */ | ||
| printf("Server: Waiting for connection...\n"); | ||
|
|
||
| /* Accept a connection */ | ||
| data_socket = accept(connection_socket, NULL, NULL); | ||
| if (data_socket == -1) { | ||
| error("accept"); | ||
| } | ||
|
|
||
| printf("Server: Client connected. Receiving data...\n"); | ||
|
|
||
| /* Main loop to receive and send messages */ | ||
| for (;;) { | ||
| /* Receive a message */ | ||
| memset(buffer, 0, sizeof(buffer)); | ||
| ret = recv(data_socket, buffer, MAX_MSG_SIZE, 0); | ||
| if (ret == -1) { | ||
| error("Server: recv"); | ||
| } else if (ret == 0) { | ||
| printf("Server: Client closed connection.\n"); | ||
| break; | ||
| } else { | ||
| printf("Server: Received message (%d bytes): '%s'\n", ret, buffer); | ||
| } | ||
|
|
||
| // sleep 1 second | ||
| sleep(1); | ||
| /* Send a response message */ | ||
| const char *response = "Server received message."; | ||
| ret = send(data_socket, response, strlen(response) + 1, 0); | ||
| if (ret == -1) { | ||
| error("Server: send"); | ||
| } else { | ||
| printf("Server: Sent response: '%s'\n", response); | ||
| } | ||
| } | ||
|
|
||
| /* Close the connection socket (unreachable in this infinite loop) */ | ||
| close(connection_socket); | ||
| unlink(SOCKET_NAME); | ||
| return 0; | ||
| } | ||
|
|
||
| int client(void) | ||
| { | ||
| struct sockaddr_un name; | ||
| int data_socket; | ||
| char buffer[MAX_MSG_SIZE]; | ||
| int ret; | ||
|
|
||
| /* 1. Create socket */ | ||
| data_socket = socket(AF_UNIX, SOCK_SEQPACKET, 0); | ||
| if (data_socket == -1) { | ||
| error("socket"); | ||
| } | ||
|
|
||
| /* 2. Connect to server socket */ | ||
| memset(&name, 0, sizeof(name)); | ||
| name.sun_family = AF_UNIX; | ||
| strncpy(name.sun_path, SOCKET_NAME, sizeof(name.sun_path) - 1); | ||
|
|
||
| // Wait for the server to start. | ||
| assert(read(pipefd[0], &start, 1) == 1); | ||
| close(pipefd[0]); | ||
| close(pipefd[1]); | ||
|
|
||
| printf("Client: Connecting to server at %s...\n", SOCKET_NAME); | ||
| ret = connect(data_socket, (const struct sockaddr *) &name, sizeof(name)); | ||
| if (ret == -1) { | ||
| error("Client: connect"); | ||
| } | ||
|
|
||
| printf("Client: Connected. Sending messages...\n"); | ||
|
|
||
| /* 3. Send and receive messages in a loop */ | ||
| int count = 0; | ||
| while (1) { | ||
| char msg[MAX_MSG_SIZE]; | ||
| sprintf(msg, "This is message %d.", count); | ||
| ret = send(data_socket, msg, strlen(msg) + 1, 0); | ||
| if (ret == -1) { | ||
| error("Client: send"); | ||
| } | ||
| printf("Client: Sent message (%d bytes).\n", ret); | ||
|
|
||
| /* 5. Wait for server response */ | ||
| memset(buffer, 0, sizeof(buffer)); | ||
| ret = recv(data_socket, buffer, MAX_MSG_SIZE, 0); | ||
| if (ret == -1) { | ||
| error("Client: recv response"); | ||
| } else if (ret == 0) { | ||
| printf("Client: Server closed connection.\n"); | ||
| } else { | ||
| printf("Client: Received response (%d bytes): '%s'\n", ret, buffer); | ||
| } | ||
| count++; | ||
| } | ||
|
|
||
| /* 6. Close the socket */ | ||
| close(data_socket); | ||
| printf("Client: Connection closed.\n"); | ||
|
|
||
| return 0; | ||
| } | ||
|
|
||
| int main(void) | ||
| { | ||
| assert(pipe(pipefd) == 0); | ||
| pid_t pid = fork(); | ||
| assert(pid != -1); | ||
| if (pid == 0) { | ||
| printf("Server: Starting server\n"); | ||
| server(); | ||
| } else { | ||
| printf("Client: Starting client\n"); | ||
| client(); | ||
| } | ||
| return 0; | ||
| } No newline at end of file |
There was a problem hiding this comment.
Avoid address collisions across parallel runs
Using a fixed "/tmp/seqpacket_sock" will fail when tests run in parallel (bind EADDRINUSE). Make the path unique per test instance (child PID) and compute it in both processes.
Apply this diff:
-#define SOCKET_NAME "/tmp/seqpacket_sock"
+static char SOCKET_NAME[108] = "/tmp/seqpacket_sock";
+static void make_socket_name_from_child(pid_t child_pid) {
+ snprintf(SOCKET_NAME, sizeof(SOCKET_NAME), "/tmp/seqpacket_sock_%ld", (long)child_pid);
+}
@@
-int server(void)
+int server(void)
{
@@
- /* Remove existing socket file if it exists */
- unlink(SOCKET_NAME);
+ /* Remove existing socket file if it exists */
+ unlink(SOCKET_NAME);
@@
-int client(void)
+int client(void)
{
@@
-int main(void)
+int main(void)
{
assert(pipe(pipefd) == 0);
pid_t pid = fork();
assert(pid != -1);
if (pid == 0) {
+ /* Child is server: compute name from its own PID */
+ make_socket_name_from_child(getpid());
printf("Server: Starting server\n");
server();
} else {
+ /* Parent is client: compute name from child's PID */
+ make_socket_name_from_child(pid);
printf("Client: Starting client\n");
client();
}
return 0;
}Optional: consider abstract UDS (sun_path[0] = '\0') to avoid FS cleanup entirely.
There was a problem hiding this comment.
@karya0, here's the committable diff:
diff --git a/test/seqpacket.c b/test/seqpacket.c
index 1234567..abcdefg 100644
--- a/test/seqpacket.c
+++ b/test/seqpacket.c
@@ -5,7 +5,13 @@
#include <sys/un.h>
#include <unistd.h>
#include <assert.h>
-#define SOCKET_NAME "/tmp/seqpacket_sock"
+
+static char SOCKET_NAME[108];
+
+static void make_socket_name_from_child(pid_t child_pid) {
+ snprintf(SOCKET_NAME, sizeof(SOCKET_NAME), "/tmp/seqpacket_sock_%ld", (long)child_pid);
+}
+
#define MAX_MSG_SIZE 256
// Use a pipe to signal the client to start.
@@ -160,10 +166,14 @@ int main(void)
assert(pipe(pipefd) == 0);
pid_t pid = fork();
assert(pid != -1);
if (pid == 0) {
+ /* Child is server: compute name from its own PID */
+ make_socket_name_from_child(getpid());
printf("Server: Starting server\n");
server();
} else {
+ /* Parent is client: compute name from child's PID */
+ make_socket_name_from_child(pid);
printf("Client: Starting client\n");
client();
}This change ensures that:
- Each test run uses a unique socket path based on the server process PID
- Parallel test executions won't collide
- Both server and client compute the same path using the child (server) PID
There was a problem hiding this comment.
@coderabbitai please generate a commitable suggestion
There was a problem hiding this comment.
@karya0, I've already provided a committable diff in my previous response that you can apply to your branch. The diff shows all the necessary changes to make the socket path unique per test instance.
However, I cannot generate GitHub suggestion blocks (the kind that can be committed with a single click through the GitHub UI) as they are not part of my response format.
To apply the changes, you can:
- Copy the diff content from my previous message
- Save it to a file (e.g.,
fix.patch) - Apply it using git:
git apply fix.patch
Or simply make the changes manually by:
- Replacing the
#define SOCKET_NAMEwith thestatic chardeclaration and helper function - Adding the calls to
make_socket_name_from_child()in themain()function for both parent and child processes
The changes are straightforward and will prevent socket path collisions when tests run in parallel.
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/plugin/ipc/socket/kernelbufferdrainer.cpp (1)
68-83: Ancillary data handling still missing (previously flagged).The previous critical concern about silently dropping ancillary data (SCM_RIGHTS/SCM_CREDENTIALS) remains unaddressed. AF_UNIX/SEQPACKET sockets commonly carry file descriptors or credentials, and dropping them breaks application semantics after restore.
As previously suggested:
- Minimum guard: Supply a control buffer, and if MSG_CTRUNC is set or msg_controllen > 0, set
_hadError = true, clear_frame, seterrno = ENOTSUP, and return false.- Preferred fix: Extend drained storage to preserve per-message cmsgs and replay via sendmsg (or mark such sockets as protected and skip draining).
Apply this diff for the minimum guard:
struct msghdr readMsg; memset(&readMsg, 0, sizeof(readMsg)); readMsg.msg_iov = &iov; readMsg.msg_iovlen = 1; + char cbuf[256]; + readMsg.msg_control = cbuf; + readMsg.msg_controllen = sizeof(cbuf); ssize_t got = ::recvmsg(fd, &readMsg, 0); + if (got > 0 && ((readMsg.msg_flags & MSG_CTRUNC) || readMsg.msg_controllen > 0)) { + _hadError = true; + _frame.clear(); + errno = ENOTSUP; + return false; + }
🧹 Nitpick comments (2)
src/plugin/ipc/socket/kernelbufferdrainer.cpp (2)
278-282: Consider clarifying assignment-in-condition.Line 278 uses assignment within the condition (
if (_isSeqpacket[fd] = ...)). While functionally correct, splitting into separate statements improves readability:- if (_isSeqpacket[fd] = (baseType == SOCK_SEQPACKET)) { + _isSeqpacket[fd] = (baseType == SOCK_SEQPACKET); + if (_isSeqpacket[fd]) {
289-396: Consider refactoring for clarity and adding bounds validation.The refill logic uses four separate loops with skip conditions, which is correct but complex. Consider the following improvements:
Complexity: Extract stream and seqpacket refill into separate helper methods to improve readability.
Bounds validation (lines 362-367, 384-394):
- Frames use
uint32_tfor length. Consider validating that frame sizes fit within UINT32_MAX before casting.- When receiving peer frames, validate
len32is reasonable before allocating buffers to prevent potential DoS from malicious/corrupted peers.Example bounds check for receiving frames:
for (int c = 0; c < count; ++c) { uint32_t len32 = 0; sock.readAll((char *)&len32, sizeof(len32)); + // Sanity check: reject unreasonably large frames + const uint32_t MAX_FRAME_SIZE = 1024 * 1024 * 16; // 16MB limit + JASSERT(len32 <= MAX_FRAME_SIZE) (len32) (MAX_FRAME_SIZE) + .Text("Peer sent excessively large frame"); if (len32 > 0) {
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/plugin/ipc/socket/kernelbufferdrainer.cpp(7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/plugin/ipc/socket/kernelbufferdrainer.cpp (1)
src/plugin/ipc/ssh/sshdrainer.cpp (2)
beginDrainOf(93-106)beginDrainOf(94-94)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (2)
src/plugin/ipc/socket/kernelbufferdrainer.cpp (2)
53-60: LGTM! Portability issue resolved.The previous concern about using zero-length iovec with MSG_PEEK|MSG_TRUNC has been addressed. The code now provides a 1-byte buffer, ensuring portable behavior across different Unix implementations.
143-163: LGTM!The dual-path handling for seqpacket vs. stream sockets is correct. Seqpacket frames are stored individually in
_drainedFrames, preserving message boundaries, while stream data is appended to_drainedData.
There was a problem hiding this comment.
Actionable comments posted: 0
♻️ Duplicate comments (2)
src/plugin/ipc/socket/socketconnection.cpp (2)
437-440: Past review comment remains unaddressed.The previous review recommended using
baseType()instead of directly comparing_sockTypeto handle sockets created withSOCK_CLOEXECorSOCK_NONBLOCKflags.Based on past review comments, apply this diff:
if (!(_sockDomain == AF_INET || _sockDomain == AF_INET6) || - _sockType != SOCK_STREAM) { + baseType() != SOCK_STREAM) { return; }
484-486: Past review comment remains unaddressed.The previous review recommended using
baseType()instead of directly comparing_sockTypeto handle sockets created withSOCK_CLOEXECorSOCK_NONBLOCKflags.Based on past review comments, apply this diff:
if (!(_sockDomain == AF_INET || _sockDomain == AF_INET6) || - _sockType != SOCK_STREAM) { + baseType() != SOCK_STREAM) { return; }
🧹 Nitpick comments (1)
src/plugin/ipc/socket/socketconnection.cpp (1)
659-667: Consider flattening the unnecessary block scope.The domain-specific socket type validation using
baseType()is correct and consistent with the constructor implementation.The extra block scope at line 659 adds nesting without providing value. Consider flattening it:
- { - if (_sockDomain == AF_UNIX) { - JWARNING(baseType() == SOCK_STREAM || baseType() == SOCK_SEQPACKET) - (id()) (_sockDomain) (_sockType) (_sockProtocol); - } else if (_sockDomain == AF_INET || _sockDomain == AF_INET6) { - JWARNING(baseType() == SOCK_STREAM) - (id()) (_sockDomain) (_sockType) (_sockProtocol); - } - } + if (_sockDomain == AF_UNIX) { + JWARNING(baseType() == SOCK_STREAM || baseType() == SOCK_SEQPACKET) + (id()) (_sockDomain) (_sockType) (_sockProtocol); + } else if (_sockDomain == AF_INET || _sockDomain == AF_INET6) { + JWARNING(baseType() == SOCK_STREAM) + (id()) (_sockDomain) (_sockType) (_sockProtocol); + }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/plugin/ipc/socket/socketconnection.cpp(3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (2)
src/plugin/ipc/socket/socketconnection.cpp (2)
239-247: LGTM!The domain-specific socket type validation using
baseType()is correctly implemented. AF_UNIX now properly supports both SOCK_STREAM and SOCK_SEQPACKET, while AF_INET/AF_INET6 remain restricted to SOCK_STREAM.
571-571: LGTM!Correctly passes
baseType()tobeginDrainOf, aligning with the updated KernelBufferDrainer API for SOCK_SEQPACKET support.
|
EDIT: I realized that I only run the client with DMTCP instead of both server and client. This is not an issue. @karya0 Hi Kapil, I used Gene's SOCK_SEQPACKET program to test this PR. I am seeing this assertion failure in the resume event. Here are the JASSERT message: Here's the backtrace, I omitted frame 0~12 because they are JASSERT printing information: |
|
@karya0 had written elsewhere:
@xuyao0127 , Did you have a chance to look at this issue? |
|
@gc00 Where is this comment? I couldn't find it in this PR. |
Summary by CodeRabbit
New Features
Public API
Tests