Skip to content

Commit 1154e47

Browse files
committed
Rework /internal/queue package (#1449)
* Rework /internal/queue package Given our use cases for this package, we don't need methods that don't block on reads if there's no value to be read. Due to this, I've removed the ReadOrWait function and did a small redesign of the methods to be more in line with standard queue method naming. * Change Read/Write/IsEmpty to Dequeue/Enqueue/Size and remove ReadOrWait. Now there is no version of Read/Dequeue that doesn't block if the queue is empty. * Fix up tests to be in line with this removal of the non-blocking read and simplified most of the tests. Signed-off-by: Daniel Canter <[email protected]> (cherry picked from commit 12d4cd8) Signed-off-by: Daniel Canter <[email protected]>
1 parent 50b68e6 commit 1154e47

4 files changed

Lines changed: 93 additions & 130 deletions

File tree

internal/jobobject/iocp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func pollIOCP(ctx context.Context, iocpHandle windows.Handle) {
5757
}).Warn("failed to parse job object message")
5858
continue
5959
}
60-
if err := msq.Write(notification); err == queue.ErrQueueClosed {
60+
if err := msq.Enqueue(notification); err == queue.ErrQueueClosed {
6161
// Write will only return an error when the queue is closed.
6262
// The only time a queue would ever be closed is when we call `Close` on
6363
// the job it belongs to which also removes it from the jobMap, so something

internal/jobobject/jobobject.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func (job *JobObject) PollNotification() (interface{}, error) {
235235
if job.mq == nil {
236236
return nil, ErrNotRegistered
237237
}
238-
return job.mq.ReadOrWait()
238+
return job.mq.Dequeue()
239239
}
240240

241241
// UpdateProcThreadAttribute updates the passed in ProcThreadAttributeList to contain what is necessary to

internal/queue/mq.go

Lines changed: 21 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@ import (
55
"sync"
66
)
77

8-
var (
9-
ErrQueueClosed = errors.New("the queue is closed for reading and writing")
10-
ErrQueueEmpty = errors.New("the queue is empty")
11-
)
8+
var ErrQueueClosed = errors.New("the queue is closed for reading and writing")
129

1310
// MessageQueue represents a threadsafe message queue to be used to retrieve or
1411
// write messages to.
@@ -29,8 +26,8 @@ func NewMessageQueue() *MessageQueue {
2926
}
3027
}
3128

32-
// Write writes `msg` to the queue.
33-
func (mq *MessageQueue) Write(msg interface{}) error {
29+
// Enqueue writes `msg` to the queue.
30+
func (mq *MessageQueue) Enqueue(msg interface{}) error {
3431
mq.m.Lock()
3532
defer mq.m.Unlock()
3633

@@ -43,69 +40,53 @@ func (mq *MessageQueue) Write(msg interface{}) error {
4340
return nil
4441
}
4542

46-
// Read will read a value from the queue if available, otherwise return an error.
47-
func (mq *MessageQueue) Read() (interface{}, error) {
43+
// Dequeue will read a value from the queue and remove it. If the queue
44+
// is empty, this will block until the queue is closed or a value gets enqueued.
45+
func (mq *MessageQueue) Dequeue() (interface{}, error) {
4846
mq.m.Lock()
4947
defer mq.m.Unlock()
50-
if mq.closed {
51-
return nil, ErrQueueClosed
52-
}
53-
if mq.isEmpty() {
54-
return nil, ErrQueueEmpty
48+
49+
for !mq.closed && mq.size() == 0 {
50+
mq.c.Wait()
5551
}
56-
val := mq.messages[0]
57-
mq.messages[0] = nil
58-
mq.messages = mq.messages[1:]
59-
return val, nil
60-
}
6152

62-
// ReadOrWait will read a value from the queue if available, else it will wait for a
63-
// value to become available. This will block forever if nothing gets written or until
64-
// the queue gets closed.
65-
func (mq *MessageQueue) ReadOrWait() (interface{}, error) {
66-
mq.m.Lock()
53+
// We got woken up, check if it's because the queue got closed.
6754
if mq.closed {
68-
mq.m.Unlock()
6955
return nil, ErrQueueClosed
7056
}
71-
if mq.isEmpty() {
72-
for !mq.closed && mq.isEmpty() {
73-
mq.c.Wait()
74-
}
75-
mq.m.Unlock()
76-
return mq.Read()
77-
}
57+
7858
val := mq.messages[0]
7959
mq.messages[0] = nil
8060
mq.messages = mq.messages[1:]
81-
mq.m.Unlock()
8261
return val, nil
8362
}
8463

85-
// IsEmpty returns if the queue is empty
86-
func (mq *MessageQueue) IsEmpty() bool {
64+
// Size returns the size of the queue.
65+
func (mq *MessageQueue) Size() int {
8766
mq.m.RLock()
8867
defer mq.m.RUnlock()
89-
return len(mq.messages) == 0
68+
return mq.size()
9069
}
9170

92-
// Nonexported empty check that doesn't lock so we can call this in Read and Write.
93-
func (mq *MessageQueue) isEmpty() bool {
94-
return len(mq.messages) == 0
71+
// Nonexported size check to check if the queue is empty inside already locked functions.
72+
func (mq *MessageQueue) size() int {
73+
return len(mq.messages)
9574
}
9675

9776
// Close closes the queue for future writes or reads. Any attempts to read or write from the
9877
// queue after close will return ErrQueueClosed. This is safe to call multiple times.
9978
func (mq *MessageQueue) Close() {
10079
mq.m.Lock()
10180
defer mq.m.Unlock()
102-
// Already closed
81+
82+
// Already closed, noop
10383
if mq.closed {
10484
return
10585
}
86+
10687
mq.messages = nil
10788
mq.closed = true
108-
// If there's anybody currently waiting on a value from ReadOrWait, we need to
89+
// If there's anybody currently waiting on a value from Dequeue, we need to
10990
// broadcast so the read(s) can return ErrQueueClosed.
11091
mq.c.Broadcast()
11192
}

internal/queue/queue_test.go

Lines changed: 70 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,105 +1,59 @@
11
package queue
22

33
import (
4+
"fmt"
45
"sync"
56
"testing"
67
"time"
78
)
89

9-
func TestReadWrite(t *testing.T) {
10+
func TestEnqueueDequeue(t *testing.T) {
1011
q := NewMessageQueue()
1112

12-
// Reading from an empty queue should return ErrQueueEmpty
13-
if _, err := q.Read(); err != ErrQueueEmpty {
14-
t.Fatal("expected to receive `ErrQueueEmpty` for reading from empty queue")
15-
}
16-
17-
// Write 1 to the queue and read this later.
18-
if err := q.Write(1); err != nil {
19-
t.Fatal(err)
20-
}
21-
22-
// Read the value. Value will be dequeued.
23-
if msg, err := q.Read(); err != nil || msg != 1 {
24-
t.Fatal(err)
13+
vals := []int{1, 2, 3, 4, 5}
14+
for _, val := range vals {
15+
// Enqueue vals to the queue and read later.
16+
if err := q.Enqueue(val); err != nil {
17+
t.Fatal(err)
18+
}
2519
}
2620

27-
// We just read a value, now try and read again and verify that we get ErrQueueEmpty again.
28-
if _, err := q.Read(); err != ErrQueueEmpty {
29-
t.Fatal(err)
30-
}
21+
for _, val := range vals {
22+
// Dequeueing from an empty queue should block forever until a write occurs.
23+
qVal, err := q.Dequeue()
24+
if err != nil {
25+
t.Fatal(err)
26+
}
3127

32-
// Close the queue and verify that we get an error on write.
33-
q.Close()
34-
if err := q.Write(1); err != ErrQueueClosed {
35-
t.Fatal(err)
28+
if qVal != val {
29+
t.Fatalf("expected %d, got: %d", val, qVal)
30+
}
3631
}
3732
}
3833

39-
func TestReadOrWaitClose(t *testing.T) {
34+
func TestEnqueueDequeueClose(t *testing.T) {
4035
q := NewMessageQueue()
4136

37+
vals := []int{1, 2, 3}
4238
go func() {
43-
_ = q.Write(1)
44-
_ = q.Write(2)
45-
_ = q.Write(3)
46-
time.Sleep(time.Second * 5)
47-
q.Close()
39+
for _, val := range vals {
40+
_ = q.Enqueue(val)
41+
}
4842
}()
4943

50-
time.Sleep(time.Second * 2)
51-
5244
read := 0
5345
for {
54-
if _, err := q.ReadOrWait(); err != nil {
55-
if err == ErrQueueClosed && read == 3 {
56-
break
57-
}
58-
t.Fatal(err)
59-
}
60-
read++
61-
}
62-
}
63-
64-
func TestReadOrWait(t *testing.T) {
65-
q := NewMessageQueue()
66-
67-
go func() {
68-
_ = q.Write(1)
69-
_ = q.Write(2)
70-
_ = q.Write(3)
71-
time.Sleep(time.Second * 5)
72-
_ = q.Write(4)
73-
}()
74-
75-
// Small sleep so that we can give time to ensure a value is written to the queue so we
76-
// can test both states ReadOrWait could be in. These states being there is already a value
77-
// ready for consumption and all we have to do is just read it, or we wait to get signalled of
78-
// an available value.
79-
time.Sleep(time.Second * 1)
80-
timeout := time.After(time.Second * 20)
81-
done := make(chan struct{})
82-
readErr := make(chan error)
83-
84-
go func() {
85-
for {
86-
if msg, err := q.ReadOrWait(); err != nil {
87-
readErr <- err
88-
} else {
89-
if msg == 4 {
90-
done <- struct{}{}
91-
break
92-
}
46+
if _, err := q.Dequeue(); err == nil {
47+
read++
48+
if read == len(vals) {
49+
// Close after we've read all of our values, then on the next
50+
// go around make sure we get ErrClosed()
51+
q.Close()
9352
}
53+
} else if err != ErrQueueClosed {
54+
t.Fatalf("expected to receive ErrQueueClosed, instead got: %s", err)
9455
}
95-
}()
96-
97-
select {
98-
case <-timeout:
99-
t.Fatal("timed out waiting for all queue values to be read")
100-
case <-done:
101-
case err := <-readErr:
102-
t.Fatal(err)
56+
break
10357
}
10458
}
10559

@@ -109,7 +63,7 @@ func TestMultipleReaders(t *testing.T) {
10963
done := make(chan struct{})
11064
go func() {
11165
for i := 0; i < 50; i++ {
112-
if err := q.Write(1); err != nil {
66+
if err := q.Enqueue(1); err != nil {
11367
errChan <- err
11468
}
11569
}
@@ -121,7 +75,7 @@ func TestMultipleReaders(t *testing.T) {
12175
// Reader 1
12276
go func() {
12377
for i := 0; i < 25; i++ {
124-
if _, err := q.ReadOrWait(); err != nil {
78+
if _, err := q.Dequeue(); err != nil {
12579
errChan <- err
12680
}
12781
}
@@ -131,7 +85,7 @@ func TestMultipleReaders(t *testing.T) {
13185
// Reader 2
13286
go func() {
13387
for i := 0; i < 25; i++ {
134-
if _, err := q.ReadOrWait(); err != nil {
88+
if _, err := q.Dequeue(); err != nil {
13589
errChan <- err
13690
}
13791
}
@@ -143,13 +97,11 @@ func TestMultipleReaders(t *testing.T) {
14397
done <- struct{}{}
14498
}()
14599

146-
timeout := time.After(time.Second * 20)
147-
148100
select {
149101
case err := <-errChan:
150102
t.Fatalf("failed in read or write: %s", err)
151103
case <-done:
152-
case <-timeout:
104+
case <-time.After(time.Second * 20):
153105
t.Fatalf("timeout exceeded waiting for reads to complete")
154106
}
155107
}
@@ -164,15 +116,15 @@ func TestMultipleReadersClose(t *testing.T) {
164116

165117
// Reader 1
166118
go func() {
167-
if _, err := q.ReadOrWait(); err != ErrQueueClosed {
119+
if _, err := q.Dequeue(); err != ErrQueueClosed {
168120
errChan <- err
169121
}
170122
wg.Done()
171123
}()
172124

173125
// Reader 2
174126
go func() {
175-
if _, err := q.ReadOrWait(); err != ErrQueueClosed {
127+
if _, err := q.Dequeue(); err != ErrQueueClosed {
176128
errChan <- err
177129
}
178130
wg.Done()
@@ -187,13 +139,43 @@ func TestMultipleReadersClose(t *testing.T) {
187139
// Close the queue and this should signal both readers to return ErrQueueClosed.
188140
q.Close()
189141

190-
timeout := time.After(time.Second * 20)
191-
192142
select {
193143
case err := <-errChan:
194144
t.Fatalf("failed in read or write: %s", err)
195145
case <-done:
196-
case <-timeout:
146+
case <-time.After(time.Second * 20):
197147
t.Fatalf("timeout exceeded waiting for reads to complete")
198148
}
199149
}
150+
151+
func TestDequeueBlock(t *testing.T) {
152+
q := NewMessageQueue()
153+
errChan := make(chan error)
154+
testVal := 1
155+
156+
go func() {
157+
// Intentionally dequeue right away with no elements so we know we actually block on
158+
// no elements.
159+
val, err := q.Dequeue()
160+
if err != nil {
161+
errChan <- err
162+
}
163+
if val != testVal {
164+
errChan <- fmt.Errorf("expected %d, but got %d", testVal, val)
165+
}
166+
close(errChan)
167+
}()
168+
169+
// Ensure dequeue has started
170+
time.Sleep(time.Second * 3)
171+
if err := q.Enqueue(testVal); err != nil {
172+
t.Fatal(err)
173+
}
174+
175+
select {
176+
case err := <-errChan:
177+
if err != nil {
178+
t.Fatal(err)
179+
}
180+
}
181+
}

0 commit comments

Comments
 (0)