Skip to content

Commit b15dc82

Browse files
authored
---
yaml --- r: 8583 b: refs/heads/master c: 537f8b0 h: refs/heads/master i: 8581: 73ebe2f 8579: 78e5db4 8575: 659ea28
1 parent 23f9589 commit b15dc82

2 files changed

Lines changed: 8 additions & 143 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
refs/heads/master: f6792c3da2e6e7edd13001f51949f887a2488154
2+
refs/heads/master: 537f8b0cdccf869082ef8b5d580c4ad3ec3a7d59
33
refs/heads/travis: 47e4fee4fd5af9b2a8ce46f23c72ec95f9b195b2
44
refs/heads/gh-pages: 6daca92127d91b7c2c99490080ecf8a13fa94cde
55
refs/tags/0.0.9: 22f1839238f66c39e67ed4dfdcd273b1ae2e8444

trunk/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java

Lines changed: 7 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.google.cloud.pubsub.v1;
1818

19-
import static com.google.cloud.pubsub.v1.MessageDispatcher.PENDING_ACKS_SEND_DELAY;
2019
import static org.junit.Assert.assertEquals;
2120
import static org.junit.Assert.assertTrue;
2221

@@ -28,156 +27,39 @@
2827
import com.google.api.gax.rpc.ApiException;
2928
import com.google.api.gax.rpc.FixedTransportChannelProvider;
3029
import com.google.api.gax.rpc.StatusCode;
31-
import com.google.cloud.pubsub.v1.FakeSubscriberServiceImpl.ModifyAckDeadline;
3230
import com.google.cloud.pubsub.v1.Subscriber.Builder;
33-
import com.google.common.base.Function;
34-
import com.google.common.base.Optional;
35-
import com.google.common.base.Preconditions;
36-
import com.google.common.collect.ImmutableList;
3731
import com.google.pubsub.v1.PubsubMessage;
38-
import com.google.pubsub.v1.PullResponse;
39-
import com.google.pubsub.v1.ReceivedMessage;
40-
import com.google.pubsub.v1.StreamingPullResponse;
4132
import com.google.pubsub.v1.SubscriptionName;
4233
import io.grpc.ManagedChannel;
4334
import io.grpc.Server;
4435
import io.grpc.Status;
4536
import io.grpc.StatusException;
4637
import io.grpc.inprocess.InProcessChannelBuilder;
4738
import io.grpc.inprocess.InProcessServerBuilder;
48-
import java.util.ArrayList;
49-
import java.util.Arrays;
50-
import java.util.Collection;
51-
import java.util.List;
52-
import java.util.concurrent.CountDownLatch;
53-
import java.util.concurrent.LinkedBlockingQueue;
5439
import org.junit.After;
5540
import org.junit.Before;
5641
import org.junit.Rule;
5742
import org.junit.Test;
5843
import org.junit.rules.TestName;
59-
import org.junit.runner.RunWith;
60-
import org.junit.runners.Parameterized;
61-
import org.junit.runners.Parameterized.Parameters;
6244

