Skip to content

Commit 8322364

Browse files
committed
Added support SOCK_SEQPACKET.
1 parent 7947946 commit 8322364

File tree

7 files changed

+253
-34
lines changed

7 files changed

+253
-34
lines changed

include/protectedfds.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ enum ProtectedFds {
2929
PROTECTED_RESTORE_IP4_SOCK_FD,
3030
PROTECTED_RESTORE_IP6_SOCK_FD,
3131
PROTECTED_RESTORE_UDS_SOCK_FD,
32+
PROTECTED_RESTORE_UDS_SEQ_SOCK_FD,
3233
PROTECTED_COORD_ALT_FD,
3334
PROTECTED_STDERR_FD,
3435
PROTECTED_JASSERTLOG_FD,

src/plugin/ipc/socket/connectionrewirer.cpp

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ ConnectionRewirer::destroy()
8080
dmtcp_close_protected_fd(PROTECTED_RESTORE_IP4_SOCK_FD);
8181
dmtcp_close_protected_fd(PROTECTED_RESTORE_IP6_SOCK_FD);
8282
dmtcp_close_protected_fd(PROTECTED_RESTORE_UDS_SOCK_FD);
83+
dmtcp_close_protected_fd(PROTECTED_RESTORE_UDS_SEQ_SOCK_FD);
8384

8485
// Free up the object.
8586
delete theRewirer;
@@ -158,6 +159,13 @@ ConnectionRewirer::doReconnect()
158159
&_pendingUDSIncoming);
159160
_real_close(PROTECTED_RESTORE_UDS_SOCK_FD);
160161
}
162+
if (_pendingUDSSeqIncoming.size() > 0) {
163+
// Add O_NONBLOCK flag to the listener sockets.
164+
markSocketBlocking(PROTECTED_RESTORE_UDS_SEQ_SOCK_FD);
165+
checkForPendingIncoming(PROTECTED_RESTORE_UDS_SEQ_SOCK_FD,
166+
&_pendingUDSSeqIncoming);
167+
_real_close(PROTECTED_RESTORE_UDS_SEQ_SOCK_FD);
168+
}
161169
JTRACE("Closed restore sockets");
162170
}
163171

