@@ -21,21 +21,17 @@ import (
2121 "flag"
2222 "fmt"
2323 "io"
24- "net"
2524 "os"
2625 "runtime"
2726 "runtime/debug"
2827 "strings"
29- "sync"
3028 "time"
3129
32- v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
3330 "github.com/containerd/containerd/events"
3431 "github.com/containerd/containerd/log"
3532 "github.com/containerd/containerd/namespaces"
3633 shimapi "github.com/containerd/containerd/runtime/v2/task"
3734 "github.com/containerd/ttrpc"
38- "github.com/containerd/typeurl"
3935 "github.com/gogo/protobuf/proto"
4036 "github.com/pkg/errors"
4137 "github.com/sirupsen/logrus"
@@ -165,18 +161,10 @@ func run(id string, initFunc Init, config Config) error {
165161 }
166162 }
167163 address := fmt .Sprintf ("%s.ttrpc" , addressFlag )
168- conn , err := connect (address , dialer )
169- if err != nil {
170- return err
171- }
172- publisher := & remoteEventsPublisher {
173- address : address ,
174- conn : conn ,
175- closed : make (chan struct {}),
176- }
164+
165+ publisher := newPublisher (address )
177166 defer publisher .Close ()
178167
179- publisher .client = v1 .NewEventsClient (ttrpc .NewClient (conn ))
180168 if namespaceFlag == "" {
181169 return fmt .Errorf ("shim namespace cannot be empty" )
182170 }
@@ -310,47 +298,3 @@ func dumpStacks(logger *logrus.Entry) {
310298 buf = buf [:stackSize ]
311299 logger .Infof ("=== BEGIN goroutine stack dump ===\n %s\n === END goroutine stack dump ===" , buf )
312300}
313-
314- type remoteEventsPublisher struct {
315- address string
316- conn net.Conn
317- client v1.EventsService
318- closed chan struct {}
319- closer sync.Once
320- }
321-
322- func (l * remoteEventsPublisher ) Done () <- chan struct {} {
323- return l .closed
324- }
325-
326- func (l * remoteEventsPublisher ) Close () (err error ) {
327- l .closer .Do (func () {
328- err = l .conn .Close ()
329- close (l .closed )
330- })
331- return err
332- }
333-
334- func (l * remoteEventsPublisher ) Publish (ctx context.Context , topic string , event events.Event ) error {
335- ns , err := namespaces .NamespaceRequired (ctx )
336- if err != nil {
337- return err
338- }
339- any , err := typeurl .MarshalAny (event )
340- if err != nil {
341- return err
342- }
343- _ , err = l .client .Forward (ctx , & v1.ForwardRequest {
344- Envelope : & v1.Envelope {
345- Timestamp : time .Now (),
346- Namespace : ns ,
347- Topic : topic ,
348- Event : any ,
349- },
350- })
351- return err
352- }
353-
354- func connect (address string , d func (string , time.Duration ) (net.Conn , error )) (net.Conn , error ) {
355- return d (address , 100 * time .Second )
356- }
0 commit comments