@@ -26,21 +26,37 @@ import (
2626 "github.com/containerd/containerd/events"
2727 "github.com/containerd/containerd/namespaces"
2828 "github.com/containerd/typeurl"
29+ "github.com/sirupsen/logrus"
2930)
3031
32+ const (
33+ queueSize = 2048
34+ maxRequeue = 5
35+ )
36+
37+ type item struct {
38+ ev * v1.Envelope
39+ ctx context.Context
40+ count int
41+ }
42+
3143func newPublisher (address string ) * remoteEventsPublisher {
32- return & remoteEventsPublisher {
44+ l := & remoteEventsPublisher {
3345 dialer : newDialier (func () (net.Conn , error ) {
3446 return connect (address , dial )
3547 }),
36- closed : make (chan struct {}),
48+ closed : make (chan struct {}),
49+ requeue : make (chan * item , queueSize ),
3750 }
51+ go l .processQueue ()
52+ return l
3853}
3954
4055type remoteEventsPublisher struct {
41- dialer * dialer
42- closed chan struct {}
43- closer sync.Once
56+ dialer * dialer
57+ closed chan struct {}
58+ closer sync.Once
59+ requeue chan * item
4460}
4561
4662func (l * remoteEventsPublisher ) Done () <- chan struct {} {
@@ -55,11 +71,42 @@ func (l *remoteEventsPublisher) Close() (err error) {
5571 return err
5672}
5773
58- func (l * remoteEventsPublisher ) Publish (ctx context.Context , topic string , event events.Event ) error {
59- client , err := l .dialer .Get ()
60- if err != nil {
61- return err
74+ func (l * remoteEventsPublisher ) processQueue () {
75+ for i := range l .requeue {
76+ if i .count > maxRequeue {
77+ logrus .Errorf ("evicting %s from queue because of retry count" , i .ev .Topic )
78+ // drop the event
79+ continue
80+ }
81+
82+ client , err := l .dialer .Get ()
83+ if err != nil {
84+ l .dialer .Put (err )
85+
86+ l .queue (i )
87+ logrus .WithError (err ).Error ("get events client" )
88+ continue
89+ }
90+ if _ , err := client .Forward (i .ctx , & v1.ForwardRequest {
91+ Envelope : i .ev ,
92+ }); err != nil {
93+ l .dialer .Put (err )
94+ logrus .WithError (err ).Error ("forward event" )
95+ l .queue (i )
96+ }
6297 }
98+ }
99+
100+ func (l * remoteEventsPublisher ) queue (i * item ) {
101+ go func () {
102+ i .count ++
103+ // re-queue after a short delay
104+ time .Sleep (time .Duration (1 * i .count ) * time .Second )
105+ l .requeue <- i
106+ }()
107+ }
108+
109+ func (l * remoteEventsPublisher ) Publish (ctx context.Context , topic string , event events.Event ) error {
63110 ns , err := namespaces .NamespaceRequired (ctx )
64111 if err != nil {
65112 return err
@@ -68,15 +115,26 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
68115 if err != nil {
69116 return err
70117 }
71- if _ , err := client . Forward ( ctx , & v1. ForwardRequest {
72- Envelope : & v1.Envelope {
118+ i := & item {
119+ ev : & v1.Envelope {
73120 Timestamp : time .Now (),
74121 Namespace : ns ,
75122 Topic : topic ,
76123 Event : any ,
77124 },
125+ ctx : ctx ,
126+ }
127+ client , err := l .dialer .Get ()
128+ if err != nil {
129+ l .dialer .Put (err )
130+ l .queue (i )
131+ return err
132+ }
133+ if _ , err := client .Forward (i .ctx , & v1.ForwardRequest {
134+ Envelope : i .ev ,
78135 }); err != nil {
79136 l .dialer .Put (err )
137+ l .queue (i )
80138 return err
81139 }
82140 return nil
0 commit comments