6345
/** Tests for {@link Subscriber}. */
64-
@RunWith(Parameterized.class)
6546
public class SubscriberTest {
6647

6748
private static final SubscriptionName TEST_SUBSCRIPTION =
6849
SubscriptionName.of("test-project", "test-subscription");
6950

70-
private static final PubsubMessage TEST_MESSAGE =
71-
PubsubMessage.newBuilder().setMessageId("1").build();
72-
73-
private static final int INITIAL_ACK_DEADLINE_EXTENSION_SECS = 2;
74-
75-
private final boolean isStreamingTest;
76-
7751
private ManagedChannel testChannel;
7852
private FakeScheduledExecutorService fakeExecutor;
7953
private FakeSubscriberServiceImpl fakeSubscriberServiceImpl;
8054
private Server testServer;
8155

82-
private TestReceiver testReceiver;
83-
84-
@Parameters
85-
public static Collection<Object[]> data() {
86-
return Arrays.asList(new Object[][] {{true}, {false}});
87-
}
88-
89-
static class TestReceiver implements MessageReceiver {
90-
private final LinkedBlockingQueue<AckReplyConsumer> outstandingMessageReplies =
91-
new LinkedBlockingQueue<>();
92-
private boolean shouldAck = true; // If false, the receiver will <b>nack</b> the messages
93-
private Optional<CountDownLatch> messageCountLatch = Optional.absent();
94-
private Optional<RuntimeException> error = Optional.absent();
95-
private boolean explicitAckReplies;
96-
97-
synchronized void setAckReply() {
98-
this.shouldAck = true;
99-
}
100-
101-
synchronized void setNackReply() {
102-
this.shouldAck = false;
103-
}
104-
105-
synchronized void setErrorReply(RuntimeException error) {
106-
this.error = Optional.of(error);
107-
}
108-
109-
synchronized void setExplicitAck(boolean explicitAckReplies) {
110-
this.explicitAckReplies = explicitAckReplies;
111-
}
112-
113-
synchronized void setExpectedMessages(int expected) {
114-
this.messageCountLatch = Optional.of(new CountDownLatch(expected));
115-
}
116-
117-
void waitForExpectedMessages() throws InterruptedException {
118-
CountDownLatch latch;
119-
synchronized (this) {
120-
if (messageCountLatch.isPresent()) {
121-
latch = messageCountLatch.get();
122-
} else {
123-
return;
56+
private final MessageReceiver testReceiver =
57+
new MessageReceiver() {
58+
@Override
59+
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
60+
consumer.ack();
12461
}
125-
}
126-
latch.await();
127-
}
128-
129-
@Override
130-
public synchronized void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
131-
try {
132-
if (explicitAckReplies) {
133-
try {
134-
outstandingMessageReplies.put(consumer);
135-
} catch (InterruptedException e) {
136-
throw new IllegalStateException(e);
137-
}
138-
} else {
139-
replyTo(consumer);
140-
}
141-
} finally {
142-
if (messageCountLatch.isPresent()) {
143-
messageCountLatch.get().countDown();
144-
}
145-
}
146-
}
147-
148-
public synchronized void replyNextOutstandingMessage() {
149-
Preconditions.checkState(explicitAckReplies);
150-
try {
151-
replyTo(outstandingMessageReplies.take());
152-
} catch (InterruptedException e) {
153-
throw new IllegalStateException(e);
154-
}
155-
}
156-
157-
public synchronized void replyAllOutstandingMessage() {
158-
Preconditions.checkState(explicitAckReplies);
159-
AckReplyConsumer reply;
160-
while ((reply = outstandingMessageReplies.poll()) != null) {
161-
replyTo(reply);
162-
}
163-
}
164-
165-
private synchronized void replyTo(AckReplyConsumer reply) {
166-
if (error.isPresent()) {
167-
throw error.get();
168-
} else {
169-
if (shouldAck) {
170-
reply.ack();
171-
} else {
172-
reply.nack();
173-
}
174-
}
175-
}
176-
}
177-
178-
public SubscriberTest(boolean streamingTest) {
179-
this.isStreamingTest = streamingTest;
180-
}
62+
};
18163

18264
@Rule public TestName testName = new TestName();
18365

@@ -190,8 +72,6 @@ public void setUp() throws Exception {
19072
serverBuilder.addService(fakeSubscriberServiceImpl);
19173
testServer = serverBuilder.build();
19274
testServer.start();
193-
194-
testReceiver = new TestReceiver();
19575
}
19676

19777
@After
@@ -202,11 +82,6 @@ public void tearDown() throws Exception {
20282

20383
@Test
20484
public void testOpenedChannels() throws Exception {
205-
if (!isStreamingTest) {
206-
// This test is not applicable to polling.
207-
return;
208-
}
209-
21085
int expectedChannelCount = 1;
21186

21287
Subscriber subscriber = startSubscriber(getTestSubscriberBuilder(testReceiver));
@@ -219,11 +94,6 @@ public void testOpenedChannels() throws Exception {
21994

22095
@Test
22196
public void testFailedChannel_recoverableError_channelReopened() throws Exception {
222-
if (!isStreamingTest) {
223-
// This test is not applicable to polling.
224-
return;
225-
}
226-
22797
int expectedChannelCount = 1;
22898

22999
Subscriber subscriber =
@@ -244,11 +114,6 @@ public void testFailedChannel_recoverableError_channelReopened() throws Exceptio
244114

245115
@Test(expected = IllegalStateException.class)
246116
public void testFailedChannel_fatalError_subscriberFails() throws Exception {
247-
if (!isStreamingTest) {
248-
// This test is not applicable to polling.
249-
throw new IllegalStateException("To fullfil test expectation");
250-
}
251-
252117
Subscriber subscriber =
253118
startSubscriber(
254119
getTestSubscriberBuilder(testReceiver)
@@ -276,7 +141,7 @@ public void testFailedChannel_fatalError_subscriberFails() throws Exception {
276141
}
277142

278143
private Subscriber startSubscriber(Builder testSubscriberBuilder) throws Exception {
279-
Subscriber subscriber = testSubscriberBuilder.setUseStreaming(isStreamingTest).build();
144+
Subscriber subscriber = testSubscriberBuilder.setUseStreaming(true).build();
280145
subscriber.startAsync().awaitRunning();
281146
return subscriber;
282147
}

0 commit comments

Comments
 (0)