Skip to content

Commit 047348e

Browse files
committed
Add dialer for events service
Signed-off-by: Michael Crosby <[email protected]>
1 parent ae87730 commit 047348e

5 files changed

Lines changed: 178 additions & 60 deletions

File tree

runtime/v2/shim/dialer.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package shim
18+
19+
import (
20+
"net"
21+
"sync"
22+
23+
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
24+
"github.com/containerd/ttrpc"
25+
"github.com/pkg/errors"
26+
)
27+
28+
type dialConnect func() (net.Conn, error)
29+
30+
var errDialerClosed = errors.New("events dialer is closed")
31+
32+
func newDialier(newFn dialConnect) *dialer {
33+
return &dialer{
34+
newFn: newFn,
35+
}
36+
}
37+
38+
type dialer struct {
39+
mu sync.Mutex
40+
41+
newFn dialConnect
42+
service v1.EventsService
43+
conn net.Conn
44+
closed bool
45+
}
46+
47+
func (d *dialer) Get() (v1.EventsService, error) {
48+
d.mu.Lock()
49+
defer d.mu.Unlock()
50+
51+
if d.closed {
52+
return nil, errDialerClosed
53+
}
54+
if d.service == nil {
55+
conn, err := d.newFn()
56+
if err != nil {
57+
return nil, err
58+
}
59+
d.conn = conn
60+
d.service = v1.NewEventsClient(ttrpc.NewClient(conn))
61+
}
62+
return d.service, nil
63+
}
64+
65+
func (d *dialer) Put(err error) {
66+
if err != nil {
67+
d.mu.Lock()
68+
d.conn.Close()
69+
d.service = nil
70+
d.mu.Unlock()
71+
}
72+
}
73+
74+
func (d *dialer) Close() (err error) {
75+
d.mu.Lock()
76+
if d.closed {
77+
return errDialerClosed
78+
}
79+
if d.conn != nil {
80+
err = d.conn.Close()
81+
}
82+
d.service = nil
83+
d.closed = true
84+
d.mu.Unlock()
85+
86+
return err
87+
}

runtime/v2/shim/publisher.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package shim
18+
19+
import (
20+
"context"
21+
"net"
22+
"sync"
23+
"time"
24+
25+
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
26+
"github.com/containerd/containerd/events"
27+
"github.com/containerd/containerd/namespaces"
28+
"github.com/containerd/typeurl"
29+
)
30+
31+
func newPublisher(address string) *remoteEventsPublisher {
32+
return &remoteEventsPublisher{
33+
dialer: newDialier(func() (net.Conn, error) {
34+
return connect(address, dial)
35+
}),
36+
closed: make(chan struct{}),
37+
}
38+
}
39+
40+
type remoteEventsPublisher struct {
41+
dialer *dialer
42+
closed chan struct{}
43+
closer sync.Once
44+
}
45+
46+
func (l *remoteEventsPublisher) Done() <-chan struct{} {
47+
return l.closed
48+
}
49+
50+
func (l *remoteEventsPublisher) Close() (err error) {
51+
err = l.dialer.Close()
52+
l.closer.Do(func() {
53+
close(l.closed)
54+
})
55+
return err
56+
}
57+
58+
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
59+
client, err := l.dialer.Get()
60+
if err != nil {
61+
return err
62+
}
63+
ns, err := namespaces.NamespaceRequired(ctx)
64+
if err != nil {
65+
return err
66+
}
67+
any, err := typeurl.MarshalAny(event)
68+
if err != nil {
69+
return err
70+
}
71+
if _, err := client.Forward(ctx, &v1.ForwardRequest{
72+
Envelope: &v1.Envelope{
73+
Timestamp: time.Now(),
74+
Namespace: ns,
75+
Topic: topic,
76+
Event: any,
77+
},
78+
}); err != nil {
79+
l.dialer.Put(err)
80+
return err
81+
}
82+
return nil
83+
}
84+
85+
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
86+
return d(address, 5*time.Second)
87+
}

runtime/v2/shim/shim.go

Lines changed: 2 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,17 @@ import (
2121
"flag"
2222
"fmt"
2323
"io"
24-
"net"
2524
"os"
2625
"runtime"
2726
"runtime/debug"
2827
"strings"
29-
"sync"
3028
"time"
3129

32-
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
3330
"github.com/containerd/containerd/events"
3431
"github.com/containerd/containerd/log"
3532
"github.com/containerd/containerd/namespaces"
3633
shimapi "github.com/containerd/containerd/runtime/v2/task"
3734
"github.com/containerd/ttrpc"
38-
"github.com/containerd/typeurl"
3935
"github.com/gogo/protobuf/proto"
4036
"github.com/pkg/errors"
4137
"github.com/sirupsen/logrus"
@@ -165,18 +161,10 @@ func run(id string, initFunc Init, config Config) error {
165161
}
166162
}
167163
address := fmt.Sprintf("%s.ttrpc", addressFlag)
168-
conn, err := connect(address, dialer)
169-
if err != nil {
170-
return err
171-
}
172-
publisher := &remoteEventsPublisher{
173-
address: address,
174-
conn: conn,
175-
closed: make(chan struct{}),
176-
}
164+
165+
publisher := newPublisher(address)
177166
defer publisher.Close()
178167

179-
publisher.client = v1.NewEventsClient(ttrpc.NewClient(conn))
180168
if namespaceFlag == "" {
181169
return fmt.Errorf("shim namespace cannot be empty")
182170
}
@@ -310,47 +298,3 @@ func dumpStacks(logger *logrus.Entry) {
310298
buf = buf[:stackSize]
311299
logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
312300
}
313-
314-
type remoteEventsPublisher struct {
315-
address string
316-
conn net.Conn
317-
client v1.EventsService
318-
closed chan struct{}
319-
closer sync.Once
320-
}
321-
322-
func (l *remoteEventsPublisher) Done() <-chan struct{} {
323-
return l.closed
324-
}
325-
326-
func (l *remoteEventsPublisher) Close() (err error) {
327-
l.closer.Do(func() {
328-
err = l.conn.Close()
329-
close(l.closed)
330-
})
331-
return err
332-
}
333-
334-
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
335-
ns, err := namespaces.NamespaceRequired(ctx)
336-
if err != nil {
337-
return err
338-
}
339-
any, err := typeurl.MarshalAny(event)
340-
if err != nil {
341-
return err
342-
}
343-
_, err = l.client.Forward(ctx, &v1.ForwardRequest{
344-
Envelope: &v1.Envelope{
345-
Timestamp: time.Now(),
346-
Namespace: ns,
347-
Topic: topic,
348-
Event: any,
349-
},
350-
})
351-
return err
352-
}
353-
354-
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
355-
return d(address, 100*time.Second)
356-
}

runtime/v2/shim/shim_unix.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func openLog(ctx context.Context, _ string) (io.Writer, error) {
9494
return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700)
9595
}
9696

97-
func dialer(address string, timeout time.Duration) (net.Conn, error) {
97+
func dial(address string, timeout time.Duration) (net.Conn, error) {
9898
address = strings.TrimPrefix(address, "unix://")
9999
return net.DialTimeout("unix", address, timeout)
100100
}

runtime/v2/shim/shim_windows.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
287287
return dswl, nil
288288
}
289289

290-
func dialer(address string, timeout time.Duration) (net.Conn, error) {
290+
func dial(address string, timeout time.Duration) (net.Conn, error) {
291291
var c net.Conn
292292
var lastError error
293293
timedOutError := errors.Errorf("timed out waiting for npipe %s", address)

0 commit comments

Comments
 (0)