Skip to content

Commit ad54d69

Browse files
committed
Merge branch 'master' into pubsub-ordering-keys
2 parents c2c8b5c + 262b5af commit ad54d69

3 files changed

Lines changed: 8 additions & 56 deletions

File tree

google-api-grpc/proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PubsubMessage.java

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
220220
final String orderingKey = message.getOrderingKey();
221221
if (orderingKey != null && !orderingKey.isEmpty() && !enableMessageOrdering) {
222222
throw new IllegalStateException(
223-
"Cannot publish a message with an ordering key when message ordeirng is not enabled.");
223+
"Cannot publish a message with an ordering key when message ordering is not enabled.");
224224
}
225225

226226
message = messageTransform.apply(message);

google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java

Lines changed: 7 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@
2828
import com.google.pubsub.v1.ReceivedMessage;
2929
import java.util.ArrayList;
3030
import java.util.Collections;
31-
import java.util.HashMap;
3231
import java.util.List;
33-
import java.util.Map;
3432
import java.util.concurrent.LinkedBlockingQueue;
3533
import java.util.concurrent.ScheduledThreadPoolExecutor;
3634
import java.util.concurrent.TimeUnit;
@@ -44,10 +42,16 @@ public class MessageDispatcherTest {
4442
.setAckId("ackid")
4543
.setMessage(PubsubMessage.newBuilder().setData(ByteString.EMPTY).build())
4644
.build();
45+
private static final Runnable NOOP_RUNNABLE =
46+
new Runnable() {
47+
@Override
48+
public void run() {
49+
// No-op; don't do anything.
50+
}
51+
};
4752

4853
private MessageDispatcher dispatcher;
4954
private LinkedBlockingQueue<AckReplyConsumer> consumers;
50-
private Map<String, List<ByteString>> messagesByOrderingKey;
5155
private List<String> sentAcks;
5256
private List<ModAckItem> sentModAcks;
5357
private FakeClock clock;
@@ -67,20 +71,13 @@ static ModAckItem of(String ackId, int seconds) {
6771
@Before
6872
public void setUp() {
6973
consumers = new LinkedBlockingQueue<>();
70-
messagesByOrderingKey = new HashMap<>();
7174
sentAcks = new ArrayList<>();
7275
sentModAcks = new ArrayList<>();
7376

7477
MessageReceiver receiver =
7578
new MessageReceiver() {
7679
@Override
7780
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
78-
List<ByteString> messages = messagesByOrderingKey.get(message.getOrderingKey());
79-
if (messages == null) {
80-
messages = new ArrayList<>();
81-
messagesByOrderingKey.put(message.getOrderingKey(), messages);
82-
}
83-
messages.add(message.getData());
8481
consumers.add(consumer);
8582
}
8683
};
@@ -206,47 +203,4 @@ public void testDeadlineAdjustment() throws Exception {
206203

207204
assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(42);
208205
}
209-
210-
private ReceivedMessage newReceivedMessage(String ackId, String orderingKey, String data) {
211-
return ReceivedMessage.newBuilder()
212-
.setAckId(ackId)
213-
.setMessage(
214-
PubsubMessage.newBuilder()
215-
.setOrderingKey(orderingKey)
216-
.setData(ByteString.copyFromUtf8(data))
217-
.build())
218-
.build();
219-
}
220-
221-
@Test
222-
public void testOrderingKey() throws Exception {
223-
// Create messages with "orderA".
224-
ReceivedMessage message1 = newReceivedMessage("ackId1", "orderA", "m1");
225-
ReceivedMessage message2 = newReceivedMessage("ackId2", "orderA", "m2");
226-
// Create messages with "orderB".
227-
ReceivedMessage message3 = newReceivedMessage("ackId3", "orderB", "m3");
228-
ReceivedMessage message4 = newReceivedMessage("ackId4", "orderB", "m4");
229-
ReceivedMessage message5 = newReceivedMessage("ackId5", "orderB", "m5");
230-
231-
dispatcher.processReceivedMessages(Collections.singletonList(message1));
232-
consumers.take().ack();
233-
dispatcher.processReceivedMessages(Collections.singletonList(message2));
234-
consumers.take().ack();
235-
dispatcher.processReceivedMessages(Collections.singletonList(message3));
236-
consumers.take().ack();
237-
dispatcher.processReceivedMessages(Collections.singletonList(message4));
238-
consumers.take().ack();
239-
dispatcher.processReceivedMessages(Collections.singletonList(message5));
240-
consumers.take().ack();
241-
242-
assertThat(messagesByOrderingKey.get("orderA"))
243-
.containsExactly(ByteString.copyFromUtf8("m1"), ByteString.copyFromUtf8("m2"))
244-
.inOrder();
245-
assertThat(messagesByOrderingKey.get("orderB"))
246-
.containsExactly(
247-
ByteString.copyFromUtf8("m3"),
248-
ByteString.copyFromUtf8("m4"),
249-
ByteString.copyFromUtf8("m5"))
250-
.inOrder();
251-
}
252206
}

0 commit comments

Comments
 (0)