Skip to content
This repository was archived by the owner on Mar 17, 2025. It is now read-only.

Commit 5b289c7

Browse files
author
Christian Häusler
authored
Add option/setting to prevent declaration of queue (#37)
In certain setups, queue will already be declared when a consumer gets started. Starting the consumer will fail if the declared queue and the consumers settings mismatch. In order to prevent the need for managing the queue settings in multiple places, one might want to prevent the consumer from declaring the queue. This change adds a new cli option `-no-declare` and a new setting `nodeclare` to the `[queuesettings]` section of the config file which allows to do exactly this. resolves #36.
1 parent 543aa84 commit 5b289c7

12 files changed

+175
-39
lines changed

config/config.go

+27
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type Config struct {
3232
DeadLetterExchange string
3333
DeadLetterRoutingKey string
3434
Priority int
35+
Nodeclare bool
3536
}
3637
Exchange struct {
3738
Name string
@@ -76,6 +77,16 @@ func (c *Config) AmqpUrl() string {
7677
return c.RabbitMq.AmqpUrl
7778
}
7879

80+
// QueueName returns the name of toe queue to bind with.
81+
func (c Config) QueueName() string {
82+
return c.RabbitMq.Queue
83+
}
84+
85+
// MustDeclareQueue return if the consumer should declare the queue or if the queue is expected to be already declared.
86+
func (c Config) MustDeclareQueue() bool {
87+
return !c.QueueSettings.Nodeclare
88+
}
89+
7990
// HasExchange checks if an exchange is configured.
8091
func (c Config) HasExchange() bool {
8192
return c.Exchange.Name != ""
@@ -96,6 +107,16 @@ func (c Config) ExchangeType() string {
96107
return c.Exchange.Type
97108
}
98109

110+
// ExchangeIsDurable returns whether the exchange should be durable or not.
111+
func (c Config) ExchangeIsDurable() bool {
112+
return c.Exchange.Durable
113+
}
114+
115+
// ExchangeIsAutoDelete return whether the exchange should be auto deleted or not.
116+
func (c Config) ExchangeIsAutoDelete() bool {
117+
return c.Exchange.Autodelete
118+
}
119+
99120
// PrefetchCount returns the configured prefetch count of the QoS settings.
100121
func (c Config) PrefetchCount() int {
101122
// Attempt to preserve BC here
@@ -106,6 +127,12 @@ func (c Config) PrefetchCount() int {
106127
return c.Prefetch.Count
107128
}
108129

130+
// PrefetchIsGlobal returns if the prefetch count is defined globally for all consumers or locally for just each single
131+
// consumer.
132+
func (c Config) PrefetchIsGlobal() bool {
133+
return c.Prefetch.Global
134+
}
135+
109136
// HasMessageTTL checks if a message TTL is configured.
110137
func (c Config) HasMessageTTL() bool {
111138
return c.QueueSettings.MessageTTL > 0

consumer/config.go

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package consumer
2+
3+
// Config defines the interface to present configurations to the consumer.
4+
type Config interface {
5+
AmqpUrl() string
6+
ConsumerTag() string
7+
DeadLetterExchange() string
8+
DeadLetterRoutingKey() string
9+
ExchangeIsAutoDelete() bool
10+
ExchangeIsDurable() bool
11+
ExchangeName() string
12+
ExchangeType() string
13+
HasDeadLetterExchange() bool
14+
HasDeadLetterRouting() bool
15+
HasExchange() bool
16+
HasMessageTTL() bool
17+
HasPriority() bool
18+
MessageTTL() int32
19+
MustDeclareQueue() bool
20+
PrefetchCount() int
21+
PrefetchIsGlobal() bool
22+
Priority() int32
23+
QueueName() string
24+
RoutingKeys() []string
25+
}

consumer/consumer.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66

7-
"github.com/corvus-ch/rabbitmq-cli-consumer/config"
87
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
98
"github.com/corvus-ch/rabbitmq-cli-consumer/processor"
109
"github.com/streadway/amqp"
@@ -34,7 +33,7 @@ func New(conn Connection, ch Channel, p processor.Processor, l logr.Logger) *Con
3433

3534
// NewFromConfig creates a new consumer instance. The setup of the amqp connection and channel is done according to the
3635
// configuration.
37-
func NewFromConfig(cfg *config.Config, p processor.Processor, l logr.Logger) (*Consumer, error) {
36+
func NewFromConfig(cfg Config, p processor.Processor, l logr.Logger) (*Consumer, error) {
3837
l.Info("Connecting RabbitMQ...")
3938
conn, err := amqp.Dial(cfg.AmqpUrl())
4039
if nil != err {
@@ -54,7 +53,7 @@ func NewFromConfig(cfg *config.Config, p processor.Processor, l logr.Logger) (*C
5453
return &Consumer{
5554
Connection: conn,
5655
Channel: ch,
57-
Queue: cfg.RabbitMq.Queue,
56+
Queue: cfg.QueueName(),
5857
Tag: cfg.ConsumerTag(),
5958
Processor: p,
6059
Log: l,

consumer/setup.go

+19-17
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,22 @@ package consumer
33
import (
44
"fmt"
55

6-
"github.com/corvus-ch/rabbitmq-cli-consumer/config"
76
"github.com/streadway/amqp"
87
"github.com/thockin/logr"
98
)
109

1110
// Setup configures queues, exchanges and bindings in between according to the configuration.
12-
func Setup(cfg *config.Config, ch Channel, l logr.Logger) error {
11+
func Setup(cfg Config, ch Channel, l logr.Logger) error {
1312
if err := setupQoS(cfg, ch, l); err != nil {
1413
return err
1514
}
1615

17-
if err := declareQueue(cfg, ch, l); err != nil {
18-
return err
16+
if cfg.MustDeclareQueue() {
17+
if err := declareQueue(cfg, ch, l); err != nil {
18+
return err
19+
}
1920
}
21+
2022
// Empty Exchange name means default, no need to declare
2123
if cfg.HasExchange() {
2224
if err := declareExchange(cfg, ch, l); err != nil {
@@ -27,32 +29,32 @@ func Setup(cfg *config.Config, ch Channel, l logr.Logger) error {
2729
return nil
2830
}
2931

30-
func setupQoS(cfg *config.Config, ch Channel, l logr.Logger) error {
32+
func setupQoS(cfg Config, ch Channel, l logr.Logger) error {
3133
l.Info("Setting QoS... ")
32-
if err := ch.Qos(cfg.PrefetchCount(), 0, cfg.Prefetch.Global); err != nil {
34+
if err := ch.Qos(cfg.PrefetchCount(), 0, cfg.PrefetchIsGlobal()); err != nil {
3335
return fmt.Errorf("failed to set QoS: %v", err)
3436
}
3537
l.Info("Succeeded setting QoS.")
3638
return nil
3739
}
3840

39-
func declareQueue(cfg *config.Config, ch Channel, l logr.Logger) error {
40-
l.Infof("Declaring queue \"%s\"...", cfg.RabbitMq.Queue)
41-
_, err := ch.QueueDeclare(cfg.RabbitMq.Queue, true, false, false, false, queueArgs(cfg))
41+
func declareQueue(cfg Config, ch Channel, l logr.Logger) error {
42+
l.Infof("Declaring queue \"%s\"...", cfg.QueueName())
43+
_, err := ch.QueueDeclare(cfg.QueueName(), true, false, false, false, queueArgs(cfg))
4244
if nil != err {
4345
return fmt.Errorf("failed to declare queue: %v", err)
4446
}
4547

4648
return nil
4749
}
4850

49-
func declareExchange(cfg *config.Config, ch Channel, l logr.Logger) error {
50-
l.Infof("Declaring exchange \"%s\"...", cfg.Exchange.Name)
51+
func declareExchange(cfg Config, ch Channel, l logr.Logger) error {
52+
l.Infof("Declaring exchange \"%s\"...", cfg.ExchangeName())
5153
if err := ch.ExchangeDeclare(
52-
cfg.Exchange.Name,
54+
cfg.ExchangeName(),
5355
cfg.ExchangeType(),
54-
cfg.Exchange.Durable,
55-
cfg.Exchange.Autodelete,
56+
cfg.ExchangeIsDurable(),
57+
cfg.ExchangeIsAutoDelete(),
5658
false,
5759
false,
5860
amqp.Table{},
@@ -61,10 +63,10 @@ func declareExchange(cfg *config.Config, ch Channel, l logr.Logger) error {
6163
}
6264

6365
// Bind queue
64-
l.Infof("Binding queue \"%s\" to exchange \"%s\"...", cfg.RabbitMq.Queue, cfg.Exchange.Name)
66+
l.Infof("Binding queue \"%s\" to exchange \"%s\"...", cfg.QueueName(), cfg.ExchangeName())
6567
for _, routingKey := range cfg.RoutingKeys() {
6668
if err := ch.QueueBind(
67-
cfg.RabbitMq.Queue,
69+
cfg.QueueName(),
6870
routingKey,
6971
cfg.ExchangeName(),
7072
false,
@@ -77,7 +79,7 @@ func declareExchange(cfg *config.Config, ch Channel, l logr.Logger) error {
7779
return nil
7880
}
7981

80-
func queueArgs(cfg *config.Config) amqp.Table {
82+
func queueArgs(cfg Config) amqp.Table {
8183

8284
args := make(amqp.Table)
8385

example.conf

+4
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ deadLetterroutingkey = someroutingkey
103103
# The priority range for this queue.
104104
priority = 10
105105

106+
# Prevents the queue from being declared. If set to true, the queue must have been configured previous to starting the
107+
# consumer. If the queue is not defined, the consumer can not connect and quits.
108+
nodeclare = false
109+
106110
[logs]
107111
# Path to the log file where informational output is written to
108112
# #

fixtures/TestEndToEnd/noDeclareConfigError.golden

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
Connecting RabbitMQ...
2+
Connected.
3+
Opening channel...
4+
Done.
5+
Setting QoS...
6+
Succeeded setting QoS.
7+
Registering consumer...
8+
Succeeded registering consumer.
9+
Waiting for messages...

fixtures/TestEndToEnd/noDeclareError.golden

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
Connecting RabbitMQ...
2+
Connected.
3+
Opening channel...
4+
Done.
5+
Setting QoS...
6+
Succeeded setting QoS.
7+
Registering consumer...
8+
Succeeded registering consumer.
9+
Waiting for messages...

fixtures/no_declare.conf

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
[rabbitmq]
2+
host = localhost
3+
username = guest
4+
password = guest
5+
6+
[queueSettings]
7+
nodeclare = true

integration_test.go

+65-19
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,16 @@ import (
1414

1515
"github.com/sebdah/goldie"
1616
"github.com/streadway/amqp"
17+
"github.com/stretchr/testify/assert"
1718
)
1819

19-
var command = os.Args[0] + " -test.run=TestHelperProcess -- "
20+
var (
21+
command = os.Args[0] + " -test.run=TestHelperProcess -- "
22+
amqpArgs = amqp.Table{
23+
"x-message-ttl": int32(42),
24+
"x-max-priority": int32(42),
25+
}
26+
)
2027

2128
var tests = []struct {
2229
name string
@@ -112,33 +119,63 @@ var tests = []struct {
112119
},
113120
}
114121

122+
var noDeclareTests = []struct {
123+
name string
124+
// The arguments passed to the consumer command.
125+
args []string
126+
}{
127+
{"noDeclare", []string{"-V", "-no-datetime", "-q", "noDeclare", "-e", command, "-no-declare"}},
128+
{"noDeclareConfig", []string{"-V", "-no-datetime", "-q", "noDeclareConfig", "-e", command, "-c", "fixtures/no_declare.conf"}},
129+
}
130+
115131
func TestEndToEnd(t *testing.T) {
116-
conn := prepare(t)
132+
conn, ch := prepare(t)
117133
defer conn.Close()
118-
119-
ch, err := conn.Channel()
120-
if err != nil {
121-
t.Errorf("failed to open channel: %v", err)
122-
}
123134
defer ch.Close()
124135

125136
for _, test := range tests {
126137
t.Run(test.name, func(t *testing.T) {
127138
os.Remove("./command.log")
128139
cmd, stdout, stderr := startConsumer(t, test.env, test.args...)
129140
declareQueueAndPublish(t, ch, test.queue, test.msg)
130-
waitMessageProcessed(t, stdout)
141+
waitForOutput(t, stdout, "Processed!")
131142
stopConsumer(t, cmd)
132143

133144
output, _ := ioutil.ReadFile("./command.log")
134145
goldie.Assert(t, t.Name()+"Command", output)
135-
goldie.Assert(t, t.Name()+"Output", bytes.Trim(stdout.Bytes(), "\x00"))
136-
goldie.Assert(t, t.Name()+"Error", bytes.Trim(stderr.Bytes(), "\x00"))
146+
assertOutput(t, stdout, stderr)
137147
})
138148
}
149+
150+
for _, test := range noDeclareTests {
151+
t.Run(test.name, func(t *testing.T) {
152+
declareQueue(t, ch, test.name, amqpArgs)
153+
154+
cmd, stdout, stderr := startConsumer(t, []string{}, test.args...)
155+
waitForOutput(t, stdout, "Waiting for messages...")
156+
stopConsumer(t, cmd)
157+
158+
assertOutput(t, stdout, stderr)
159+
})
160+
}
161+
162+
t.Run("declareError", func(t *testing.T) {
163+
declareQueue(t, ch, t.Name(), amqpArgs)
164+
165+
cmd, _, _ := startConsumer(t, []string{}, "-V", "-no-datetime", "-q", t.Name(), "-e", command)
166+
exitErr := cmd.Wait()
167+
168+
assert.NotNil(t, exitErr)
169+
assert.Equal(t, "exit status 1", exitErr.Error())
170+
})
171+
}
172+
173+
func assertOutput(t *testing.T, stdout, stderr *bytes.Buffer) {
174+
goldie.Assert(t, t.Name()+"Output", bytes.Trim(stdout.Bytes(), "\x00"))
175+
goldie.Assert(t, t.Name()+"Error", bytes.Trim(stderr.Bytes(), "\x00"))
139176
}
140177

141-
func prepare(t *testing.T) *amqp.Connection {
178+
func prepare(t *testing.T) (*amqp.Connection, *amqp.Channel) {
142179
makeCmd := exec.Command("make", "build")
143180
if err := makeCmd.Run(); err != nil {
144181
t.Fatalf("could not build binary for: %v", err)
@@ -159,7 +196,12 @@ func prepare(t *testing.T) *amqp.Connection {
159196
t.Fatalf("failed to open AMQP connection: %v", err)
160197
}
161198

162-
return conn
199+
ch, err := conn.Channel()
200+
if err != nil {
201+
t.Fatalf("failed to open channel: %v", err)
202+
}
203+
204+
return conn, ch
163205
}
164206

165207
func connect(url string) (*amqp.Connection, error) {
@@ -180,14 +222,18 @@ func connect(url string) (*amqp.Connection, error) {
180222
}
181223
}
182224

183-
func declareQueueAndPublish(t *testing.T, ch *amqp.Channel, name string, msg amqp.Publishing) {
184-
q, err := ch.QueueDeclare(name, true, false, false, false, nil)
225+
func declareQueue(t *testing.T, ch *amqp.Channel, name string, args amqp.Table) amqp.Queue {
226+
q, err := ch.QueueDeclare(name, true, false, false, false, args)
185227
if err != nil {
186228
t.Errorf("failed to declare queue; %v", err)
187229
}
188230

189-
err = ch.Publish("", q.Name, false, false, msg)
190-
if err != nil {
231+
return q
232+
}
233+
234+
func declareQueueAndPublish(t *testing.T, ch *amqp.Channel, name string, msg amqp.Publishing) {
235+
q := declareQueue(t, ch, name, nil)
236+
if err := ch.Publish("", q.Name, false, false, msg); nil != err {
191237
t.Errorf("failed to publish message: %v", err)
192238
}
193239
}
@@ -213,19 +259,19 @@ func stopConsumer(t *testing.T, cmd *exec.Cmd) {
213259
}
214260
}
215261

216-
func waitMessageProcessed(t *testing.T, buf *bytes.Buffer) {
262+
func waitForOutput(t *testing.T, buf *bytes.Buffer, expect string) {
217263
timeout := time.After(10 * time.Second)
218264
ticker := time.NewTicker(100 * time.Millisecond)
219265
defer ticker.Stop()
220266

221267
for {
222268
select {
223269
case <-timeout:
224-
t.Error("timeout while waiting for message processing")
270+
t.Errorf("timeout while waiting for output \"%s\"", expect)
225271
return
226272

227273
case <-ticker.C:
228-
if strings.Contains(buf.String(), "Processed!") {
274+
if strings.Contains(buf.String(), expect) {
229275
return
230276
}
231277
}

0 commit comments

Comments
 (0)