Skip to content

Commit 906e8bc

Browse files
authored
Merge pull request #3207 from crosbymichael/ttrpc-deps
Requeue events in the ttrpc publisher
2 parents 5f4c977 + 63c7a87 commit 906e8bc

2 files changed

Lines changed: 75 additions & 17 deletions

File tree

runtime/v2/shim/publisher.go

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,37 @@ import (
2626
"github.com/containerd/containerd/events"
2727
"github.com/containerd/containerd/namespaces"
2828
"github.com/containerd/typeurl"
29+
"github.com/sirupsen/logrus"
2930
)
3031

32+
const (
33+
queueSize = 2048
34+
maxRequeue = 5
35+
)
36+
37+
type item struct {
38+
ev *v1.Envelope
39+
ctx context.Context
40+
count int
41+
}
42+
3143
func newPublisher(address string) *remoteEventsPublisher {
32-
return &remoteEventsPublisher{
44+
l := &remoteEventsPublisher{
3345
dialer: newDialier(func() (net.Conn, error) {
3446
return connect(address, dial)
3547
}),
36-
closed: make(chan struct{}),
48+
closed: make(chan struct{}),
49+
requeue: make(chan *item, queueSize),
3750
}
51+
go l.processQueue()
52+
return l
3853
}
3954

4055
type remoteEventsPublisher struct {
41-
dialer *dialer
42-
closed chan struct{}
43-
closer sync.Once
56+
dialer *dialer
57+
closed chan struct{}
58+
closer sync.Once
59+
requeue chan *item
4460
}
4561

4662
func (l *remoteEventsPublisher) Done() <-chan struct{} {
@@ -55,11 +71,42 @@ func (l *remoteEventsPublisher) Close() (err error) {
5571
return err
5672
}
5773

58-
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
59-
client, err := l.dialer.Get()
60-
if err != nil {
61-
return err
74+
func (l *remoteEventsPublisher) processQueue() {
75+
for i := range l.requeue {
76+
if i.count > maxRequeue {
77+
logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
78+
// drop the event
79+
continue
80+
}
81+
82+
client, err := l.dialer.Get()
83+
if err != nil {
84+
l.dialer.Put(err)
85+
86+
l.queue(i)
87+
logrus.WithError(err).Error("get events client")
88+
continue
89+
}
90+
if _, err := client.Forward(i.ctx, &v1.ForwardRequest{
91+
Envelope: i.ev,
92+
}); err != nil {
93+
l.dialer.Put(err)
94+
logrus.WithError(err).Error("forward event")
95+
l.queue(i)
96+
}
6297
}
98+
}
99+
100+
func (l *remoteEventsPublisher) queue(i *item) {
101+
go func() {
102+
i.count++
103+
// re-queue after a short delay
104+
time.Sleep(time.Duration(1*i.count) * time.Second)
105+
l.requeue <- i
106+
}()
107+
}
108+
109+
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
63110
ns, err := namespaces.NamespaceRequired(ctx)
64111
if err != nil {
65112
return err
@@ -68,15 +115,26 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
68115
if err != nil {
69116
return err
70117
}
71-
if _, err := client.Forward(ctx, &v1.ForwardRequest{
72-
Envelope: &v1.Envelope{
118+
i := &item{
119+
ev: &v1.Envelope{
73120
Timestamp: time.Now(),
74121
Namespace: ns,
75122
Topic: topic,
76123
Event: any,
77124
},
125+
ctx: ctx,
126+
}
127+
client, err := l.dialer.Get()
128+
if err != nil {
129+
l.dialer.Put(err)
130+
l.queue(i)
131+
return err
132+
}
133+
if _, err := client.Forward(i.ctx, &v1.ForwardRequest{
134+
Envelope: i.ev,
78135
}); err != nil {
79136
l.dialer.Put(err)
137+
l.queue(i)
80138
return err
81139
}
82140
return nil

services/server/server.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,15 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
107107
}
108108
var (
109109
grpcServer = grpc.NewServer(serverOpts...)
110-
hrpc = grpc.NewServer(tcpServerOpts...)
110+
tcpServer = grpc.NewServer(tcpServerOpts...)
111111

112112
grpcServices []plugin.Service
113113
tcpServices []plugin.TCPService
114114
ttrpcServices []plugin.TTRPCService
115115

116116
s = &Server{
117117
grpcServer: grpcServer,
118-
hrpc: hrpc,
118+
tcpServer: tcpServer,
119119
ttrpcServer: ttrpcServer,
120120
events: exchange.NewExchange(),
121121
config: config,
@@ -199,7 +199,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
199199
}
200200
}
201201
for _, service := range tcpServices {
202-
if err := service.RegisterTCP(hrpc); err != nil {
202+
if err := service.RegisterTCP(tcpServer); err != nil {
203203
return nil, err
204204
}
205205
}
@@ -210,7 +210,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
210210
type Server struct {
211211
grpcServer *grpc.Server
212212
ttrpcServer *ttrpc.Server
213-
hrpc *grpc.Server
213+
tcpServer *grpc.Server
214214
events *exchange.Exchange
215215
config *srvconfig.Config
216216
plugins []*plugin.Plugin
@@ -243,8 +243,8 @@ func (s *Server) ServeMetrics(l net.Listener) error {
243243

244244
// ServeTCP allows services to serve over tcp
245245
func (s *Server) ServeTCP(l net.Listener) error {
246-
grpc_prometheus.Register(s.hrpc)
247-
return trapClosedConnErr(s.hrpc.Serve(l))
246+
grpc_prometheus.Register(s.tcpServer)
247+
return trapClosedConnErr(s.tcpServer.Serve(l))
248248
}
249249

250250
// ServeDebug provides a debug endpoint

0 commit comments

Comments
 (0)