Skip to content

Commit bcb6c8d

Browse files
Merge pull request #3279 from mxpv/ttrpc
Add TTRPC client
2 parents cafda1c + 7f79fbb commit bcb6c8d

9 files changed

Lines changed: 311 additions & 160 deletions

File tree

client_ttrpc_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package containerd
18+
19+
import (
20+
"context"
21+
"testing"
22+
"time"
23+
24+
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
25+
"github.com/containerd/containerd/namespaces"
26+
"github.com/containerd/containerd/pkg/ttrpcutil"
27+
"github.com/containerd/ttrpc"
28+
"github.com/gogo/protobuf/types"
29+
"gotest.tools/assert"
30+
)
31+
32+
func TestClientTTRPC_New(t *testing.T) {
33+
client, err := ttrpcutil.NewClient(address + ".ttrpc")
34+
assert.NilError(t, err)
35+
36+
err = client.Close()
37+
assert.NilError(t, err)
38+
}
39+
40+
func TestClientTTRPC_Reconnect(t *testing.T) {
41+
client, err := ttrpcutil.NewClient(address + ".ttrpc")
42+
assert.NilError(t, err)
43+
44+
err = client.Reconnect()
45+
assert.NilError(t, err)
46+
47+
// Send test request to make sure its alive after reconnect
48+
_, err = client.EventsService().Forward(context.Background(), &v1.ForwardRequest{
49+
Envelope: &v1.Envelope{
50+
Timestamp: time.Now(),
51+
Namespace: namespaces.Default,
52+
Topic: "/test",
53+
Event: &types.Any{},
54+
},
55+
})
56+
assert.NilError(t, err)
57+
58+
err = client.Close()
59+
assert.NilError(t, err)
60+
}
61+
62+
func TestClientTTRPC_Close(t *testing.T) {
63+
client, err := ttrpcutil.NewClient(address + ".ttrpc")
64+
assert.NilError(t, err)
65+
66+
err = client.Close()
67+
assert.NilError(t, err)
68+
69+
_, err = client.EventsService().Forward(context.Background(), &v1.ForwardRequest{Envelope: &v1.Envelope{}})
70+
assert.Equal(t, err, ttrpc.ErrClosed)
71+
72+
err = client.Close()
73+
assert.NilError(t, err)
74+
}

pkg/ttrpcutil/client.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package ttrpcutil
18+
19+
import (
20+
"sync"
21+
"time"
22+
23+
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
24+
"github.com/containerd/ttrpc"
25+
"github.com/pkg/errors"
26+
)
27+
28+
const ttrpcDialTimeout = 5 * time.Second
29+
30+
type ttrpcConnector func() (*ttrpc.Client, error)
31+
32+
// Client is the client to interact with TTRPC part of containerd server (plugins, events)
33+
type Client struct {
34+
mu sync.Mutex
35+
connector ttrpcConnector
36+
client *ttrpc.Client
37+
closed bool
38+
}
39+
40+
// NewClient returns a new containerd TTRPC client that is connected to the containerd instance provided by address
41+
func NewClient(address string, opts ...ttrpc.ClientOpts) (*Client, error) {
42+
connector := func() (*ttrpc.Client, error) {
43+
conn, err := ttrpcDial(address, ttrpcDialTimeout)
44+
if err != nil {
45+
return nil, errors.Wrap(err, "failed to connect")
46+
}
47+
48+
client := ttrpc.NewClient(conn, opts...)
49+
return client, nil
50+
}
51+
52+
client, err := connector()
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
return &Client{
58+
connector: connector,
59+
client: client,
60+
}, nil
61+
}
62+
63+
// Reconnect re-establishes the TTRPC connection to the containerd daemon
64+
func (c *Client) Reconnect() error {
65+
c.mu.Lock()
66+
defer c.mu.Unlock()
67+
68+
if c.connector == nil {
69+
return errors.New("unable to reconnect to containerd, no connector available")
70+
}
71+
72+
if c.closed {
73+
return errors.New("client is closed")
74+
}
75+
76+
client, err := c.connector()
77+
if err != nil {
78+
return err
79+
}
80+
81+
c.client = client
82+
return nil
83+
}
84+
85+
// EventsService creates an EventsService client
86+
func (c *Client) EventsService() v1.EventsService {
87+
return v1.NewEventsClient(c.Client())
88+
}
89+
90+
// Client returns the underlying TTRPC client object
91+
func (c *Client) Client() *ttrpc.Client {
92+
c.mu.Lock()
93+
defer c.mu.Unlock()
94+
95+
return c.client
96+
}
97+
98+
// Close closes the clients TTRPC connection to containerd
99+
func (c *Client) Close() error {
100+
c.mu.Lock()
101+
defer c.mu.Unlock()
102+
103+
c.closed = true
104+
return c.client.Close()
105+
}

pkg/ttrpcutil/client_unix.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// +build !windows
2+
3+
/*
4+
Copyright The containerd Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package ttrpcutil
20+
21+
import (
22+
"net"
23+
"strings"
24+
"time"
25+
)
26+
27+
func ttrpcDial(address string, timeout time.Duration) (net.Conn, error) {
28+
address = strings.TrimPrefix(address, "unix://")
29+
return net.DialTimeout("unix", address, timeout)
30+
}

pkg/ttrpcutil/client_windows.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// +build windows
2+
3+
/*
4+
Copyright The containerd Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package ttrpcutil
20+
21+
import (
22+
"net"
23+
"os"
24+
"time"
25+
26+
winio "github.com/Microsoft/go-winio"
27+
"github.com/pkg/errors"
28+
)
29+
30+
func ttrpcDial(address string, timeout time.Duration) (net.Conn, error) {
31+
var c net.Conn
32+
var lastError error
33+
timedOutError := errors.Errorf("timed out waiting for npipe %s", address)
34+
start := time.Now()
35+
for {
36+
remaining := timeout - time.Since(start)
37+
if remaining <= 0 {
38+
lastError = timedOutError
39+
break
40+
}
41+
c, lastError = winio.DialPipe(address, &remaining)
42+
if lastError == nil {
43+
break
44+
}
45+
if !os.IsNotExist(lastError) {
46+
break
47+
}
48+
// There is nobody serving the pipe. We limit the timeout for this case
49+
// to 5 seconds because any shim that would serve this endpoint should
50+
// serve it within 5 seconds. We use the passed in timeout for the
51+
// `DialPipe` timeout if the pipe exists however to give the pipe time
52+
// to `Accept` the connection.
53+
if time.Since(start) >= 5*time.Second {
54+
lastError = timedOutError
55+
break
56+
}
57+
time.Sleep(10 * time.Millisecond)
58+
}
59+
return c, lastError
60+
}

runtime/v2/shim/dialer.go

Lines changed: 0 additions & 87 deletions
This file was deleted.

0 commit comments

Comments
 (0)