@@ -80,6 +80,7 @@ ConnectionRewirer::destroy()
8080 dmtcp_close_protected_fd (PROTECTED_RESTORE_IP4_SOCK_FD);
8181 dmtcp_close_protected_fd (PROTECTED_RESTORE_IP6_SOCK_FD);
8282 dmtcp_close_protected_fd (PROTECTED_RESTORE_UDS_SOCK_FD);
83+ dmtcp_close_protected_fd (PROTECTED_RESTORE_UDS_SEQ_SOCK_FD);
8384
8485 // Free up the object.
8586 delete theRewirer;
@@ -158,6 +159,13 @@ ConnectionRewirer::doReconnect()
158159 &_pendingUDSIncoming);
159160 _real_close (PROTECTED_RESTORE_UDS_SOCK_FD);
160161 }
162+ if (_pendingUDSSeqIncoming.size () > 0 ) {
163+ // Add O_NONBLOCK flag to the listener sockets.
164+ markSocketBlocking (PROTECTED_RESTORE_UDS_SEQ_SOCK_FD);
165+ checkForPendingIncoming (PROTECTED_RESTORE_UDS_SEQ_SOCK_FD,
166+ &_pendingUDSSeqIncoming);
167+ _real_close (PROTECTED_RESTORE_UDS_SEQ_SOCK_FD);
168+ }
161169 JTRACE (" Closed restore sockets" );
162170}
163171
@@ -169,6 +177,7 @@ ConnectionRewirer::openRestoreSocket(bool hasIPv4Sock,
169177 memset (&_ip4RestoreAddr, 0 , sizeof (_ip4RestoreAddr));
170178 memset (&_ip6RestoreAddr, 0 , sizeof (_ip6RestoreAddr));
171179 memset (&_udsRestoreAddr, 0 , sizeof (_udsRestoreAddr));
180+ memset (&_udsSeqRestoreAddr, 0 , sizeof (_udsSeqRestoreAddr));
172181
173182 // Open IP4 Restore Socket
174183 if (hasIPv4Sock) {
@@ -223,7 +232,9 @@ ConnectionRewirer::openRestoreSocket(bool hasIPv4Sock,
223232 // Open UDS Restore Socket
224233 if (hasUNIXSock) {
225234 ostringstream o;
226- o << dmtcp_get_uniquepid_str () << " _" << dmtcp_get_coordinator_timestamp ();
235+ // We should use the original path along with the unique pid and timestamp.
236+ // That will ensure that the socket is unique.
237+ o << dmtcp_get_uniquepid_str () << " _" << dmtcp_get_coordinator_timestamp () << " _" << _udsRestoreAddr.sun_path ;
227238 string str = o.str ();
228239 int udsfd = _real_socket (AF_UNIX, SOCK_STREAM, 0 );
229240 JASSERT (udsfd != -1 );
@@ -240,6 +251,29 @@ ConnectionRewirer::openRestoreSocket(bool hasIPv4Sock,
240251 JTRACE (" opened UDS listen socket" )
241252 (PROTECTED_RESTORE_UDS_SOCK_FD) (&_udsRestoreAddr.sun_path [1 ]);
242253 markSocketNonBlocking (PROTECTED_RESTORE_UDS_SOCK_FD);
254+
255+ // Also open a seqpacket listener for AF_UNIX SOCK_SEQPACKET reconnections
256+ int udsseqfd = _real_socket (AF_UNIX, SOCK_SEQPACKET, 0 );
257+ JASSERT (udsseqfd != -1 );
258+ memset (&_udsSeqRestoreAddr, 0 , sizeof (struct sockaddr_un ));
259+ _udsSeqRestoreAddr.sun_family = AF_UNIX;
260+ // Use a different abstract path suffix to avoid collision
261+ ostringstream o2;
262+ // We should use the original path along with the unique pid and timestamp.
263+ // That will ensure that the socket is unique.
264+ o2 << dmtcp_get_uniquepid_str () << " _" << dmtcp_get_coordinator_timestamp () << " _" << _udsRestoreAddr.sun_path ;
265+ string strSeq = o2.str ();
266+ strncpy (&_udsSeqRestoreAddr.sun_path [1 ], strSeq.c_str (), strSeq.length ());
267+ _udsSeqRestoreAddrlen = sizeof (sa_family_t ) + strSeq.length () + 1 ;
268+ JASSERT (_real_bind (udsseqfd, (struct sockaddr *)&_udsSeqRestoreAddr,
269+ _udsSeqRestoreAddrlen) == 0 )
270+ (JASSERT_ERRNO);
271+ JASSERT (_real_listen (udsseqfd, 32 ) == 0 ) (JASSERT_ERRNO);
272+ Util::changeFd (udsseqfd, PROTECTED_RESTORE_UDS_SEQ_SOCK_FD);
273+
274+ JTRACE (" opened UDS SEQPACKET listen socket" )
275+ (PROTECTED_RESTORE_UDS_SEQ_SOCK_FD) (&_udsSeqRestoreAddr.sun_path [1 ]);
276+ markSocketNonBlocking (PROTECTED_RESTORE_UDS_SEQ_SOCK_FD);
243277 }
244278}
245279
@@ -250,6 +284,8 @@ ConnectionRewirer::registerIncoming(const ConnectionIdentifier &local,
250284{
251285 JASSERT (domain == AF_INET || domain == AF_INET6 || domain == AF_UNIX)
252286 (domain).Text (" Unsupported domain." );
287+ SocketConnection *sc = dynamic_cast <SocketConnection *>(con);
288+ JASSERT (sc != NULL );
253289
254290 if (domain == AF_INET) {
255291 _pendingIP4Incoming[local] = con;
@@ -260,7 +296,12 @@ ConnectionRewirer::registerIncoming(const ConnectionIdentifier &local,
260296 _pendingIP4Incoming[local] = con;
261297#endif // ifdef ENABLE_IP6_SUPPORT
262298 } else if (domain == AF_UNIX) {
263- _pendingUDSIncoming[local] = con;
299+ // Route AF_UNIX by base type: stream vs seqpacket
300+ if (sc->baseType () == SOCK_SEQPACKET) {
301+ _pendingUDSSeqIncoming[local] = con;
302+ } else {
303+ _pendingUDSIncoming[local] = con;
304+ }
264305 } else {
265306 JASSERT (false ).Text (" Not implemented" );
266307 }
@@ -285,6 +326,8 @@ ConnectionRewirer::registerNSData()
285326 &_pendingIP6Incoming);
286327 registerNSData ((void *)&_udsRestoreAddr, _udsRestoreAddrlen,
287328 &_pendingUDSIncoming);
329+ registerNSData ((void *)&_udsSeqRestoreAddr, _udsSeqRestoreAddrlen,
330+ &_pendingUDSSeqIncoming);
288331}
289332
290333void
0 commit comments