Skip to content

Commit 73b80f7

Browse files
committed
pr comment
1 parent ad9dac4 commit 73b80f7

2 files changed

Lines changed: 48 additions & 55 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import io.grpc.stub.ClientCallStreamObserver;
4141
import io.grpc.stub.ClientCalls;
4242
import io.grpc.stub.ClientResponseObserver;
43+
import java.util.ArrayList;
4344
import java.util.List;
4445
import java.util.concurrent.ScheduledExecutorService;
4546
import java.util.concurrent.TimeUnit;
@@ -211,39 +212,40 @@ private boolean isAlive() {
211212
@Override
212213
public void sendAckOperations(
213214
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions) {
214-
StreamingPullRequest[] requests =
215+
List<StreamingPullRequest> requests =
215216
partitionAckOperations(acksToSend, ackDeadlineExtensions, MAX_PER_REQUEST_CHANGES);
216217
for (StreamingPullRequest request : requests) {
217218
requestObserver.onNext(request);
218219
}
219220
}
220221

221222
@VisibleForTesting
222-
static StreamingPullRequest[] partitionAckOperations(
223+
static List<StreamingPullRequest> partitionAckOperations(
223224
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions, int size) {
224225
int numExtensions = 0;
225226
for (PendingModifyAckDeadline modify : ackDeadlineExtensions) {
226227
numExtensions += modify.ackIds.size();
227228
}
228-
int numRequests = Math.max(numExtensions, acksToSend.size());
229-
numRequests = numRequests / size + (numRequests % size == 0 ? 0 : 1);
229+
int numChanges = Math.max(numExtensions, acksToSend.size());
230+
int numRequests = numChanges / size + (numChanges % size == 0 ? 0 : 1);
230231

231-
StreamingPullRequest.Builder[] requests = new StreamingPullRequest.Builder[numRequests];
232+
List<StreamingPullRequest.Builder> requests = new ArrayList<>(numRequests);
232233
for (int i = 0; i < numRequests; i++) {
233-
requests[i] = StreamingPullRequest.newBuilder();
234+
requests.add(StreamingPullRequest.newBuilder());
234235
}
235236

236237
int reqCount = 0;
237238
for (List<String> acksChunk : Lists.partition(acksToSend, size)) {
238-
requests[reqCount].addAllAckIds(acksChunk);
239+
requests.get(reqCount).addAllAckIds(acksChunk);
239240
reqCount++;
240241
}
241242

242243
reqCount = 0;
243244
int ackCount = 0;
244245
for (PendingModifyAckDeadline modify : ackDeadlineExtensions) {
245246
for (String ackId : modify.ackIds) {
246-
requests[reqCount]
247+
requests
248+
.get(reqCount)
247249
.addModifyDeadlineSeconds(modify.deadlineExtensionSeconds)
248250
.addModifyDeadlineAckIds(ackId);
249251
ackCount++;
@@ -254,9 +256,9 @@ static StreamingPullRequest[] partitionAckOperations(
254256
}
255257
}
256258

257-
StreamingPullRequest[] ret = new StreamingPullRequest[requests.length];
258-
for (int i = 0; i < requests.length; i++) {
259-
ret[i] = requests[i].build();
259+
List<StreamingPullRequest> ret = new ArrayList<>(requests.size());
260+
for (StreamingPullRequest.Builder builder : requests) {
261+
ret.add(builder.build());
260262
}
261263
return ret;
262264
}

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnectionTest.java

Lines changed: 35 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,102 +19,93 @@
1919
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.PendingModifyAckDeadline;
2020
import com.google.common.truth.Truth;
2121
import com.google.pubsub.v1.StreamingPullRequest;
22-
import java.util.ArrayList;
2322
import java.util.Arrays;
2423
import java.util.Collections;
2524
import java.util.List;
2625
import org.junit.Test;
2726

2827
public class StreamingSubscriberConnectionTest {
29-
private static List<StreamingPullRequest> partitionAckOperations(
30-
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions, int size) {
31-
List<StreamingPullRequest> requests = new ArrayList<>();
32-
for (StreamingPullRequest req :
33-
StreamingSubscriberConnection.partitionAckOperations(
34-
acksToSend, ackDeadlineExtensions, size)) {
35-
requests.add(req);
36-
}
37-
return requests;
38-
}
39-
4028
@Test
4129
public void testPartitionAckOperations() {
4230
List<StreamingPullRequest> requests;
4331

4432
requests =
45-
partitionAckOperations(
33+
StreamingSubscriberConnection.partitionAckOperations(
4634
Collections.<String>emptyList(), Collections.<PendingModifyAckDeadline>emptyList(), 3);
4735
Truth.assertThat(requests).isEmpty();
4836

4937
requests =
50-
partitionAckOperations(
38+
StreamingSubscriberConnection.partitionAckOperations(
5139
Arrays.asList("a", "b", "c"), Collections.<PendingModifyAckDeadline>emptyList(), 3);
5240
Truth.assertThat(requests)
53-
.contains(
54-
StreamingPullRequest.newBuilder().addAckIds("a").addAckIds("b").addAckIds("c").build());
41+
.containsExactly(
42+
StreamingPullRequest.newBuilder().addAckIds("a").addAckIds("b").addAckIds("c").build())
43+
.inOrder();
5544

5645
requests =
57-
partitionAckOperations(
46+
StreamingSubscriberConnection.partitionAckOperations(
5847
Arrays.asList("a", "b", "c", "d"),
5948
Collections.<PendingModifyAckDeadline>emptyList(),
6049
3);
6150
Truth.assertThat(requests)
62-
.contains(
63-
StreamingPullRequest.newBuilder().addAckIds("a").addAckIds("b").addAckIds("c").build());
64-
Truth.assertThat(requests).contains(StreamingPullRequest.newBuilder().addAckIds("d").build());
51+
.containsExactly(
52+
StreamingPullRequest.newBuilder().addAckIds("a").addAckIds("b").addAckIds("c").build(),
53+
StreamingPullRequest.newBuilder().addAckIds("d").build())
54+
.inOrder();
6555

6656
requests =
67-
partitionAckOperations(
57+
StreamingSubscriberConnection.partitionAckOperations(
6858
Arrays.asList("a", "b", "c", "d"),
69-
Arrays.asList(new PendingModifyAckDeadline(42, "A")),
59+
Arrays.asList(new PendingModifyAckDeadline(42, "w")),
7060
3);
7161
Truth.assertThat(requests)
72-
.contains(
62+
.containsExactly(
7363
StreamingPullRequest.newBuilder()
7464
.addAckIds("a")
7565
.addAckIds("b")
7666
.addAckIds("c")
77-
.addModifyDeadlineAckIds("A")
67+
.addModifyDeadlineAckIds("w")
7868
.addModifyDeadlineSeconds(42)
79-
.build());
80-
Truth.assertThat(requests).contains(StreamingPullRequest.newBuilder().addAckIds("d").build());
69+
.build(),
70+
StreamingPullRequest.newBuilder().addAckIds("d").build())
71+
.inOrder();
8172

8273
requests =
83-
partitionAckOperations(
84-
Arrays.asList("a"), Arrays.asList(new PendingModifyAckDeadline(42, "A", "B")), 3);
74+
StreamingSubscriberConnection.partitionAckOperations(
75+
Arrays.asList("a"), Arrays.asList(new PendingModifyAckDeadline(42, "w", "x")), 3);
8576
Truth.assertThat(requests)
86-
.contains(
77+
.containsExactly(
8778
StreamingPullRequest.newBuilder()
8879
.addAckIds("a")
89-
.addModifyDeadlineAckIds("A")
80+
.addModifyDeadlineAckIds("w")
9081
.addModifyDeadlineSeconds(42)
91-
.addModifyDeadlineAckIds("B")
82+
.addModifyDeadlineAckIds("x")
9283
.addModifyDeadlineSeconds(42)
93-
.build());
84+
.build())
85+
.inOrder();
9486

9587
requests =
96-
partitionAckOperations(
88+
StreamingSubscriberConnection.partitionAckOperations(
9789
Arrays.asList("a"),
9890
Arrays.asList(
99-
new PendingModifyAckDeadline(42, "A", "B"),
100-
new PendingModifyAckDeadline(43, "C", "D")),
91+
new PendingModifyAckDeadline(42, "w", "x"),
92+
new PendingModifyAckDeadline(43, "y", "z")),
10193
3);
10294
Truth.assertThat(requests)
103-
.contains(
95+
.containsExactly(
10496
StreamingPullRequest.newBuilder()
10597
.addAckIds("a")
106-
.addModifyDeadlineAckIds("A")
98+
.addModifyDeadlineAckIds("w")
10799
.addModifyDeadlineSeconds(42)
108-
.addModifyDeadlineAckIds("B")
100+
.addModifyDeadlineAckIds("x")
109101
.addModifyDeadlineSeconds(42)
110-
.addModifyDeadlineAckIds("C")
102+
.addModifyDeadlineAckIds("y")
111103
.addModifyDeadlineSeconds(43)
112-
.build());
113-
Truth.assertThat(requests)
114-
.contains(
104+
.build(),
115105
StreamingPullRequest.newBuilder()
116-
.addModifyDeadlineAckIds("D")
106+
.addModifyDeadlineAckIds("z")
117107
.addModifyDeadlineSeconds(43)
118-
.build());
108+
.build())
109+
.inOrder();
119110
}
120111
}

0 commit comments

Comments
 (0)