Skip to content
This repository was archived by the owner on Mar 17, 2025. It is now read-only.

Commit e3978cb

Browse files
author
Christian Häusler
authored
Delivery package (#25)
Moved delivery to its own package and merged with metadata.
1 parent 41b5ca0 commit e3978cb

13 files changed

+293
-260
lines changed

command/builder.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package command
33
import (
44
"io"
55

6-
"github.com/corvus-ch/rabbitmq-cli-consumer/metadata"
6+
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
77
"github.com/thockin/logr"
88
)
99

@@ -25,7 +25,7 @@ type Builder interface {
2525
SetCommand(cmd string)
2626

2727
// GetCommand gets the executable command for the given message data.
28-
GetCommand(p metadata.Properties, d metadata.DeliveryInfo, body []byte) (Command, error)
28+
GetCommand(p delivery.Properties, d delivery.Info, body []byte) (Command, error)
2929
}
3030

3131
// NewBuilder ensures a builder struct is setup and ready to be used.

command/builder_argument.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"os/exec"
1212
"strings"
1313

14-
"github.com/corvus-ch/rabbitmq-cli-consumer/metadata"
14+
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
1515
"github.com/thockin/logr"
1616
)
1717

@@ -55,14 +55,14 @@ func (b *ArgumentBuilder) SetCaptureOutput(capture bool) {
5555
b.capture = capture
5656
}
5757

