Skip to content

Commit 8514c8d

Browse files
dpcollins-googlesduskis
authored andcommitted
---
yaml --- r: 20141 b: refs/heads/autosynth-os-login c: 1e18a7d h: refs/heads/master i: 20139: 415a781
1 parent 637158b commit 8514c8d

4 files changed

Lines changed: 4 additions & 218 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ refs/heads/autosynth-firestore: d1a44f9acc302750e37b008ecb9c1aa535cc94df
135135
refs/heads/autosynth-iot: f03bdd338a9056ca3b7ea6d9ca901649ba9aab78
136136
refs/heads/autosynth-kms: 2828edfe3d2c53dd6e71912eae8a53c87bf40c87
137137
refs/heads/autosynth-language: c3d990dd34d81e7e935041e7147fb9dd27f8a557
138-
refs/heads/autosynth-os-login: 921be7b872e947b14eceb8597ecb1838c29f5abb
138+
refs/heads/autosynth-os-login: 1e18a7d8f967c8b749eb99020dedf5b05506e764
139139
refs/heads/autosynth-redis: 0cdb2e47359d51b73763bcea8af3de62aa99119b
140140
refs/heads/autosynth-scheduler: d97f8743ba965c7d5e492c8dc1f51d023104e260
141141
refs/heads/autosynth-spanner: 9bff86d057df31e04c76d72865e8e073ac5794fb

branches/autosynth-os-login/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.google.api.core.ApiFuture;
2222
import com.google.api.core.ApiFutureCallback;
2323
import com.google.api.core.ApiFutures;
24-
import com.google.api.core.InternalApi;
2524
import com.google.api.core.SettableApiFuture;
2625
import com.google.api.gax.batching.FlowController;
2726
import com.google.api.gax.core.Distribution;
@@ -43,7 +42,6 @@
4342
import com.google.pubsub.v1.StreamingPullRequest;
4443
import com.google.pubsub.v1.StreamingPullResponse;
4544
import io.grpc.Status;
46-
import java.util.ArrayList;
4745
import java.util.Deque;
4846
import java.util.List;
4947
import java.util.concurrent.ScheduledExecutorService;
@@ -317,48 +315,4 @@ public void onFailure(Throwable t) {
317315
ApiFutures.addCallback(future, loggingCallback);
318316
}
319317
}
320-
321-
@InternalApi
322-
static List<StreamingPullRequest> partitionAckOperations(
323-
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions, int size) {
324-
int numExtensions = 0;
325-
for (PendingModifyAckDeadline modify : ackDeadlineExtensions) {
326-
numExtensions += modify.ackIds.size();
327-
}
328-
int numChanges = Math.max(numExtensions, acksToSend.size());
329-
int numRequests = numChanges / size + (numChanges % size == 0 ? 0 : 1);
330-
331-
List<StreamingPullRequest.Builder> requests = new ArrayList<>(numRequests);
332-
for (int i = 0; i < numRequests; i++) {
333-
requests.add(StreamingPullRequest.newBuilder());
334-
}
335-
336-
int reqCount = 0;
337-
for (List<String> acksChunk : Lists.partition(acksToSend, size)) {
338-
requests.get(reqCount).addAllAckIds(acksChunk);
339-
reqCount++;
340-
}
341-
342-
reqCount = 0;
343-
int ackCount = 0;
344-
for (PendingModifyAckDeadline modify : ackDeadlineExtensions) {
345-
for (String ackId : modify.ackIds) {
346-
requests
347-
.get(reqCount)
348-
.addModifyDeadlineSeconds(modify.deadlineExtensionSeconds)
349-
.addModifyDeadlineAckIds(ackId);
350-
ackCount++;
351-
if (ackCount == size) {
352-
reqCount++;
353-
ackCount = 0;
354-
}
355-
}
356-
}
357-
358-
List<StreamingPullRequest> ret = new ArrayList<>(requests.size());
359-
for (StreamingPullRequest.Builder builder : requests) {
360-
ret.add(builder.build());
361-
}
362-
return ret;
363-
}
364318
}

