Skip to content
This repository was archived by the owner on Mar 17, 2025. It is now read-only.

Commit 261fa7a

Browse files
author
Christian Häusler
authored
Clean and fix shutdown handling (#52)
Resolves #51
1 parent 2e639a6 commit 261fa7a

File tree

5 files changed

+72
-52
lines changed

5 files changed

+72
-52
lines changed

consumer/consumer.go

+9-33
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package consumer
33
import (
44
"context"
55
"fmt"
6-
"sync"
7-
86
"github.com/bketelsen/logr"
97
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
108
"github.com/corvus-ch/rabbitmq-cli-consumer/processor"
@@ -19,9 +17,6 @@ type Consumer struct {
1917
Processor processor.Processor
2018
Log logr.Logger
2119
canceled bool
22-
23-
// wg is used to ensure NotifyClose() gets handled before the consumer exits.
24-
wg sync.WaitGroup
2520
}
2621

2722
// New creates a new consumer instance. The setup of the amqp connection and channel is expected to be done by the
@@ -77,17 +72,23 @@ func (c *Consumer) Consume(ctx context.Context) error {
7772
c.Log.Info("Succeeded registering consumer.")
7873
c.Log.Info("Waiting for messages...")
7974

75+
remoteClose := make(chan *amqp.Error)
76+
c.Channel.NotifyClose(remoteClose)
77+
8078
done := make(chan error)
8179
go c.consume(msgs, done)
8280

8381
select {
82+
case err := <-remoteClose:
83+
return err
84+
8485
case <-ctx.Done():
8586
c.canceled = true
8687
err := c.Channel.Cancel(c.Tag, false)
87-
if err != nil {
88-
return err
88+
if err == nil {
89+
err = <-done
8990
}
90-
return <-done
91+
return err
9192

9293
case err := <-done:
9394
return err
@@ -106,7 +107,6 @@ func (c *Consumer) consume(msgs <-chan amqp.Delivery, done chan error) {
106107
return
107108
}
108109
}
109-
c.wg.Wait()
110110
done <- nil
111111
}
112112

@@ -128,27 +128,3 @@ func (c *Consumer) Close() error {
128128
}
129129
return c.Connection.Close()
130130
}
131-
132-
// NotifyClose registers a listener for when the connection gets closed by the server.
133-
//
134-
// The chan provided will be closed when the Channel is closed and on a Graceful close, no error will be sent.
135-
func (c *Consumer) NotifyClose(receiver chan error) chan error {
136-
if c.Channel != nil {
137-
c.wg.Add(1)
138-
realChan := make(chan *amqp.Error)
139-
c.Channel.NotifyClose(realChan)
140-
141-
go func() {
142-
for {
143-
err, ok := <-realChan
144-
if !ok {
145-
c.wg.Done()
146-
return
147-
}
148-
receiver <- err
149-
}
150-
}()
151-
}
152-
153-
return receiver
154-
}

consumer/consumer_consume_test.go

+28
Original file line numberDiff line numberDiff line change
@@ -168,3 +168,31 @@ func TestConsumer_Consume(t *testing.T) {
168168
t.Run(test.Name, test.Run)
169169
}
170170
}
171+
172+
func TestConsumer_Consume_NotifyClose(t *testing.T) {
173+
ch := new(TestChannel)
174+
d := make(chan amqp.Delivery)
175+
done := make(chan error)
176+
l := log.New(0)
177+
178+
ch.On("Consume", "", "", false, false, false, false, nilAmqpTable).Once().Return(d, nil)
179+
180+
c := consumer.New(nil, ch, new(TestProcessor), l)
181+
182+
go func() {
183+
done <- c.Consume(context.Background())
184+
}()
185+
186+
retry := 5
187+
for !ch.TriggerNotifyClose("server close") && retry > 0 {
188+
retry--
189+
if retry == 0 {
190+
t.Fatal("No notify handler registered.")
191+
}
192+
// When called too early, the close handler is not yet registered. Try again later.
193+
time.Sleep(time.Millisecond)
194+
}
195+
196+
assert.Equal(t, &amqp.Error{Reason: "server close", Code: 320}, <-done)
197+
ch.AssertExpectations(t)
198+
}

consumer/consumer_test.go

+14-14
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"testing"
7+
"time"
78

89
log "github.com/corvus-ch/logr/buffered"
910
"github.com/corvus-ch/rabbitmq-cli-consumer/consumer"
@@ -97,21 +98,20 @@ func TestConsumer_Cancel(t *testing.T) {
9798
t.Run("error", func(t *testing.T) {
9899
testConsumerCancel(t, fmt.Errorf("cancel error"))
99100
})
101+
t.Run("notify no block", func(t *testing.T) {
102+
ch := make(chan bool)
103+
go func() {
104+
testConsumerCancel(t, nil)
105+
ch <- true
106+
}()
107+
select {
108+
case <-ch:
109+
// Intentionally left blank.
110+
case <-time.After(5 * time.Second):
111+
t.Error("Timeout because notify handler is blocking cancel")
112+
}
113+
})
100114
for _, test := range cancelTests {
101115
t.Run(test.Name, test.Run)
102116
}
103117
}
104-
105-
func TestConsumer_NotifyClose(t *testing.T) {
106-
err := amqp.ErrClosed
107-
done := make(chan error)
108-
var realChan chan *amqp.Error
109-
ch := new(TestChannel)
110-
ch.On("NotifyClose", mock.Anything).Return(done).Run(func(args mock.Arguments) {
111-
realChan = args.Get(0).(chan *amqp.Error)
112-
})
113-
c := consumer.New(nil, ch, nil, log.New(0))
114-
assert.Equal(t, done, c.NotifyClose(done))
115-
realChan <- err
116-
assert.Equal(t, err, <-done)
117-
}

consumer/mock_test.go

+13-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func (t *TestConnection) Channel() (*amqp.Channel, error) {
2828
type TestChannel struct {
2929
consumer.Channel
3030
mock.Mock
31+
notifyClose chan *amqp.Error
3132
}
3233

3334
func (t *TestChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
@@ -37,11 +38,21 @@ func (t *TestChannel) ExchangeDeclare(name, kind string, durable, autoDelete, in
3738
}
3839

3940
func (t *TestChannel) NotifyClose(c chan *amqp.Error) chan *amqp.Error {
40-
t.Called(c)
41-
41+
t.notifyClose = c
4242
return c
4343
}
4444

45+
func (t *TestChannel) TriggerNotifyClose(reason string) bool {
46+
if t.notifyClose != nil {
47+
t.notifyClose <- &amqp.Error{
48+
Reason: reason,
49+
Code: 320,
50+
}
51+
return true
52+
}
53+
return false
54+
}
55+
4556
func (t *TestChannel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
4657
argsT := t.Called(name, durable, autoDelete, exclusive, noWait, args)
4758

main.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
stdlog "log"
77
"os"
88
"os/signal"
9+
"strings"
910
"syscall"
1011

1112
"github.com/bketelsen/logr"
@@ -16,6 +17,7 @@ import (
1617
"github.com/corvus-ch/rabbitmq-cli-consumer/consumer"
1718
"github.com/corvus-ch/rabbitmq-cli-consumer/log"
1819
"github.com/corvus-ch/rabbitmq-cli-consumer/processor"
20+
"github.com/streadway/amqp"
1921
)
2022

2123
var (
@@ -140,9 +142,6 @@ func consume(client *consumer.Consumer, l logr.Logger) error {
140142
}()
141143

142144
select {
143-
case err := <-client.NotifyClose(make(chan error)):
144-
return cli.NewExitError(fmt.Sprintf("connection closed: %v", err), 10)
145-
146145
case <-sig:
147146
l.Info("Cancel consumption of messages.")
148147
cancel()
@@ -155,6 +154,12 @@ func consume(client *consumer.Consumer, l logr.Logger) error {
155154

156155
func checkConsumeError(err error) error {
157156
switch err.(type) {
157+
case *amqp.Error:
158+
if strings.Contains(err.Error(), "Exception (320) Reason:") {
159+
return cli.NewExitError(fmt.Sprintf("connection closed: %v", err.(*amqp.Error).Reason), 10)
160+
}
161+
return err
162+
158163
case *processor.AcknowledgmentError:
159164
return cli.NewExitError(err, 11)
160165

0 commit comments

Comments
 (0)