Skip to content

Commit 7b06c9a

Browse files
committed
Add TTRPC client
Signed-off-by: Maksym Pavlenko <[email protected]>
1 parent 57fbb16 commit 7b06c9a

9 files changed

Lines changed: 313 additions & 162 deletions

File tree

client_ttrpc.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package containerd
18+
19+
import (
20+
"sync"
21+
"time"
22+
23+
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
24+
"github.com/containerd/ttrpc"
25+
"github.com/pkg/errors"
26+
)
27+
28+
const ttrpcDialTimeout = 5 * time.Second
29+
30+
type ttrpcConnector func() (*ttrpc.Client, error)
31+
32+
// ClientTTRPC is the client to interact with TTRPC part of containerd server (plugins, events)
33+
type ClientTTRPC struct {
34+
mu sync.Mutex
35+
connector ttrpcConnector
36+
client *ttrpc.Client
37+
closed bool
38+
}
39+
40+
// NewTTRPC returns a new containerd TTRPC client that is connected to the containerd instance provided by address
41+
func NewTTRPC(address string, opts ...ttrpc.ClientOpts) (*ClientTTRPC, error) {
42+
connector := func() (*ttrpc.Client, error) {
43+
conn, err := ttrpcDial(address, ttrpcDialTimeout)
44+
if err != nil {
45+
return nil, errors.Wrap(err, "failed to connect")
46+
}
47+
48+
client := ttrpc.NewClient(conn, opts...)
49+
return client, nil
50+
}
51+
52+
client, err := connector()
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
return &ClientTTRPC{
58+
connector: connector,
59+
client: client,
60+
}, nil
61+
}
62+
63+
// Reconnect re-establishes the TTRPC connection to the containerd daemon
64+
func (c *ClientTTRPC) Reconnect() error {
65+
c.mu.Lock()
66+
defer c.mu.Unlock()
67+
68+
if c.connector == nil {
69+
return errors.New("unable to reconnect to containerd, no connector available")
70+
}
71+
72+
if c.closed {
73+
return errors.New("client is closed")
74+
}
75+
76+
client, err := c.connector()
77+
if err != nil {
78+
return err
79+
}
80+
81+
c.client = client
82+
return nil
83+
}
84+
85+
// EventsService creates an EventsService client
86+
func (c *ClientTTRPC) EventsService() v1.EventsService {
87+
return v1.NewEventsClient(c.Client())
88+
}
89+
90+
// Client returns the underlying TTRPC client object
91+
func (c *ClientTTRPC) Client() *ttrpc.Client {
92+
c.mu.Lock()
93+
defer c.mu.Unlock()
94+
95+
return c.client
96+
}
97+
98+
// Close closes the clients TTRPC connection to containerd
99+
func (c *ClientTTRPC) Close() error {
100+
c.mu.Lock()
101+
defer c.mu.Unlock()
102+
103+
c.closed = true
104+
return c.client.Close()
105+
}

client_ttrpc_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package containerd
18+
19+
import (
20+
"context"
21+
"testing"
22+
"time"
23+
24+
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
25+
"github.com/containerd/containerd/namespaces"
26+
"github.com/containerd/ttrpc"
27+
"github.com/gogo/protobuf/types"
28+
"gotest.tools/assert"
29+
)
30+
31+
func TestClientTTRPC_New(t *testing.T) {
32+
client, err := NewTTRPC(address + ".ttrpc")
33+
assert.NilError(t, err)
34+
35+
err = client.Close()
36+
assert.NilError(t, err)
37+
}
38+
39+
func TestClientTTRPC_Reconnect(t *testing.T) {
40+
client, err := NewTTRPC(address + ".ttrpc")
41+
assert.NilError(t, err)
42+
43+
err = client.Reconnect()
44+
assert.NilError(t, err)
45+
46+
// Send test request to make sure its alive after reconnect
47+
_, err = client.EventsService().Forward(context.Background(), &v1.ForwardRequest{
48+
Envelope: &v1.Envelope{
49+
Timestamp: time.Now(),
50+
Namespace: namespaces.Default,
51+
Topic: "/test",
52+
Event: &types.Any{},
53+
},
54+
})
55+
assert.NilError(t, err)
56+
57+
err = client.Close()
58+
assert.NilError(t, err)
59+
}
60+
61+
func TestClientTTRPC_Close(t *testing.T) {
62+
client, err := NewTTRPC(address + ".ttrpc")
63+
assert.NilError(t, err)
64+
65+
err = client.Close()
66+
assert.NilError(t, err)
67+
68+
_, err = client.EventsService().Forward(context.Background(), &v1.ForwardRequest{Envelope: &v1.Envelope{}})
69+
assert.Equal(t, err, ttrpc.ErrClosed)
70+
71+
err = client.Close()
72+
assert.NilError(t, err)
73+
}

client_ttrpc_unix.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// +build !windows
2+
3+
/*
4+
Copyright The containerd Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package containerd
20+
21+
import (
22+
"net"
23+
"strings"
24+
"time"
25+
)
26+
27+
func ttrpcDial(address string, timeout time.Duration) (net.Conn, error) {
28+
address = strings.TrimPrefix(address, "unix://")
29+
return net.DialTimeout("unix", address, timeout)
30+
}

client_ttrpc_windows.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// +build windows
2+
3+
/*
4+
Copyright The containerd Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package containerd
20+
21+
import (
22+
"net"
23+
"os"
24+
"time"
25+
26+
winio "github.com/Microsoft/go-winio"
27+
"github.com/pkg/errors"
28+
)
29+
30+
func ttrpcDial(address string, timeout time.Duration) (net.Conn, error) {
31+
var c net.Conn
32+
var lastError error
33+
timedOutError := errors.Errorf("timed out waiting for npipe %s", address)
34+
start := time.Now()
35+
for {
36+
remaining := timeout - time.Since(start)
37+
if remaining <= 0 {
38+
lastError = timedOutError
39+
break
40+
}
41+
c, lastError = winio.DialPipe(address, &remaining)
42+
if lastError == nil {
43+
break
44+
}
45+
if !os.IsNotExist(lastError) {
46+
break
47+
}
48+
// There is nobody serving the pipe. We limit the timeout for this case
49+
// to 5 seconds because any shim that would serve this endpoint should
50+
// serve it within 5 seconds. We use the passed in timeout for the
51+
// `DialPipe` timeout if the pipe exists however to give the pipe time
52+
// to `Accept` the connection.
53+
if time.Since(start) >= 5*time.Second {
54+
lastError = timedOutError
55+
break
56+
}
57+
time.Sleep(10 * time.Millisecond)
58+
}
59+
return c, lastError
60+
}

runtime/v2/shim/dialer.go

Lines changed: 0 additions & 87 deletions
This file was deleted.

0 commit comments

Comments
 (0)