@@ -169,6 +177,7 @@ ConnectionRewirer::openRestoreSocket(bool hasIPv4Sock,
169177
memset(&_ip4RestoreAddr, 0, sizeof(_ip4RestoreAddr));
170178
memset(&_ip6RestoreAddr, 0, sizeof(_ip6RestoreAddr));
171179
memset(&_udsRestoreAddr, 0, sizeof(_udsRestoreAddr));
180+
memset(&_udsSeqRestoreAddr, 0, sizeof(_udsSeqRestoreAddr));
172181

173182
// Open IP4 Restore Socket
174183
if (hasIPv4Sock) {
@@ -223,7 +232,9 @@ ConnectionRewirer::openRestoreSocket(bool hasIPv4Sock,
223232
// Open UDS Restore Socket
224233
if (hasUNIXSock) {
225234
ostringstream o;
226-
o << dmtcp_get_uniquepid_str() << "_" << dmtcp_get_coordinator_timestamp();
235+
// We should use the original path along with the unique pid and timestamp.
236+
// That will ensure that the socket is unique.
237+
o << dmtcp_get_uniquepid_str() << "_" << dmtcp_get_coordinator_timestamp() << "_" << _udsRestoreAddr.sun_path;
227238
string str = o.str();
228239
int udsfd = _real_socket(AF_UNIX, SOCK_STREAM, 0);
229240
JASSERT(udsfd != -1);
@@ -240,6 +251,29 @@ ConnectionRewirer::openRestoreSocket(bool hasIPv4Sock,
240251
JTRACE("opened UDS listen socket")
241252
(PROTECTED_RESTORE_UDS_SOCK_FD) (&_udsRestoreAddr.sun_path[1]);
242253
markSocketNonBlocking(PROTECTED_RESTORE_UDS_SOCK_FD);
254+
255+
// Also open a seqpacket listener for AF_UNIX SOCK_SEQPACKET reconnections
256+
int udsseqfd = _real_socket(AF_UNIX, SOCK_SEQPACKET, 0);
257+
JASSERT(udsseqfd != -1);
258+
memset(&_udsSeqRestoreAddr, 0, sizeof(struct sockaddr_un));
259+
_udsSeqRestoreAddr.sun_family = AF_UNIX;
260+
// Use a different abstract path suffix to avoid collision
261+
ostringstream o2;
262+
// We should use the original path along with the unique pid and timestamp.
263+
// That will ensure that the socket is unique.
264+
o2 << dmtcp_get_uniquepid_str() << "_" << dmtcp_get_coordinator_timestamp() << "_" << _udsRestoreAddr.sun_path;
265+
string strSeq = o2.str();
266+
strncpy(&_udsSeqRestoreAddr.sun_path[1], strSeq.c_str(), strSeq.length());
267+
_udsSeqRestoreAddrlen = sizeof(sa_family_t) + strSeq.length() + 1;
268+
JASSERT(_real_bind(udsseqfd, (struct sockaddr *)&_udsSeqRestoreAddr,
269+
_udsSeqRestoreAddrlen) == 0)
270+
(JASSERT_ERRNO);
271+
JASSERT(_real_listen(udsseqfd, 32) == 0) (JASSERT_ERRNO);
272+
Util::changeFd(udsseqfd, PROTECTED_RESTORE_UDS_SEQ_SOCK_FD);
273+
274+
JTRACE("opened UDS SEQPACKET listen socket")
275+
(PROTECTED_RESTORE_UDS_SEQ_SOCK_FD) (&_udsSeqRestoreAddr.sun_path[1]);
276+
markSocketNonBlocking(PROTECTED_RESTORE_UDS_SEQ_SOCK_FD);
243277
}
244278
}
245279

@@ -250,6 +284,8 @@ ConnectionRewirer::registerIncoming(const ConnectionIdentifier &local,
250284
{
251285
JASSERT(domain == AF_INET || domain == AF_INET6 || domain == AF_UNIX)
252286
(domain).Text("Unsupported domain.");
287+
SocketConnection *sc = dynamic_cast<SocketConnection *>(con);
288+
JASSERT(sc != NULL);
253289

254290
if (domain == AF_INET) {
255291
_pendingIP4Incoming[local] = con;
@@ -260,7 +296,12 @@ ConnectionRewirer::registerIncoming(const ConnectionIdentifier &local,
260296
_pendingIP4Incoming[local] = con;
261297
#endif // ifdef ENABLE_IP6_SUPPORT
262298
} else if (domain == AF_UNIX) {
263-
_pendingUDSIncoming[local] = con;
299+
// Route AF_UNIX by base type: stream vs seqpacket
300+
if (sc->baseType() == SOCK_SEQPACKET) {
301+
_pendingUDSSeqIncoming[local] = con;
302+
} else {
303+
_pendingUDSIncoming[local] = con;
304+
}
264305
} else {
265306
JASSERT(false).Text("Not implemented");
266307
}
@@ -285,6 +326,8 @@ ConnectionRewirer::registerNSData()
285326
&_pendingIP6Incoming);
286327
registerNSData((void *)&_udsRestoreAddr, _udsRestoreAddrlen,
287328
&_pendingUDSIncoming);
329+
registerNSData((void *)&_udsSeqRestoreAddr, _udsSeqRestoreAddrlen,
330+
&_pendingUDSSeqIncoming);
288331
}
289332

290333
void

src/plugin/ipc/socket/connectionrewirer.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ class ConnectionRewirer
6767
socklen_t _ip6RestoreAddrlen;
6868
struct sockaddr_un _udsRestoreAddr;
6969
socklen_t _udsRestoreAddrlen;
70+
struct sockaddr_un _udsSeqRestoreAddr;
71+
socklen_t _udsSeqRestoreAddrlen;
7072

7173
typedef ConnectionListT::iterator iterator;
7274
typedef ConnectionListT::const_iterator const_iterator;
@@ -76,6 +78,7 @@ class ConnectionRewirer
7678
ConnectionListT _pendingIP4Incoming;
7779
ConnectionListT _pendingIP6Incoming;
7880
ConnectionListT _pendingUDSIncoming;
81+
ConnectionListT _pendingUDSSeqIncoming;
7982

8083
ConnectionListT _pendingOutgoing;
8184
RemoteInfoT _remoteInfo;

0 commit comments

Comments
 (0)