1616
1717package com .google .cloud .pubsub .v1 ;
1818
19- import static com .google .cloud .pubsub .v1 .MessageDispatcher .PENDING_ACKS_SEND_DELAY ;
2019import static org .junit .Assert .assertEquals ;
2120import static org .junit .Assert .assertTrue ;
2221
2827import com .google .api .gax .rpc .ApiException ;
2928import com .google .api .gax .rpc .FixedTransportChannelProvider ;
3029import com .google .api .gax .rpc .StatusCode ;
31- import com .google .cloud .pubsub .v1 .FakeSubscriberServiceImpl .ModifyAckDeadline ;
3230import com .google .cloud .pubsub .v1 .Subscriber .Builder ;
33- import com .google .common .base .Function ;
34- import com .google .common .base .Optional ;
35- import com .google .common .base .Preconditions ;
36- import com .google .common .collect .ImmutableList ;
3731import com .google .pubsub .v1 .PubsubMessage ;
38- import com .google .pubsub .v1 .PullResponse ;
39- import com .google .pubsub .v1 .ReceivedMessage ;
40- import com .google .pubsub .v1 .StreamingPullResponse ;
4132import com .google .pubsub .v1 .SubscriptionName ;
4233import io .grpc .ManagedChannel ;
4334import io .grpc .Server ;
4435import io .grpc .Status ;
4536import io .grpc .StatusException ;
4637import io .grpc .inprocess .InProcessChannelBuilder ;
4738import io .grpc .inprocess .InProcessServerBuilder ;
48- import java .util .ArrayList ;
49- import java .util .Arrays ;
50- import java .util .Collection ;
51- import java .util .List ;
52- import java .util .concurrent .CountDownLatch ;
53- import java .util .concurrent .LinkedBlockingQueue ;
5439import org .junit .After ;
5540import org .junit .Before ;
5641import org .junit .Rule ;
5742import org .junit .Test ;
5843import org .junit .rules .TestName ;
59- import org .junit .runner .RunWith ;
60- import org .junit .runners .Parameterized ;
61- import org .junit .runners .Parameterized .Parameters ;
6244
6345/** Tests for {@link Subscriber}. */
64- @ RunWith (Parameterized .class )
6546public class SubscriberTest {
6647
6748 private static final SubscriptionName TEST_SUBSCRIPTION =
6849 SubscriptionName .of ("test-project" , "test-subscription" );
6950
70- private static final PubsubMessage TEST_MESSAGE =
71- PubsubMessage .newBuilder ().setMessageId ("1" ).build ();
72-
73- private static final int INITIAL_ACK_DEADLINE_EXTENSION_SECS = 2 ;
74-
75- private final boolean isStreamingTest ;
76-
7751 private ManagedChannel testChannel ;
7852 private FakeScheduledExecutorService fakeExecutor ;
7953 private FakeSubscriberServiceImpl fakeSubscriberServiceImpl ;
8054 private Server testServer ;
8155
82- private TestReceiver testReceiver ;
83-
84- @ Parameters
85- public static Collection <Object []> data () {
86- return Arrays .asList (new Object [][] {{true }, {false }});
87- }
88-
89- static class TestReceiver implements MessageReceiver {
90- private final LinkedBlockingQueue <AckReplyConsumer > outstandingMessageReplies =
91- new LinkedBlockingQueue <>();
92- private boolean shouldAck = true ; // If false, the receiver will <b>nack</b> the messages
93- private Optional <CountDownLatch > messageCountLatch = Optional .absent ();
94- private Optional <RuntimeException > error = Optional .absent ();
95- private boolean explicitAckReplies ;
96-
97- synchronized void setAckReply () {
98- this .shouldAck = true ;
99- }
100-
101- synchronized void setNackReply () {
102- this .shouldAck = false ;
103- }
104-
105- synchronized void setErrorReply (RuntimeException error ) {
106- this .error = Optional .of (error );
107- }
108-
109- synchronized void setExplicitAck (boolean explicitAckReplies ) {
110- this .explicitAckReplies = explicitAckReplies ;
111- }
112-
113- synchronized void setExpectedMessages (int expected ) {
114- this .messageCountLatch = Optional .of (new CountDownLatch (expected ));
115- }
116-
117- void waitForExpectedMessages () throws InterruptedException {
118- CountDownLatch latch ;
119- synchronized (this ) {
120- if (messageCountLatch .isPresent ()) {
121- latch = messageCountLatch .get ();
122- } else {
123- return ;
56+ private final MessageReceiver testReceiver =
57+ new MessageReceiver () {
58+ @ Override
59+ public void receiveMessage (PubsubMessage message , AckReplyConsumer consumer ) {
60+ consumer .ack ();
12461 }
125- }
126- latch .await ();
127- }
128-
129- @ Override
130- public synchronized void receiveMessage (PubsubMessage message , AckReplyConsumer consumer ) {
131- try {
132- if (explicitAckReplies ) {
133- try {
134- outstandingMessageReplies .put (consumer );
135- } catch (InterruptedException e ) {
136- throw new IllegalStateException (e );
137- }
138- } else {
139- replyTo (consumer );
140- }
141- } finally {
142- if (messageCountLatch .isPresent ()) {
143- messageCountLatch .get ().countDown ();
144- }
145- }
146- }
147-
148- public synchronized void replyNextOutstandingMessage () {
149- Preconditions .checkState (explicitAckReplies );
150- try {
151- replyTo (outstandingMessageReplies .take ());
152- } catch (InterruptedException e ) {
153- throw new IllegalStateException (e );
154- }
155- }
156-
157- public synchronized void replyAllOutstandingMessage () {
158- Preconditions .checkState (explicitAckReplies );
159- AckReplyConsumer reply ;
160- while ((reply = outstandingMessageReplies .poll ()) != null ) {
161- replyTo (reply );
162- }
163- }
164-
165- private synchronized void replyTo (AckReplyConsumer reply ) {
166- if (error .isPresent ()) {
167- throw error .get ();
168- } else {
169- if (shouldAck ) {
170- reply .ack ();
171- } else {
172- reply .nack ();
173- }
174- }
175- }
176- }
177-
178- public SubscriberTest (boolean streamingTest ) {
179- this .isStreamingTest = streamingTest ;
180- }
62+ };
18163
18264 @ Rule public TestName testName = new TestName ();
18365
@@ -190,8 +72,6 @@ public void setUp() throws Exception {
19072 serverBuilder .addService (fakeSubscriberServiceImpl );
19173 testServer = serverBuilder .build ();
19274 testServer .start ();
193-
194- testReceiver = new TestReceiver ();
19575 }
19676
19777 @ After
@@ -202,11 +82,6 @@ public void tearDown() throws Exception {
20282
20383 @ Test
20484 public void testOpenedChannels () throws Exception {
205- if (!isStreamingTest ) {
206- // This test is not applicable to polling.
207- return ;
208- }
209-
21085 int expectedChannelCount = 1 ;
21186
21287 Subscriber subscriber = startSubscriber (getTestSubscriberBuilder (testReceiver ));
@@ -219,11 +94,6 @@ public void testOpenedChannels() throws Exception {
21994
22095 @ Test
22196 public void testFailedChannel_recoverableError_channelReopened () throws Exception {
222- if (!isStreamingTest ) {
223- // This test is not applicable to polling.
224- return ;
225- }
226-
22797 int expectedChannelCount = 1 ;
22898
22999 Subscriber subscriber =
@@ -244,11 +114,6 @@ public void testFailedChannel_recoverableError_channelReopened() throws Exceptio
244114
245115 @ Test (expected = IllegalStateException .class )
246116 public void testFailedChannel_fatalError_subscriberFails () throws Exception {
247- if (!isStreamingTest ) {
248- // This test is not applicable to polling.
249- throw new IllegalStateException ("To fullfil test expectation" );
250- }
251-
252117 Subscriber subscriber =
253118 startSubscriber (
254119 getTestSubscriberBuilder (testReceiver )
@@ -276,7 +141,7 @@ public void testFailedChannel_fatalError_subscriberFails() throws Exception {
276141 }
277142
278143 private Subscriber startSubscriber (Builder testSubscriberBuilder ) throws Exception {
279- Subscriber subscriber = testSubscriberBuilder .setUseStreaming (isStreamingTest ).build ();
144+ Subscriber subscriber = testSubscriberBuilder .setUseStreaming (true ).build ();
280145 subscriber .startAsync ().awaitRunning ();
281146 return subscriber ;
282147 }
0 commit comments