2828import com .google .pubsub .v1 .ReceivedMessage ;
2929import java .util .ArrayList ;
3030import java .util .Collections ;
31- import java .util .HashMap ;
3231import java .util .List ;
33- import java .util .Map ;
3432import java .util .concurrent .LinkedBlockingQueue ;
3533import java .util .concurrent .ScheduledThreadPoolExecutor ;
3634import java .util .concurrent .TimeUnit ;
@@ -44,10 +42,16 @@ public class MessageDispatcherTest {
4442 .setAckId ("ackid" )
4543 .setMessage (PubsubMessage .newBuilder ().setData (ByteString .EMPTY ).build ())
4644 .build ();
45+ private static final Runnable NOOP_RUNNABLE =
46+ new Runnable () {
47+ @ Override
48+ public void run () {
49+ // No-op; don't do anything.
50+ }
51+ };
4752
4853 private MessageDispatcher dispatcher ;
4954 private LinkedBlockingQueue <AckReplyConsumer > consumers ;
50- private Map <String , List <ByteString >> messagesByOrderingKey ;
5155 private List <String > sentAcks ;
5256 private List <ModAckItem > sentModAcks ;
5357 private FakeClock clock ;
@@ -67,20 +71,13 @@ static ModAckItem of(String ackId, int seconds) {
6771 @ Before
6872 public void setUp () {
6973 consumers = new LinkedBlockingQueue <>();
70- messagesByOrderingKey = new HashMap <>();
7174 sentAcks = new ArrayList <>();
7275 sentModAcks = new ArrayList <>();
7376
7477 MessageReceiver receiver =
7578 new MessageReceiver () {
7679 @ Override
7780 public void receiveMessage (final PubsubMessage message , final AckReplyConsumer consumer ) {
78- List <ByteString > messages = messagesByOrderingKey .get (message .getOrderingKey ());
79- if (messages == null ) {
80- messages = new ArrayList <>();
81- messagesByOrderingKey .put (message .getOrderingKey (), messages );
82- }
83- messages .add (message .getData ());
8481 consumers .add (consumer );
8582 }
8683 };
@@ -206,47 +203,4 @@ public void testDeadlineAdjustment() throws Exception {
206203
207204 assertThat (dispatcher .computeDeadlineSeconds ()).isEqualTo (42 );
208205 }
209-
210- private ReceivedMessage newReceivedMessage (String ackId , String orderingKey , String data ) {
211- return ReceivedMessage .newBuilder ()
212- .setAckId (ackId )
213- .setMessage (
214- PubsubMessage .newBuilder ()
215- .setOrderingKey (orderingKey )
216- .setData (ByteString .copyFromUtf8 (data ))
217- .build ())
218- .build ();
219- }
220-
221- @ Test
222- public void testOrderingKey () throws Exception {
223- // Create messages with "orderA".
224- ReceivedMessage message1 = newReceivedMessage ("ackId1" , "orderA" , "m1" );
225- ReceivedMessage message2 = newReceivedMessage ("ackId2" , "orderA" , "m2" );
226- // Create messages with "orderB".
227- ReceivedMessage message3 = newReceivedMessage ("ackId3" , "orderB" , "m3" );
228- ReceivedMessage message4 = newReceivedMessage ("ackId4" , "orderB" , "m4" );
229- ReceivedMessage message5 = newReceivedMessage ("ackId5" , "orderB" , "m5" );
230-
231- dispatcher .processReceivedMessages (Collections .singletonList (message1 ));
232- consumers .take ().ack ();
233- dispatcher .processReceivedMessages (Collections .singletonList (message2 ));
234- consumers .take ().ack ();
235- dispatcher .processReceivedMessages (Collections .singletonList (message3 ));
236- consumers .take ().ack ();
237- dispatcher .processReceivedMessages (Collections .singletonList (message4 ));
238- consumers .take ().ack ();
239- dispatcher .processReceivedMessages (Collections .singletonList (message5 ));
240- consumers .take ().ack ();
241-
242- assertThat (messagesByOrderingKey .get ("orderA" ))
243- .containsExactly (ByteString .copyFromUtf8 ("m1" ), ByteString .copyFromUtf8 ("m2" ))
244- .inOrder ();
245- assertThat (messagesByOrderingKey .get ("orderB" ))
246- .containsExactly (
247- ByteString .copyFromUtf8 ("m3" ),
248- ByteString .copyFromUtf8 ("m4" ),
249- ByteString .copyFromUtf8 ("m5" ))
250- .inOrder ();
251- }
252206}
0 commit comments