@@ -7,24 +7,104 @@ import (
77 "io"
88 "net"
99 "os"
10+ "runtime"
11+ "sync"
1012)
1113
1214// EnvKey represents the well-known environment variable used to pass the plugin being
1315// executed the socket name it should listen on to coordinate with the host CLI.
1416const EnvKey = "DOCKER_CLI_PLUGIN_SOCKET"
1517
16- // SetupConn sets up a Unix socket listener, establishes a goroutine to handle connections
17- // and update the conn pointer, and returns the listener for the socket (which the caller
18- // is responsible for closing when it's no longer needed).
19- func SetupConn (conn * * net.UnixConn ) (* net.UnixListener , error ) {
20- listener , err := listen ("docker_cli_" + randomID ())
18+ // NewPluginServer creates a plugin server that listens on a new Unix domain socket.
19+ // `h` is called for each new connection to the socket in a goroutine.
20+ func NewPluginServer (h func (net.Conn )) (* PluginServer , error ) {
21+ l , err := listen ("docker_cli_" + randomID ())
2122 if err != nil {
2223 return nil , err
2324 }
2425
25- accept (listener , conn )
26+ if h == nil {
27+ h = func (net.Conn ) {}
28+ }
29+
30+ pl := & PluginServer {
31+ l : l ,
32+ h : h ,
33+ }
34+
35+ go func () {
36+ defer pl .Close ()
37+ for {
38+ err := pl .accept ()
39+ if err != nil {
40+ return
41+ }
42+ }
43+ }()
44+
45+ return pl , nil
46+ }
47+
48+ type PluginServer struct {
49+ mu sync.Mutex
50+ conns []net.Conn
51+ l * net.UnixListener
52+ h func (net.Conn )
53+ closed bool
54+ }
55+
56+ func (pl * PluginServer ) accept () error {
57+ conn , err := pl .l .Accept ()
58+ if err != nil {
59+ return err
60+ }
61+
62+ pl .mu .Lock ()
63+ defer pl .mu .Unlock ()
64+
65+ if pl .closed {
66+ // handle potential race condition between Close and Accept
67+ conn .Close ()
68+ return errors .New ("plugin server is closed" )
69+ }
2670
27- return listener , nil
71+ pl .conns = append (pl .conns , conn )
72+
73+ go pl .h (conn )
74+ return nil
75+ }
76+
77+ func (pl * PluginServer ) Addr () net.Addr {
78+ return pl .l .Addr ()
79+ }
80+
81+ // Close ensures that the server is no longer accepting new connections and closes all existing connections.
82+ // Existing connections will receive [io.EOF].
83+ func (pl * PluginServer ) Close () error {
84+ // Remove the listener socket, if it exists on the filesystem.
85+ unlink (pl .l )
86+
87+ // Close connections first to ensure the connections get io.EOF instead of a connection reset.
88+ pl .closeAllConns ()
89+
90+ // Try to ensure that any active connections have a chance to receive io.EOF
91+ runtime .Gosched ()
92+
93+ return pl .l .Close ()
94+ }
95+
96+ func (pl * PluginServer ) closeAllConns () {
97+ pl .mu .Lock ()
98+ defer pl .mu .Unlock ()
99+
100+ // Prevent new connections from being accepted
101+ pl .closed = true
102+
103+ for _ , conn := range pl .conns {
104+ conn .Close ()
105+ }
106+
107+ pl .conns = nil
28108}
29109
30110func randomID () string {
@@ -35,18 +115,6 @@ func randomID() string {
35115 return hex .EncodeToString (b )
36116}
37117
38- func accept (listener * net.UnixListener , conn * * net.UnixConn ) {
39- go func () {
40- for {
41- // ignore error here, if we failed to accept a connection,
42- // conn is nil and we fallback to previous behavior
43- * conn , _ = listener .AcceptUnix ()
44- // perform any platform-specific actions on accept (e.g. unlink non-abstract sockets)
45- onAccept (* conn , listener )
46- }
47- }()
48- }
49-
50118// ConnectAndWait connects to the socket passed via well-known env var,
51119// if present, and attempts to read from it until it receives an EOF, at which
52120// point cb is called.
0 commit comments