Skip to content

Commit 63c7a87

Browse files
committed
Requeue events in the shim publisher
Signed-off-by: Michael Crosby <[email protected]>
1 parent 97d247c commit 63c7a87

1 file changed

Lines changed: 69 additions & 11 deletions

File tree

runtime/v2/shim/publisher.go

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,37 @@ import (
2626
"github.com/containerd/containerd/events"
2727
"github.com/containerd/containerd/namespaces"
2828
"github.com/containerd/typeurl"
29+
"github.com/sirupsen/logrus"
2930
)
3031

32+
const (
33+
queueSize = 2048
34+
maxRequeue = 5
35+
)
36+
37+
type item struct {
38+
ev *v1.Envelope
39+
ctx context.Context
40+
count int
41+
}
42+
3143
func newPublisher(address string) *remoteEventsPublisher {
32-
return &remoteEventsPublisher{
44+
l := &remoteEventsPublisher{
3345
dialer: newDialier(func() (net.Conn, error) {
3446
return connect(address, dial)
3547
}),
36-
closed: make(chan struct{}),
48+
closed: make(chan struct{}),
49+
requeue: make(chan *item, queueSize),
3750
}
51+
go l.processQueue()
52+
return l
3853
}
3954

4055
type remoteEventsPublisher struct {
41-
dialer *dialer
42-
closed chan struct{}
43-
closer sync.Once
56+
dialer *dialer
57+
closed chan struct{}
58+
closer sync.Once
59+
requeue chan *item
4460
}
4561

4662
func (l *remoteEventsPublisher) Done() <-chan struct{} {
@@ -55,11 +71,42 @@ func (l *remoteEventsPublisher) Close() (err error) {
5571
return err
5672
}
5773

58-
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
59-
client, err := l.dialer.Get()
60-
if err != nil {
61-
return err
74+
func (l *remoteEventsPublisher) processQueue() {
75+
for i := range l.requeue {
76+
if i.count > maxRequeue {
77+
logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
78+
// drop the event
79+
continue
80+
}
81+
82+
client, err := l.dialer.Get()
83+
if err != nil {
84+
l.dialer.Put(err)
85+
86+
l.queue(i)
87+
logrus.WithError(err).Error("get events client")
88+
continue
89+
}
90+
if _, err := client.Forward(i.ctx, &v1.ForwardRequest{
91+
Envelope: i.ev,
92+
}); err != nil {
93+
l.dialer.Put(err)
94+
logrus.WithError(err).Error("forward event")
95+
l.queue(i)
96+
}
6297
}
98+
}
99+
100+
func (l *remoteEventsPublisher) queue(i *item) {
101+
go func() {
102+
i.count++
103+
// re-queue after a short delay
104+
time.Sleep(time.Duration(1*i.count) * time.Second)
105+
l.requeue <- i
106+
}()
107+
}
108+
109+
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
63110
ns, err := namespaces.NamespaceRequired(ctx)
64111
if err != nil {
65112
return err
@@ -68,15 +115,26 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
68115
if err != nil {
69116
return err
70117
}
71-
if _, err := client.Forward(ctx, &v1.ForwardRequest{
72-
Envelope: &v1.Envelope{
118+
i := &item{
119+
ev: &v1.Envelope{
73120
Timestamp: time.Now(),
74121
Namespace: ns,
75122
Topic: topic,
76123
Event: any,
77124
},
125+
ctx: ctx,
126+
}
127+
client, err := l.dialer.Get()
128+
if err != nil {
129+
l.dialer.Put(err)
130+
l.queue(i)
131+
return err
132+
}
133+
if _, err := client.Forward(i.ctx, &v1.ForwardRequest{
134+
Envelope: i.ev,
78135
}); err != nil {
79136
l.dialer.Put(err)
137+
l.queue(i)
80138
return err
81139
}
82140
return nil

0 commit comments

Comments
 (0)