Skip to content
This repository was archived by the owner on Mar 17, 2025. It is now read-only.

Commit a3e7882

Browse files
author
Christian Häusler
authored
Handle SIGTERM to allow for graceful shutdown (#29)
Resolves #19.
1 parent ba4a504 commit a3e7882

7 files changed

+396
-110
lines changed

.codeclimate.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ checks:
44
method-complexity:
55
config:
66
threshold: 10
7-
method-length:
7+
method-lines:
88
config:
99
threshold: 32
1010

README.md

+8-1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,13 @@ configure an empty string you have to be explicit by using the value `<empty>`.
9393

9494
rabbitmq-cli-consumer --verbose --url amqp://guest:guest@localhost --queue myqueue --executable command.php --configuration example.conf
9595

96+
### Graceful shutdown
97+
98+
The consumer handles the signal SIGTERM. When SIGTERM is received, the AMQP
99+
channel will be canceled, preventing any new messages from being consumed. This
100+
allows to stop the consumer but let a currently running executable to finishing
101+
and acknowledgement of the message.
102+
96103
## The executable
97104

98105
Your executable receives the message as the last argument. So consider the following:
@@ -249,7 +256,7 @@ the following.
249256

250257
```
251258

252-
Change your script acording to the following example.
259+
Change your script according to the following example.
253260

254261
```php
255262
#!/usr/bin/env php

config/config.go

+14
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package config
33
import (
44
"fmt"
55
"net/url"
6+
"os"
67
"path/filepath"
78

89
"gopkg.in/gcfg.v1"
@@ -164,6 +165,19 @@ func (c Config) WithDateTime() bool {
164165
return !c.Logs.NoDateTime
165166
}
166167

168+
// ConsumerTag returns the tag used to identify the consumer.
169+
func (c Config) ConsumerTag() string {
170+
if v, set := os.LookupEnv("GO_WANT_HELPER_PROCESS"); set && v == "1" {
171+
return ""
172+
}
173+
174+
host, err := os.Hostname()
175+
if err != nil {
176+
host = "unknown"
177+
}
178+
return fmt.Sprintf("ctag-%s-%d@%s", os.Args[0], os.Getpid(), host)
179+
}
180+
167181
// LoadAndParse creates a new instance of config by parsing the content of teh given file.
168182
func LoadAndParse(location string) (*Config, error) {
169183
if !filepath.IsAbs(location) {

consumer/consumer.go

+76-25
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package consumer
22

33
import (
4+
"context"
45
"fmt"
5-
"os"
66

77
"github.com/corvus-ch/rabbitmq-cli-consumer/config"
88
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
@@ -15,23 +15,26 @@ type Consumer struct {
1515
Connection Connection
1616
Channel Channel
1717
Queue string
18+
Tag string
19+
Processor processor.Processor
1820
Log logr.Logger
21+
canceled bool
1922
}
2023

2124
// New creates a new consumer instance. The setup of the amqp connection and channel is expected to be done by the
2225
// calling code.
23-
func New(conn Connection, ch Channel, queue string, l logr.Logger) *Consumer {
26+
func New(conn Connection, ch Channel, p processor.Processor, l logr.Logger) *Consumer {
2427
return &Consumer{
2528
Connection: conn,
2629
Channel: ch,
27-
Queue: queue,
30+
Processor: p,
2831
Log: l,
2932
}
3033
}
3134

3235
// NewFromConfig creates a new consumer instance. The setup of the amqp connection and channel is done according to the
3336
// configuration.
34-
func NewFromConfig(cfg *config.Config, l logr.Logger) (*Consumer, error) {
37+
func NewFromConfig(cfg *config.Config, p processor.Processor, l logr.Logger) (*Consumer, error) {
3538
l.Info("Connecting RabbitMQ...")
3639
conn, err := amqp.Dial(cfg.AmqpUrl())
3740
if nil != err {
@@ -52,44 +55,92 @@ func NewFromConfig(cfg *config.Config, l logr.Logger) (*Consumer, error) {
5255
Connection: conn,
5356
Channel: ch,
5457
Queue: cfg.RabbitMq.Queue,
58+
Tag: cfg.ConsumerTag(),
59+
Processor: p,
5560
Log: l,
5661
}, nil
5762
}
5863

59-
// ConnectionCloseHandler calls os.Exit after the connection to RabbitMQ got closed.
60-
func ConnectionCloseHandler(closeErr chan *amqp.Error, c *Consumer) {
61-
err := <-closeErr
62-
c.Log.Error("Connection closed: %v", err)
63-
os.Exit(10)
64-
}
65-
6664
// Consume subscribes itself to the message queue and starts consuming messages.
67-
func (c *Consumer) Consume(p processor.Processor) error {
65+
func (c *Consumer) Consume(ctx context.Context) error {
6866
c.Log.Info("Registering consumer... ")
69-
msgs, err := c.Channel.Consume(c.Queue, "", false, false, false, false, nil)
67+
msgs, err := c.Channel.Consume(c.Queue, c.Tag, false, false, false, false, nil)
7068
if err != nil {
7169
return fmt.Errorf("failed to register a consumer: %s", err)
7270
}
7371

7472
c.Log.Info("Succeeded registering consumer.")
73+
c.Log.Info("Waiting for messages...")
7574

76-
defer c.Connection.Close()
75+
done := make(chan error)
76+
go c.consume(msgs, done)
7777

78-
go ConnectionCloseHandler(c.Channel.NotifyClose(make(chan *amqp.Error)), c)
78+
select {
79+
case <-ctx.Done():
80+
c.canceled = true
81+
err := c.Channel.Cancel(c.Tag, false)
82+
if err != nil {
83+
return err
84+
}
85+
return <-done
7986

80-
c.Log.Info("Waiting for messages...")
87+
case err := <-done:
88+
return err
89+
}
90+
}
8191

82-
for d := range msgs {
83-
if err := p.Process(delivery.New(d)); err != nil {
84-
switch err.(type) {
85-
case *processor.CreateCommandError:
86-
c.Log.Error(err)
92+
func (c *Consumer) consume(msgs <-chan amqp.Delivery, done chan error) {
93+
for m := range msgs {
94+
d := delivery.New(m)
95+
if c.canceled {
96+
d.Nack(true)
97+
continue
98+
}
99+
if err := c.checkError(c.Processor.Process(d)); err != nil {
100+
done <- err
101+
return
102+
}
103+
}
104+
done <- nil
105+
}
106+
107+
func (c *Consumer) checkError(err error) error {
108+
switch err.(type) {
109+
case *processor.CreateCommandError:
110+
c.Log.Error(err)
111+
return nil
112+
113+
default:
114+
return err
115+
}
116+
}
87117

88-
default:
89-
return err
118+
// Close tears the connection down, taking the channel with it.
119+
func (c *Consumer) Close() error {
120+
if c.Connection == nil {
121+
return nil
122+
}
123+
return c.Connection.Close()
124+
}
125+
126+
// NotifyClose registers a listener for when the connection gets closed by the server.
127+
//
128+
// The chan provided will be closed when the Channel is closed and on a Graceful close, no error will be sent.
129+
func (c *Consumer) NotifyClose(receiver chan error) chan error {
130+
if c.Channel != nil {
131+
realChan := make(chan *amqp.Error)
132+
c.Channel.NotifyClose(realChan)
133+
134+
go func() {
135+
for {
136+
err, ok := <-realChan
137+
if !ok {
138+
return
139+
}
140+
receiver <- err
90141
}
91-
}
142+
}()
92143
}
93144

94-
return nil
145+
return receiver
95146
}

consumer/consumer_consume_test.go

+170
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package consumer_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
"time"
8+
9+
log "github.com/corvus-ch/logr/buffered"
10+
"github.com/corvus-ch/rabbitmq-cli-consumer/consumer"
11+
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
12+
"github.com/corvus-ch/rabbitmq-cli-consumer/processor"
13+
"github.com/streadway/amqp"
14+
"github.com/stretchr/testify/assert"
15+
)
16+
17+
const intMax = int(^uint(0) >> 1)
18+
19+
type setupFunc func(*testing.T, *consumeTest) error
20+
21+
type consumeTest struct {
22+
Name string
23+
Setup setupFunc
24+
Output string
25+
Tag string
26+
27+
sync chan bool
28+
done chan error
29+
msgs chan amqp.Delivery
30+
ch *TestChannel
31+
p *TestProcessor
32+
a *TestAmqpAcknowledger
33+
dd []amqp.Delivery
34+
cancelCount int
35+
}
36+
37+
func newSimpleConsumeTest(name, output string, setup setupFunc) *consumeTest {
38+
return newConsumeTest(name, output, 1, intMax, setup)
39+
}
40+
41+
func newConsumeTest(name, output string, count uint64, cancelCount int, setup setupFunc) *consumeTest {
42+
a := new(TestAmqpAcknowledger)
43+
dd := make([]amqp.Delivery, count)
44+
for i := uint64(0); i < count; i++ {
45+
dd[i] = amqp.Delivery{Acknowledger: a, DeliveryTag: i}
46+
}
47+
return &consumeTest{
48+
Name: name,
49+
Output: output,
50+
Setup: setup,
51+
Tag: "ctag",
52+
53+
sync: make(chan bool),
54+
done: make(chan error),
55+
msgs: make(chan amqp.Delivery),
56+
ch: new(TestChannel),
57+
p: new(TestProcessor),
58+
a: a,
59+
dd: dd,
60+
cancelCount: cancelCount,
61+
}
62+
}
63+
64+
func (ct *consumeTest) Run(t *testing.T) {
65+
exp := ct.Setup(t, ct)
66+
l := log.New(0)
67+
c := consumer.New(nil, ct.ch, ct.p, l)
68+
c.Queue = t.Name()
69+
c.Tag = ct.Tag
70+
ctx, cancel := context.WithCancel(context.Background())
71+
go func() {
72+
ct.done <- c.Consume(ctx)
73+
}()
74+
go ct.produce(cancel)
75+
assert.Equal(t, exp, <-ct.done)
76+
assert.Equal(t, ct.Output, l.Buf().String())
77+
ct.ch.AssertExpectations(t)
78+
ct.p.AssertExpectations(t)
79+
ct.a.AssertExpectations(t)
80+
}
81+
82+
func (ct *consumeTest) produce(cancel func()) {
83+
defer close(ct.msgs)
84+
if len(ct.dd) == 0 && ct.cancelCount == 0 {
85+
cancel()
86+
return
87+
}
88+
for i, d := range ct.dd {
89+
go func() {
90+
if i >= ct.cancelCount {
91+
<-ct.sync
92+
cancel()
93+
time.Sleep(time.Second)
94+
ct.sync <- true
95+
return
96+
}
97+
}()
98+
ct.msgs <- d
99+
}
100+
}
101+
102+
var consumeTests = []*consumeTest{
103+
newConsumeTest(
104+
"happy path",
105+
"INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\n",
106+
3,
107+
intMax,
108+
func(t *testing.T, ct *consumeTest) error {
109+
ct.ch.On("Consume", t.Name(), "ctag", false, false, false, false, nilAmqpTable).
110+
Once().
111+
Return(ct.msgs, nil)
112+
ct.p.On("Process", delivery.New(ct.dd[0])).Once().Return(nil)
113+
ct.p.On("Process", delivery.New(ct.dd[1])).Once().Return(nil)
114+
ct.p.On("Process", delivery.New(ct.dd[2])).Once().Return(nil)
115+
return nil
116+
},
117+
),
118+
newSimpleConsumeTest(
119+
"consume error",
120+
"INFO Registering consumer... \n",
121+
func(t *testing.T, ct *consumeTest) error {
122+
ct.ch.On("Consume", t.Name(), "ctag", false, false, false, false, nilAmqpTable).
123+
Once().
124+
Return(nil, fmt.Errorf("consume error"))
125+
return fmt.Errorf("failed to register a consumer: consume error")
126+
},
127+
),
128+
newSimpleConsumeTest(
129+
"process error",
130+
"INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\n",
131+
func(t *testing.T, ct *consumeTest) error {
132+
err := fmt.Errorf("process error")
133+
ct.ch.On("Consume", t.Name(), "ctag", false, false, false, false, nilAmqpTable).
134+
Once().
135+
Return(ct.msgs, nil)
136+
ct.p.On("Process", delivery.New(ct.dd[0])).Once().Return(err)
137+
return err
138+
},
139+
),
140+
newSimpleConsumeTest(
141+
"create command error",
142+
"INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\nERROR failed to register a consumer: create command error\n",
143+
func(t *testing.T, ct *consumeTest) error {
144+
err := processor.NewCreateCommandError(fmt.Errorf("create command error"))
145+
ct.ch.On("Consume", t.Name(), "ctag", false, false, false, false, nilAmqpTable).
146+
Once().
147+
Return(ct.msgs, nil)
148+
ct.p.On("Process", delivery.New(ct.dd[0])).Once().Return(err)
149+
return nil
150+
},
151+
),
152+
newSimpleConsumeTest(
153+
"ack error",
154+
"INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\n",
155+
func(t *testing.T, ct *consumeTest) error {
156+
err := processor.NewAcknowledgmentError(fmt.Errorf("ack error"))
157+
ct.ch.On("Consume", t.Name(), "ctag", false, false, false, false, nilAmqpTable).
158+
Once().
159+
Return(ct.msgs, nil)
160+
ct.p.On("Process", delivery.New(ct.dd[0])).Once().Return(err)
161+
return err
162+
},
163+
),
164+
}
165+
166+
func TestConsumer_Consume(t *testing.T) {
167+
for _, test := range consumeTests {
168+
t.Run(test.Name, test.Run)
169+
}
170+
}

0 commit comments

Comments
 (0)