Skip to content

Commit 71bd273

Browse files
arrivenhongalex
andauthored
feat(pubsub/pstest): subscription message ordering (#6257)
Co-authored-by: Alex Hong <[email protected]>
1 parent df178f8 commit 71bd273

2 files changed

Lines changed: 68 additions & 2 deletions

File tree

pubsub/pstest/fake.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,7 +1006,7 @@ func (s *subscription) pull(max int) []*pb.ReceivedMessage {
10061006
now := s.timeNowFunc()
10071007
s.maintainMessages(now)
10081008
var msgs []*pb.ReceivedMessage
1009-
for id, m := range s.msgs {
1009+
for id, m := range filterMsgs(s.msgs, s.proto.EnableMessageOrdering) {
10101010
if m.outstanding() {
10111011
continue
10121012
}
@@ -1028,6 +1028,32 @@ func (s *subscription) pull(max int) []*pb.ReceivedMessage {
10281028
return msgs
10291029
}
10301030

1031+
func filterMsgs(msgs map[string]*message, enableMessageOrdering bool) map[string]*message {
1032+
if !enableMessageOrdering {
1033+
return msgs
1034+
}
1035+
result := make(map[string]*message)
1036+
1037+
type msg struct {
1038+
id string
1039+
m *message
1040+
}
1041+
orderingKeyMap := make(map[string]msg)
1042+
for id, m := range msgs {
1043+
orderingKey := m.proto.Message.OrderingKey
1044+
if orderingKey == "" {
1045+
orderingKey = id
1046+
}
1047+
if val, ok := orderingKeyMap[orderingKey]; !ok || m.proto.Message.PublishTime.AsTime().Before(val.m.proto.Message.PublishTime.AsTime()) {
1048+
orderingKeyMap[orderingKey] = msg{m: m, id: id}
1049+
}
1050+
}
1051+
for _, val := range orderingKeyMap {
1052+
result[val.id] = val.m
1053+
}
1054+
return result
1055+
}
1056+
10311057
func (s *subscription) deliver() {
10321058
s.mu.Lock()
10331059
defer s.mu.Unlock()
@@ -1036,7 +1062,7 @@ func (s *subscription) deliver() {
10361062
s.maintainMessages(now)
10371063
// Try to deliver each remaining message.
10381064
curIndex := 0
1039-
for id, m := range s.msgs {
1065+
for id, m := range filterMsgs(s.msgs, s.proto.EnableMessageOrdering) {
10401066
if m.outstanding() {
10411067
continue
10421068
}

pubsub/pstest/fake_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1513,3 +1513,43 @@ func TestSubscriptionPushPull(t *testing.T) {
15131513
t.Errorf("sub.BigqueryConfig should be zero value\n%s", diff)
15141514
}
15151515
}
1516+
1517+
func TestSubscriptionMessageOrdering(t *testing.T) {
1518+
ctx := context.Background()
1519+
1520+
s := NewServer()
1521+
defer s.Close()
1522+
1523+
top, err := s.GServer.CreateTopic(ctx, &pb.Topic{Name: "projects/p/topics/t"})
1524+
if err != nil {
1525+
t.Errorf("Failed to init pubsub topic: %v", err)
1526+
}
1527+
sub, err := s.GServer.CreateSubscription(ctx, &pb.Subscription{
1528+
Name: "projects/p/subscriptions/s",
1529+
Topic: top.Name,
1530+
AckDeadlineSeconds: 30,
1531+
EnableMessageOrdering: true,
1532+
})
1533+
if err != nil {
1534+
t.Errorf("Failed to init pubsub subscription: %v", err)
1535+
}
1536+
1537+
const orderingKey = "ordering-key"
1538+
var ids []string
1539+
for i := 0; i < 1000; i++ {
1540+
ids = append(ids, s.PublishOrdered("projects/p/topics/t", []byte("hello"), nil, orderingKey))
1541+
}
1542+
for len(ids) > 0 {
1543+
pull, err := s.GServer.Pull(ctx, &pb.PullRequest{Subscription: sub.Name})
1544+
if err != nil {
1545+
t.Errorf("Failed to pull from server: %v", err)
1546+
}
1547+
for i, msg := range pull.ReceivedMessages {
1548+
if msg.Message.MessageId != ids[i] {
1549+
t.Errorf("want %s, got %s", ids[i], msg.AckId)
1550+
}
1551+
s.GServer.Acknowledge(ctx, &pb.AcknowledgeRequest{Subscription: sub.Name, AckIds: []string{msg.AckId}})
1552+
}
1553+
ids = ids[len(pull.ReceivedMessages):]
1554+
}
1555+
}

0 commit comments

Comments
 (0)