Skip to content

Commit 318911b

Browse files
authored
Merge pull request #4905 from cpuguy83/plugin_notify_conn_cleanup
plugin: closer-based plugin notification socket
2 parents 4468148 + d68cc0e commit 318911b

8 files changed

Lines changed: 244 additions & 139 deletions

File tree

cli-plugins/socket/socket.go

Lines changed: 87 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,104 @@ import (
77
"io"
88
"net"
99
"os"
10+
"runtime"
11+
"sync"
1012
)
1113

1214
// EnvKey represents the well-known environment variable used to pass the plugin being
1315
// executed the socket name it should listen on to coordinate with the host CLI.
1416
const EnvKey = "DOCKER_CLI_PLUGIN_SOCKET"
1517

16-
// SetupConn sets up a Unix socket listener, establishes a goroutine to handle connections
17-
// and update the conn pointer, and returns the listener for the socket (which the caller
18-
// is responsible for closing when it's no longer needed).
19-
func SetupConn(conn **net.UnixConn) (*net.UnixListener, error) {
20-
listener, err := listen("docker_cli_" + randomID())
18+
// NewPluginServer creates a plugin server that listens on a new Unix domain socket.
19+
// `h` is called for each new connection to the socket in a goroutine.
20+
func NewPluginServer(h func(net.Conn)) (*PluginServer, error) {
21+
l, err := listen("docker_cli_" + randomID())
2122
if err != nil {
2223
return nil, err
2324
}
2425

25-
accept(listener, conn)
26+
if h == nil {
27+
h = func(net.Conn) {}
28+
}
29+
30+
pl := &PluginServer{
31+
l: l,
32+
h: h,
33+
}
34+
35+
go func() {
36+
defer pl.Close()
37+
for {
38+
err := pl.accept()
39+
if err != nil {
40+
return
41+
}
42+
}
43+
}()
44+
45+
return pl, nil
46+
}
47+
48+
type PluginServer struct {
49+
mu sync.Mutex
50+
conns []net.Conn
51+
l *net.UnixListener
52+
h func(net.Conn)
53+
closed bool
54+
}
55+
56+
func (pl *PluginServer) accept() error {
57+
conn, err := pl.l.Accept()
58+
if err != nil {
59+
return err
60+
}
61+
62+
pl.mu.Lock()
63+
defer pl.mu.Unlock()
64+
65+
if pl.closed {
66+
// handle potential race condition between Close and Accept
67+
conn.Close()
68+
return errors.New("plugin server is closed")
69+
}
2670

27-
return listener, nil
71+
pl.conns = append(pl.conns, conn)
72+
73+
go pl.h(conn)
74+
return nil
75+
}
76+
77+
func (pl *PluginServer) Addr() net.Addr {
78+
return pl.l.Addr()
79+
}
80+
81+
// Close ensures that the server is no longer accepting new connections and closes all existing connections.
82+
// Existing connections will receive [io.EOF].
83+
func (pl *PluginServer) Close() error {
84+
// Remove the listener socket, if it exists on the filesystem.
85+
unlink(pl.l)
86+
87+
// Close connections first to ensure the connections get io.EOF instead of a connection reset.
88+
pl.closeAllConns()
89+
90+
// Try to ensure that any active connections have a chance to receive io.EOF
91+
runtime.Gosched()
92+
93+
return pl.l.Close()
94+
}
95+
96+
func (pl *PluginServer) closeAllConns() {
97+
pl.mu.Lock()
98+
defer pl.mu.Unlock()
99+
100+
// Prevent new connections from being accepted
101+
pl.closed = true
102+
103+
for _, conn := range pl.conns {
104+
conn.Close()
105+
}
106+
107+
pl.conns = nil
28108
}
29109

30110
func randomID() string {
@@ -35,18 +115,6 @@ func randomID() string {
35115
return hex.EncodeToString(b)
36116
}
37117

38-
func accept(listener *net.UnixListener, conn **net.UnixConn) {
39-
go func() {
40-
for {
41-
// ignore error here, if we failed to accept a connection,
42-
// conn is nil and we fallback to previous behavior
43-
*conn, _ = listener.AcceptUnix()
44-
// perform any platform-specific actions on accept (e.g. unlink non-abstract sockets)
45-
onAccept(*conn, listener)
46-
}
47-
}()
48-
}
49-
50118
// ConnectAndWait connects to the socket passed via well-known env var,
51119
// if present, and attempts to read from it until it receives an EOF, at which
52120
// point cb is called.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
//go:build windows || linux
2+
3+
package socket
4+
5+
import (
6+
"net"
7+
)
8+
9+
func listen(socketname string) (*net.UnixListener, error) {
10+
// Create an abstract socket -- this socket can be opened by name, but is
11+
// not present in the filesystem.
12+
return net.ListenUnix("unix", &net.UnixAddr{
13+
Name: "@" + socketname,
14+
Net: "unix",
15+
})
16+
}
17+
18+
func unlink(listener *net.UnixListener) {
19+
// Do nothing; the socket is not present in the filesystem.
20+
}

cli-plugins/socket/socket_darwin.go

Lines changed: 0 additions & 19 deletions
This file was deleted.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
//go:build !windows && !linux
2+
3+
package socket
4+
5+
import (
6+
"net"
7+
"os"
8+
"path/filepath"
9+
"syscall"
10+
)
11+
12+
func listen(socketname string) (*net.UnixListener, error) {
13+
// Because abstract sockets are unavailable, we create a socket in the
14+
// system temporary directory instead.
15+
return net.ListenUnix("unix", &net.UnixAddr{
16+
Name: filepath.Join(os.TempDir(), socketname),
17+
Net: "unix",
18+
})
19+
}
20+
21+
func unlink(listener *net.UnixListener) {
22+
// unlink(2) is best effort here; if it fails, we may 'leak' a socket
23+
// into the filesystem, but this is unlikely and overall harmless.
24+
_ = syscall.Unlink(listener.Addr().String())
25+
}

cli-plugins/socket/socket_nodarwin.go

Lines changed: 0 additions & 20 deletions
This file was deleted.

cli-plugins/socket/socket_openbsd.go

Lines changed: 0 additions & 19 deletions
This file was deleted.

0 commit comments

Comments
 (0)