Skip to content
This repository was archived by the owner on Mar 17, 2025. It is now read-only.

Commit 2e639a6

Browse files
VodzoChristian Häusler
authored and
Christian Häusler
committed
Allow to set queue declare options in config file (#50)
Resolves #49.
1 parent cf17cd9 commit 2e639a6

11 files changed

+163
-4
lines changed

config/config.go

+39-3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ type Config struct {
3333
DeadLetterRoutingKey string
3434
Priority int
3535
Nodeclare bool
36+
Durable bool
37+
Exclusive bool
38+
AutoDelete bool
39+
NoWait bool
3640
}
3741
Exchange struct {
3842
Name string
@@ -192,6 +196,27 @@ func (c Config) WithDateTime() bool {
192196
return !c.Logs.NoDateTime
193197
}
194198

199+
// QueueIsDurable checks if queue should be declared durable.
200+
// Defaults to true to keep backwards compatibility
201+
func (c Config) QueueIsDurable() bool {
202+
return c.QueueSettings.Durable
203+
}
204+
205+
// QueueIsExclusive checks if queue should be declared exclusive
206+
func (c Config) QueueIsExclusive() bool {
207+
return c.QueueSettings.Exclusive
208+
}
209+
210+
// QueueIsAutoDelete checks if queue should be declared 'autoDelete'
211+
func (c Config) QueueIsAutoDelete() bool {
212+
return c.QueueSettings.AutoDelete
213+
}
214+
215+
// QueueIsNoWait checks if queue should be declared 'noWait'
216+
func (c Config) QueueIsNoWait() bool {
217+
return c.QueueSettings.NoWait
218+
}
219+
195220
// ConsumerTag returns the tag used to identify the consumer.
196221
func (c Config) ConsumerTag() string {
197222
if v, set := os.LookupEnv("GO_WANT_HELPER_PROCESS"); set && v == "1" {
@@ -217,23 +242,34 @@ func LoadAndParse(location string) (*Config, error) {
217242
location = location
218243
}
219244

220-
cfg := Config{}
221-
if err := gcfg.ReadFileInto(&cfg, location); err != nil {
245+
cfg := &Config{}
246+
247+
SetDefaultQueueDurability(cfg)
248+
249+
if err := gcfg.ReadFileInto(cfg, location); err != nil {
222250
return nil, err
223251
}
224252

225-
return &cfg, nil
253+
return cfg, nil
226254
}
227255

228256
func CreateFromString(data string) (*Config, error) {
229257
cfg := &Config{}
258+
259+
SetDefaultQueueDurability(cfg)
260+
230261
if err := gcfg.ReadStringInto(cfg, data); err != nil {
231262
return nil, err
232263
}
233264

234265
return cfg, nil
235266
}
236267

268+
// SetDefaultQueueDurability sets queue durable to true to keep backwards compatibility
269+
func SetDefaultQueueDurability(cfg *Config) {
270+
cfg.QueueSettings.Durable = true
271+
}
272+
237273
func transformToStringValue(val string) string {
238274
if val == "<empty>" {
239275
return ""

consumer/config.go

+4
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,8 @@ type Config interface {
2222
Priority() int32
2323
QueueName() string
2424
RoutingKeys() []string
25+
QueueIsDurable() bool
26+
QueueIsExclusive() bool
27+
QueueIsAutoDelete() bool
28+
QueueIsNoWait() bool
2529
}
+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
[rabbitmq]
2+
queue = autoDeleteQueue
3+
4+
[queuesettings]
5+
autodelete = On
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[rabbitmq]
2+
queue = defaultQueueDurability

consumer/fixtures/durable_queue.conf

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
[rabbitmq]
2+
queue = durableQueue
3+
4+
[queuesettings]
5+
durable = On
+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
[rabbitmq]
2+
queue = exclusiveQueue
3+
4+
[queuesettings]
5+
exclusive = On
+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
[rabbitmq]
2+
queue = nonDurableQueue
3+
4+
[queuesettings]
5+
durable = Off

consumer/fixtures/nowait_queue.conf

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
[rabbitmq]
2+
queue = noWaitQueue
3+
4+
[queuesettings]
5+
nowait = On

consumer/setup.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,14 @@ func setupQoS(cfg Config, ch Channel, l logr.Logger) error {
3939

4040
func declareQueue(cfg Config, ch Channel, l logr.Logger) error {
4141
l.Infof("Declaring queue \"%s\"...", cfg.QueueName())
42-
_, err := ch.QueueDeclare(cfg.QueueName(), true, false, false, false, queueArgs(cfg))
42+
_, err := ch.QueueDeclare(
43+
cfg.QueueName(), // Queue name
44+
cfg.QueueIsDurable(), // durable
45+
cfg.QueueIsAutoDelete(), // autoDelete
46+
cfg.QueueIsExclusive(), // exclusive
47+
cfg.QueueIsNoWait(), // noWait
48+
queueArgs(cfg), // arguments
49+
)
4350
if nil != err {
4451
if amqpErr, ok := err.(*amqp.Error); ok && amqpErr.Code == 406 {
4552
l.Error("Queue already declared with conflicting settings. You might want to use --no-declare.")

consumer/setup_test.go

+66
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ const (
2323
routingConfig = "routing"
2424
simpleExchangeConfig = "exchange"
2525
ttlConfig = "ttl"
26+
autoDeleteQueue = "autodelete_queue"
27+
durableQueue = "durable_queue"
28+
nonDurableQueue = "non_durable_queue"
29+
defaultQueueDurability = "default_queue_durability"
30+
exclusiveQueue = "exclusive_queue"
31+
noWaitQueue = "nowait_queue"
2632
)
2733

2834
var nilAmqpTable amqp.Table
@@ -203,6 +209,66 @@ var queueTests = []struct {
203209
},
204210
fmt.Errorf("failed to bind queue to exchange: queue bind error"),
205211
},
212+
// Durable queue
213+
{
214+
"durableQueue",
215+
durableQueue,
216+
func(ch *TestChannel) {
217+
ch.On("Qos", 3, 0, false).Return(nil).Once()
218+
ch.On("QueueDeclare", "durableQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once()
219+
},
220+
nil,
221+
},
222+
// Non durable queue
223+
{
224+
"nonDurableQueue",
225+
nonDurableQueue,
226+
func(ch *TestChannel) {
227+
ch.On("Qos", 3, 0, false).Return(nil).Once()
228+
ch.On("QueueDeclare", "nonDurableQueue", false, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once()
229+
},
230+
nil,
231+
},
232+
// Default queue durability
233+
{
234+
"defaultQueueDurability",
235+
defaultQueueDurability,
236+
func(ch *TestChannel) {
237+
ch.On("Qos", 3, 0, false).Return(nil).Once()
238+
ch.On("QueueDeclare", "defaultQueueDurability", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once()
239+
},
240+
nil,
241+
},
242+
// AutoDelete queue
243+
{
244+
"autoDeleteQueue",
245+
autoDeleteQueue,
246+
func(ch *TestChannel) {
247+
ch.On("Qos", 3, 0, false).Return(nil).Once()
248+
ch.On("QueueDeclare", "autoDeleteQueue", true, true, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once()
249+
},
250+
nil,
251+
},
252+
// Exclusive queue
253+
{
254+
"exclusiveQueue",
255+
exclusiveQueue,
256+
func(ch *TestChannel) {
257+
ch.On("Qos", 3, 0, false).Return(nil).Once()
258+
ch.On("QueueDeclare", "exclusiveQueue", true, false, true, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once()
259+
},
260+
nil,
261+
},
262+
// Nowait queue
263+
{
264+
"noWaitQueue",
265+
noWaitQueue,
266+
func(ch *TestChannel) {
267+
ch.On("Qos", 3, 0, false).Return(nil).Once()
268+
ch.On("QueueDeclare", "noWaitQueue", true, false, false, true, emptyAmqpTable).Return(amqp.Queue{}, nil).Once()
269+
},
270+
nil,
271+
},
206272
}
207273

208274
func TestQueueSettings(t *testing.T) {

example.conf

+19
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,25 @@ priority = 10
107107
# consumer. If the queue is not defined, the consumer can not connect and quits.
108108
nodeclare = false
109109

110+
# Should the queue be declared as durable. If set to true, the queue will survive server restarts.
111+
# Defaults to true.
112+
durable = true
113+
114+
# Should the queue be declared as exclusive. If set to true, the queue will be removed when consumer disconnects.
115+
# Defaults to false.
116+
exclusive = false
117+
118+
# Should the queue be declared as autodelete. If set to true and is durable, the queue will survive server restarts
119+
# but will be removed when there are no remaining consumers or bindings.
120+
# Defaults to false
121+
autodelete = false
122+
123+
# Should the queue be declared as noWait. When noWait is true, the queue will assume to be declared on the server.
124+
# A channel exception will arrive if the conditions are met for existing queues
125+
# or attempting to modify an existing queue from a different connection.
126+
# Defaults to false
127+
nowait = false
128+
110129
[logs]
111130
# Path to the log file where informational output is written to
112131
# When providing the --verbose, -V option, this section becomes optional.

0 commit comments

Comments
 (0)