branches/autosynth-os-login/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 3 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import java.util.LinkedList;
4949
import java.util.List;
5050
import java.util.concurrent.ScheduledExecutorService;
51-
import java.util.concurrent.ScheduledFuture;
5251
import java.util.logging.Level;
5352
import java.util.logging.Logger;
5453
import javax.annotation.Nullable;
@@ -94,6 +93,7 @@ public class Subscriber extends AbstractApiService {
9493
@InternalApi static final int MAX_ACK_DEADLINE_SECONDS = 600;
9594
@InternalApi static final int MIN_ACK_DEADLINE_SECONDS = 10;
9695
private static final Duration UNARY_TIMEOUT = Duration.ofSeconds(60);
96+
private static final Duration ACK_EXPIRATION_PADDING = Duration.ofSeconds(5);
9797

9898
private static final ScheduledExecutorService SHARED_SYSTEM_EXECUTOR =
9999
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(6).build().getExecutor();
@@ -102,7 +102,6 @@ public class Subscriber extends AbstractApiService {
102102

103103
private final String subscriptionName;
104104
private final FlowControlSettings flowControlSettings;
105-
private final Duration ackExpirationPadding;
106105
private final Duration maxAckExtensionPeriod;
107106
private final ScheduledExecutorService executor;
108107
@Nullable private final ScheduledExecutorService alarmsExecutor;
@@ -120,20 +119,12 @@ public class Subscriber extends AbstractApiService {
120119
new LinkedList<>();
121120
private final ApiClock clock;
122121
private final List<AutoCloseable> closeables = new ArrayList<>();
123-
private ScheduledFuture<?> ackDeadlineUpdater;
124122

125123
private Subscriber(Builder builder) {
126124
receiver = builder.receiver;
127125
flowControlSettings = builder.flowControlSettings;
128126
subscriptionName = builder.subscriptionName;
129127

130-
Preconditions.checkArgument(
131-
builder.ackExpirationPadding.compareTo(Duration.ZERO) > 0, "padding must be positive");
132-
Preconditions.checkArgument(
133-
builder.ackExpirationPadding.compareTo(Duration.ofSeconds(MIN_ACK_DEADLINE_SECONDS)) < 0,
134-
"padding must be less than %s seconds",
135-
MIN_ACK_DEADLINE_SECONDS);
136-
ackExpirationPadding = builder.ackExpirationPadding;
137128
maxAckExtensionPeriod = builder.maxAckExtensionPeriod;
138129
clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock();
139130

@@ -228,12 +219,6 @@ public String getSubscriptionNameString() {
228219
return subscriptionName;
229220
}
230221

231-
/** Acknowledgement expiration padding. See {@link Builder#setAckExpirationPadding}. */
232-
@InternalApi
233-
Duration getAckExpirationPadding() {
234-
return ackExpirationPadding;
235-
}
236-
237222
/** The flow control settings the Subscriber is configured with. */
238223
public FlowControlSettings getFlowControlSettings() {
239224
return flowControlSettings;
@@ -331,14 +316,14 @@ public void run() {
331316
.start();
332317
}
333318

334-
private void startStreamingConnections() throws IOException {
319+
private void startStreamingConnections() {
335320
synchronized (streamingSubscriberConnections) {
336321
for (int i = 0; i < numPullers; i++) {
337322
streamingSubscriberConnections.add(
338323
new StreamingSubscriberConnection(
339324
subscriptionName,
340325
receiver,
341-
ackExpirationPadding,
326+
ACK_EXPIRATION_PADDING,
342327
maxAckExtensionPeriod,
343328
ackLatencyDistribution,
344329
subStub,
@@ -372,9 +357,6 @@ public void failed(State from, Throwable failure) {
372357

373358
private void stopAllStreamingConnections() {
374359
stopConnections(streamingSubscriberConnections);
375-
if (ackDeadlineUpdater != null) {
376-
ackDeadlineUpdater.cancel(true);
377-
}
378360
}
379361

380362
private void startConnections(
@@ -410,8 +392,6 @@ private void stopConnections(List<? extends ApiService> connections) {
410392

411393
/** Builder of {@link Subscriber Subscribers}. */
412394
public static final class Builder {
413-
private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.ofMillis(100);
414-
private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.ofSeconds(5);
415395
private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60);
416396

417397
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
@@ -423,7 +403,6 @@ public static final class Builder {
423403
String subscriptionName;
424404
MessageReceiver receiver;
425405

426-
Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING;
427406
Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
428407

429408
FlowControlSettings flowControlSettings =
@@ -437,8 +416,6 @@ public static final class Builder {
437416
.setKeepAliveTime(Duration.ofMinutes(5))
438417
.build();
439418
HeaderProvider headerProvider = new NoHeaderProvider();
440-
HeaderProvider internalHeaderProvider =
441-
SubscriptionAdminSettings.defaultApiClientHeaderProviderBuilder().build();
442419
CredentialsProvider credentialsProvider =
443420
SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build();
444421
Optional<ApiClock> clock = Optional.absent();
@@ -478,21 +455,6 @@ public Builder setHeaderProvider(HeaderProvider headerProvider) {
478455
return this;
479456
}
480457

481-
/**
482-
* Sets the static header provider for getting internal (library-defined) headers. The header
483-
* provider will be called during client construction only once. The headers returned by the
484-
* provider will be cached and supplied as is for each request issued by the constructed client.
485-
* Some reserved headers can be overridden (e.g. Content-Type) or merged with the default value
486-
* (e.g. User-Agent) by the underlying transport layer.
487-
*
488-
* @param internalHeaderProvider the internal header provider
489-
* @return the builder
490-
*/
491-
Builder setInternalHeaderProvider(HeaderProvider internalHeaderProvider) {
492-
this.internalHeaderProvider = Preconditions.checkNotNull(internalHeaderProvider);
493-
return this;
494-
}
495-
496458
/**
497459
* Sets the flow control settings.
498460
*
@@ -523,25 +485,6 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
523485
return this;
524486
}
525487

526-
/**
527-
* Set acknowledgement expiration padding.
528-
*
529-
* <p>This is the time accounted before a message expiration is to happen, so the {@link
530-
* Subscriber} is able to send an ack extension beforehand.
531-
*
532-
* <p>This padding duration is configurable so you can account for network latency. A reasonable
533-
* number must be provided so messages don't expire because of network latency between when the
534-
* ack extension is required and when it reaches the Pub/Sub service.
535-
*
536-
* @param ackExpirationPadding must be greater or equal to {@link #MIN_ACK_EXPIRATION_PADDING}
537-
*/
538-
@InternalApi
539-
Builder setAckExpirationPadding(Duration ackExpirationPadding) {
540-
Preconditions.checkArgument(ackExpirationPadding.compareTo(MIN_ACK_EXPIRATION_PADDING) >= 0);
541-
this.ackExpirationPadding = ackExpirationPadding;
542-
return this;
543-
}
544-
545488
/**
546489
* Set the maximum period a message ack deadline will be extended. Defaults to one hour.
547490
*

branches/autosynth-os-login/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java

Lines changed: 0 additions & 111 deletions
This file was deleted.

0 commit comments

Comments
 (0)