58-
func (b *ArgumentBuilder) GetCommand(p metadata.Properties, d metadata.DeliveryInfo, body []byte) (Command, error) {
58+
func (b *ArgumentBuilder) GetCommand(p delivery.Properties, d delivery.Info, body []byte) (Command, error) {
5959
var err error
6060
payload := body
6161
if b.WithMetadata {
6262
payload, err = json.Marshal(&struct {
63-
Properties metadata.Properties `json:"properties"`
64-
DeliveryInfo metadata.DeliveryInfo `json:"delivery_info"`
65-
Body string `json:"body"`
63+
Properties delivery.Properties `json:"properties"`
64+
DeliveryInfo delivery.Info `json:"delivery_info"`
65+
Body string `json:"body"`
6666
}{
6767

6868
Properties: p,

command/builder_pipe.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"os/exec"
1010
"strings"
1111

12-
"github.com/corvus-ch/rabbitmq-cli-consumer/metadata"
12+
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
1313
"github.com/thockin/logr"
1414
)
1515

@@ -51,11 +51,11 @@ func (b *PipeBuilder) SetCaptureOutput(capture bool) {
5151
b.capture = capture
5252
}
5353

54-
func (b *PipeBuilder) GetCommand(p metadata.Properties, d metadata.DeliveryInfo, body []byte) (Command, error) {
54+
func (b *PipeBuilder) GetCommand(p delivery.Properties, d delivery.Info, body []byte) (Command, error) {
5555

5656
meta, err := json.Marshal(&struct {
57-
Properties metadata.Properties `json:"properties"`
58-
DeliveryInfo metadata.DeliveryInfo `json:"delivery_info"`
57+
Properties delivery.Properties `json:"properties"`
58+
DeliveryInfo delivery.Info `json:"delivery_info"`
5959
}{
6060

6161
Properties: p,

command/builder_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
log "github.com/corvus-ch/logr/buffered"
1010
"github.com/corvus-ch/rabbitmq-cli-consumer/command"
11-
"github.com/corvus-ch/rabbitmq-cli-consumer/metadata"
11+
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
1212
"github.com/stretchr/testify/assert"
1313
)
1414

@@ -21,7 +21,7 @@ func assertWriter(t *testing.T, exp *bytes.Buffer, got io.Writer, captured bool)
2121
}
2222

2323
func createAndAssertCommand(t *testing.T, b command.Builder, body []byte) *exec.Cmd {
24-
c, err := b.GetCommand(metadata.Properties{}, metadata.DeliveryInfo{}, body)
24+
c, err := b.GetCommand(delivery.Properties{}, delivery.Info{}, body)
2525
if err != nil {
2626
t.Errorf("failed to create command: %v", err)
2727
}

consumer/acknowledger.go

+12-11
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package consumer
22

33
import (
44
"fmt"
5+
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
56
)
67

78
// Mapping of script exit codes and message acknowledgment.
@@ -15,7 +16,7 @@ const (
1516

1617
// Message acknowledgment depending on the scripts exit code.
1718
type Acknowledger interface {
18-
Ack(d Delivery, code int) error
19+
Ack(d delivery.Delivery, code int) error
1920
}
2021

2122
// Creates new Acknowledger using strict or default behaviour.
@@ -36,9 +37,9 @@ type DefaultAcknowledger struct {
3637
}
3738

3839
// Default acknowledgment using a predefined behaviour on script error.
39-
func (a DefaultAcknowledger) Ack(d Delivery, code int) error {
40+
func (a DefaultAcknowledger) Ack(d delivery.Delivery, code int) error {
4041
if code == exitAck {
41-
d.Ack(true)
42+
d.Ack()
4243
return nil
4344
}
4445
switch a.OnFailure {
@@ -47,11 +48,11 @@ func (a DefaultAcknowledger) Ack(d Delivery, code int) error {
4748
case exitRejectRequeue:
4849
d.Reject(true)
4950
case exitNack:
50-
d.Nack(true, false)
51+
d.Nack(false)
5152
case exitNackRequeue:
52-
d.Nack(true, true)
53+
d.Nack(true)
5354
default:
54-
d.Nack(true, true)
55+
d.Nack(true)
5556
}
5657
return nil
5758
}
@@ -62,20 +63,20 @@ type StrictAcknowledger struct {
6263
}
6364

6465
// Strict acknowledgment returning an err if script exits with an unknown exit code.
65-
func (a StrictAcknowledger) Ack(d Delivery, code int) error {
66+
func (a StrictAcknowledger) Ack(d delivery.Delivery, code int) error {
6667
switch code {
6768
case exitAck:
68-
d.Ack(true)
69+
d.Ack()
6970
case exitReject:
7071
d.Reject(false)
7172
case exitRejectRequeue:
7273
d.Reject(true)
7374
case exitNack:
74-
d.Nack(true, false)
75+
d.Nack(false)
7576
case exitNackRequeue:
76-
d.Nack(true, true)
77+
d.Nack(true)
7778
default:
78-
d.Nack(true, true)
79+
d.Nack(true)
7980
return fmt.Errorf("unexpected exit code %v", code)
8081
}
8182

consumer/consume_test.go

+66-20
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
log "github.com/corvus-ch/logr/buffered"
1212
"github.com/corvus-ch/rabbitmq-cli-consumer/command"
1313
"github.com/corvus-ch/rabbitmq-cli-consumer/consumer"
14-
"github.com/corvus-ch/rabbitmq-cli-consumer/metadata"
14+
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
1515
"github.com/stretchr/testify/assert"
1616
"github.com/stretchr/testify/mock"
1717
"github.com/thockin/logr"
@@ -25,26 +25,26 @@ var processingTests = []struct {
2525
strict bool
2626
onFailure int
2727
}{
28-
{"ack", "Ack", []interface{}{true}, 0, false, 0},
28+
{"ack", "Ack", []interface{}{}, 0, false, 0},
2929
{"reject", "Reject", []interface{}{false}, 1, false, 3},
3030
{"rejectRequeue", "Reject", []interface{}{true}, 1, false, 4},
31-
{"nack", "Nack", []interface{}{true, false}, 1, false, 5},
32-
{"nackRequeue", "Nack", []interface{}{true, true}, 1, false, 6},
33-
{"fallback", "Nack", []interface{}{true, true}, 1, false, 0},
34-
{"strictAck", "Ack", []interface{}{true}, 0, true, 0},
31+
{"nack", "Nack", []interface{}{false}, 1, false, 5},
32+
{"nackRequeue", "Nack", []interface{}{true}, 1, false, 6},
33+
{"fallback", "Nack", []interface{}{true}, 1, false, 0},
34+
{"strictAck", "Ack", []interface{}{}, 0, true, 0},
3535
{"strictReject", "Reject", []interface{}{false}, 3, true, 0},
3636
{"strictRejectRequeue", "Reject", []interface{}{true}, 4, true, 0},
37-
{"strictNack", "Nack", []interface{}{true, false}, 5, true, 0},
38-
{"strictNackRequeue", "Nack", []interface{}{true, true}, 6, true, 0},
37+
{"strictNack", "Nack", []interface{}{false}, 5, true, 0},
38+
{"strictNackRequeue", "Nack", []interface{}{true}, 6, true, 0},
3939
}
4040

4141
func TestProcessing(t *testing.T) {
4242
for _, test := range processingTests {
4343
t.Run(test.name, func(t *testing.T) {
4444
d := new(TestDelivery)
4545
b := new(TestBuilder)
46-
p := metadata.Properties{}
47-
di := metadata.DeliveryInfo{}
46+
p := delivery.Properties{}
47+
di := delivery.Info{}
4848
cmd := new(TestCommand)
4949
body := []byte(test.name)
5050
c := consumer.Consumer{
@@ -55,9 +55,11 @@ func TestProcessing(t *testing.T) {
5555
b.On("GetCommand", p, di, body).Return(cmd, nil)
5656
d.On("Body").Return(body)
5757
d.On(test.ackMethod, test.ackArgs...).Return(nil)
58+
d.On("Properties").Return(p)
59+
d.On("Info").Return(di)
5860
cmd.On("Run").Return(test.exit)
5961

60-
c.ProcessMessage(d, p, di)
62+
c.ProcessMessage(d)
6163

6264
d.AssertExpectations(t)
6365
b.AssertExpectations(t)
@@ -70,8 +72,8 @@ func TestCommandFailure(t *testing.T) {
7072
l := log.New(0)
7173
d := new(TestDelivery)
7274
b := new(TestBuilder)
73-
p := metadata.Properties{}
74-
di := metadata.DeliveryInfo{}
75+
p := delivery.Properties{}
76+
di := delivery.Info{}
7577
body := []byte("cmdFailure")
7678
c := consumer.Consumer{
7779
Builder: b,
@@ -80,9 +82,11 @@ func TestCommandFailure(t *testing.T) {
8082

8183
b.On("GetCommand", p, di, body).Return(new(TestCommand), fmt.Errorf("failed from test"))
8284
d.On("Body").Return(body)
83-
d.On("Nack", true, true).Return(nil)
85+
d.On("Nack", true).Return(nil)
86+
d.On("Properties").Return(p)
87+
d.On("Info").Return(di)
8488

85-
c.ProcessMessage(d, p, di)
89+
c.ProcessMessage(d)
8690

8791
assert.Equal(t, "ERROR failed to create command: failed from test\n", l.Buf().String())
8892
d.AssertExpectations(t)
@@ -98,8 +102,8 @@ func TestStrictDefault(t *testing.T) {
98102

99103
d := new(TestDelivery)
100104
b := new(TestBuilder)
101-
p := metadata.Properties{}
102-
di := metadata.DeliveryInfo{}
105+
p := delivery.Properties{}
106+
di := delivery.Info{}
103107
cmd := new(TestCommand)
104108
body := []byte("strictDefault")
105109
c := consumer.Consumer{
@@ -110,11 +114,13 @@ func TestStrictDefault(t *testing.T) {
110114

111115
b.On("GetCommand", p, di, body).Return(cmd, nil)
112116
d.On("Body").Return(body)
113-
d.On("Nack", true, true).Return(nil)
117+
d.On("Nack", true).Return(nil)
118+
d.On("Properties").Return(p)
119+
d.On("Info").Return(di)
114120
cmd.On("Run").Return(1)
115121

116122
assert.PanicsWithValue(t, "os.Exit called with: 11", func() {
117-
c.ProcessMessage(d, p, di)
123+
c.ProcessMessage(d)
118124
}, "os.Exit was not called")
119125

120126
d.AssertExpectations(t)
@@ -143,7 +149,7 @@ func (b *TestBuilder) SetCommand(cmd string) {
143149
b.Called(cmd)
144150
}
145151

146-
func (b *TestBuilder) GetCommand(p metadata.Properties, d metadata.DeliveryInfo, body []byte) (command.Command, error) {
152+
func (b *TestBuilder) GetCommand(p delivery.Properties, d delivery.Info, body []byte) (command.Command, error) {
147153
argsT := b.Called(p, d, body)
148154

149155
return argsT.Get(0).(command.Command), argsT.Error(1)
@@ -161,3 +167,43 @@ func (t TestCommand) Run() int {
161167
func (t TestCommand) Cmd() *exec.Cmd {
162168
return t.Called().Get(0).(*exec.Cmd)
163169
}
170+
171+
type TestDelivery struct {
172+
mock.Mock
173+
}
174+
175+
func (t *TestDelivery) Ack() error {
176+
argstT := t.Called()
177+
178+
return argstT.Error(0)
179+
}
180+
181+
func (t *TestDelivery) Nack(requeue bool) error {
182+
argsT := t.Called(requeue)
183+
184+
return argsT.Error(0)
185+
}
186+
187+
func (t *TestDelivery) Reject(requeue bool) error {
188+
argsT := t.Called(requeue)
189+
190+
return argsT.Error(0)
191+
}
192+
193+
func (t *TestDelivery) Body() []byte {
194+
argsT := t.Called()
195+
196+
return argsT.Get(0).([]byte)
197+
}
198+
199+
func (t *TestDelivery) Properties() delivery.Properties {
200+
argsT := t.Called()
201+
202+
return argsT.Get(0).(delivery.Properties)
203+
}
204+
205+
func (t *TestDelivery) Info() delivery.Info {
206+
argsT := t.Called()
207+
208+
return argsT.Get(0).(delivery.Info)
209+
}

consumer/consumer.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package consumer
22

33
import (
4+
"fmt"
45
"io"
56
"os"
67

7-
"fmt"
88
"github.com/corvus-ch/rabbitmq-cli-consumer/command"
99
"github.com/corvus-ch/rabbitmq-cli-consumer/config"
10-
"github.com/corvus-ch/rabbitmq-cli-consumer/metadata"
10+
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
1111
"github.com/streadway/amqp"
1212
"github.com/thockin/logr"
1313
)
@@ -47,7 +47,7 @@ func (c *Consumer) Consume() error {
4747

4848
go func() {
4949
for d := range msgs {
50-
c.ProcessMessage(NewRabbitMqDelivery(d), metadata.NewProperties(d), metadata.NewDeliveryInfo(d))
50+
c.ProcessMessage(delivery.New(d))
5151
}
5252
}()
5353

@@ -58,11 +58,11 @@ func (c *Consumer) Consume() error {
5858
}
5959

6060
// ProcessMessage processes a single message by running the executable.
61-
func (c *Consumer) ProcessMessage(d Delivery, p metadata.Properties, m metadata.DeliveryInfo) {
62-
cmd, err := c.Builder.GetCommand(p, m, d.Body())
61+
func (c *Consumer) ProcessMessage(d delivery.Delivery) {
62+
cmd, err := c.Builder.GetCommand(d.Properties(), d.Info(), d.Body())
6363
if err != nil {
6464
c.Log.Errorf("failed to create command: %v", err)
65-
d.Nack(true, true)
65+
d.Nack(true)
6666
return
6767
}
6868

consumer/delivery.go

-35
This file was deleted.

0 commit comments

Comments
 (0)