Skip to content

Commit 27a9397

Browse files
author
Brian Chen
authored
feat: add max/min throttling options to BulkWriterOptions (#400)
1 parent 5444fed commit 27a9397

6 files changed

Lines changed: 280 additions & 35 deletions

File tree

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ final class BulkWriter implements AutoCloseable {
5353
* @see <a href=https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic>Ramping
5454
* up traffic</a>
5555
*/
56-
private static final int STARTING_MAXIMUM_OPS_PER_SECOND = 500;
56+
static final int DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND = 500;
5757

5858
/**
5959
* The rate by which to increase the capacity as specified by the 500/50/5 rule.
@@ -97,22 +97,49 @@ final class BulkWriter implements AutoCloseable {
9797
private final ExponentialRetryAlgorithm backoff;
9898
private TimedAttemptSettings nextAttempt;
9999

100-
BulkWriter(FirestoreImpl firestore, boolean enableThrottling) {
100+
BulkWriter(FirestoreImpl firestore, BulkWriterOptions options) {
101101
this.firestore = firestore;
102102
this.backoff =
103103
new ExponentialRetryAlgorithm(
104104
firestore.getOptions().getRetrySettings(), CurrentMillisClock.getDefaultClock());
105105
this.nextAttempt = backoff.createFirstAttempt();
106106
this.firestoreExecutor = firestore.getClient().getExecutor();
107107

108-
if (enableThrottling) {
109-
rateLimiter =
108+
if (!options.getThrottlingEnabled()) {
109+
this.rateLimiter =
110110
new RateLimiter(
111-
STARTING_MAXIMUM_OPS_PER_SECOND,
112-
RATE_LIMITER_MULTIPLIER,
113-
RATE_LIMITER_MULTIPLIER_MILLIS);
111+
Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE);
114112
} else {
115-
rateLimiter = new RateLimiter(Integer.MAX_VALUE, Double.MAX_VALUE, Integer.MAX_VALUE);
113+
double startingRate = DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND;
114+
double maxRate = Double.POSITIVE_INFINITY;
115+
116+
if (options.getInitialOpsPerSecond() != null) {
117+
startingRate = options.getInitialOpsPerSecond();
118+
}
119+
120+
if (options.getMaxOpsPerSecond() != null) {
121+
maxRate = options.getMaxOpsPerSecond();
122+
}
123+
124+
// The initial validation step ensures that the maxOpsPerSecond is greater than
125+
// initialOpsPerSecond. If this inequality is true, that means initialOpsPerSecond was not
126+
// set and maxOpsPerSecond is less than the default starting rate.
127+
if (maxRate < startingRate) {
128+
startingRate = maxRate;
129+
}
130+
131+
// Ensure that the batch size is not larger than the number of allowed
132+
// operations per second.
133+
if (startingRate < maxBatchSize) {
134+
this.maxBatchSize = (int) startingRate;
135+
}
136+
137+
this.rateLimiter =
138+
new RateLimiter(
139+
(int) startingRate,
140+
RATE_LIMITER_MULTIPLIER,
141+
RATE_LIMITER_MULTIPLIER_MILLIS,
142+
(int) maxRate);
116143
}
117144
}
118145

@@ -679,4 +706,9 @@ public ApiFuture<Void> apply(List<BatchWriteResult> results) {
679706
void setMaxBatchSize(int size) {
680707
maxBatchSize = size;
681708
}
709+
710+
@VisibleForTesting
711+
RateLimiter getRateLimiter() {
712+
return rateLimiter;
713+
}
682714
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java

Lines changed: 112 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,123 @@
1616

1717
package com.google.cloud.firestore;
1818

19-
import javax.annotation.Nonnull;
19+
import com.google.auto.value.AutoValue;
20+
import javax.annotation.Nullable;
2021

21-
/** Options used to disable request throttling in BulkWriter. */
22-
final class BulkWriterOptions {
23-
24-
private final boolean enableThrottling;
25-
26-
private BulkWriterOptions(boolean enableThrottling) {
27-
this.enableThrottling = enableThrottling;
28-
}
22+
/** Options used to configure request throttling in BulkWriter. */
23+
@AutoValue
24+
abstract class BulkWriterOptions {
25+
/**
26+
* Return whether throttling is enabled.
27+
*
28+
* @return Whether throttling is enabled.
29+
*/
30+
abstract boolean getThrottlingEnabled();
2931

30-
boolean isThrottlingEnabled() {
31-
return enableThrottling;
32-
}
32+
/**
33+
* Returns the initial maximum number of operations per second allowed by the throttler.
34+
*
35+
* @return The initial maximum number of operations per second allowed by the throttler.
36+
*/
37+
@Nullable
38+
abstract Double getInitialOpsPerSecond();
3339

3440
/**
35-
* An options object that will disable throttling in the created BulkWriter.
41+
* Returns the maximum number of operations per second allowed by the throttler.
3642
*
37-
* @return The BulkWriterOptions object.
43+
* <p>The throttler's allowed operations per second does not ramp up past the specified operations
44+
* per second.
45+
*
46+
* @return The maximum number of operations per second allowed by the throttler.
3847
*/
39-
@Nonnull
40-
public static BulkWriterOptions withThrottlingDisabled() {
41-
return new BulkWriterOptions(false);
48+
@Nullable
49+
abstract Double getMaxOpsPerSecond();
50+
51+
static Builder builder() {
52+
return new AutoValue_BulkWriterOptions.Builder()
53+
.setMaxOpsPerSecond(null)
54+
.setInitialOpsPerSecond(null)
55+
.setThrottlingEnabled(true);
56+
}
57+
58+
abstract Builder toBuilder();
59+
60+
@AutoValue.Builder
61+
abstract static class Builder {
62+
/**
63+
* Sets whether throttling should be enabled. By default, throttling is enabled.
64+
*
65+
* @param enabled Whether throttling should be enabled.
66+
*/
67+
abstract Builder setThrottlingEnabled(boolean enabled);
68+
69+
/**
70+
* Set the initial maximum number of operations per second allowed by the throttler.
71+
*
72+
* @param initialOpsPerSecond The initial maximum number of operations per second allowed by the
73+
* throttler.
74+
*/
75+
abstract Builder setInitialOpsPerSecond(@Nullable Double initialOpsPerSecond);
76+
77+
/**
78+
* Set the initial maximum number of operations per second allowed by the throttler.
79+
*
80+
* @param initialOpsPerSecond The initial maximum number of operations per second allowed by the
81+
* throttler.
82+
*/
83+
Builder setInitialOpsPerSecond(int initialOpsPerSecond) {
84+
return setInitialOpsPerSecond(new Double(initialOpsPerSecond));
85+
}
86+
87+
/**
88+
* Set the maximum number of operations per second allowed by the throttler.
89+
*
90+
* @param maxOpsPerSecond The maximum number of operations per second allowed by the throttler.
91+
* The throttler's allowed operations per second does not ramp up past the specified
92+
* operations per second.
93+
*/
94+
abstract Builder setMaxOpsPerSecond(@Nullable Double maxOpsPerSecond);
95+
96+
/**
97+
* Set the maximum number of operations per second allowed by the throttler.
98+
*
99+
* @param maxOpsPerSecond The maximum number of operations per second allowed by the throttler.
100+
* The throttler's allowed operations per second does not ramp up past the specified
101+
* operations per second.
102+
*/
103+
Builder setMaxOpsPerSecond(int maxOpsPerSecond) {
104+
return setMaxOpsPerSecond(new Double(maxOpsPerSecond));
105+
}
106+
107+
abstract BulkWriterOptions autoBuild();
108+
109+
BulkWriterOptions build() {
110+
BulkWriterOptions options = autoBuild();
111+
Double initialRate = options.getInitialOpsPerSecond();
112+
Double maxRate = options.getMaxOpsPerSecond();
113+
114+
if (initialRate != null && initialRate < 1) {
115+
throw FirestoreException.invalidState(
116+
"Value for argument 'initialOpsPerSecond' must be greater than 1, but was: "
117+
+ initialRate.intValue());
118+
}
119+
120+
if (maxRate != null && maxRate < 1) {
121+
throw FirestoreException.invalidState(
122+
"Value for argument 'maxOpsPerSecond' must be greater than 1, but was: "
123+
+ maxRate.intValue());
124+
}
125+
126+
if (maxRate != null && initialRate != null && initialRate > maxRate) {
127+
throw FirestoreException.invalidState(
128+
"'maxOpsPerSecond' cannot be less than 'initialOpsPerSecond'.");
129+
}
130+
131+
if (!options.getThrottlingEnabled() && (maxRate != null || initialRate != null)) {
132+
throw FirestoreException.invalidState(
133+
"Cannot set 'initialOpsPerSecond' or 'maxOpsPerSecond' when 'throttlingEnabled' is set to false.");
134+
}
135+
return options;
136+
}
42137
}
43138
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,12 @@ public WriteBatch batch() {
9494

9595
@Nonnull
9696
BulkWriter bulkWriter() {
97-
return new BulkWriter(this, /* enableThrottling= */ true);
97+
return new BulkWriter(this, BulkWriterOptions.builder().setThrottlingEnabled(true).build());
9898
}
9999

100100
@Nonnull
101101
BulkWriter bulkWriter(BulkWriterOptions options) {
102-
return new BulkWriter(this, options.isThrottlingEnabled());
102+
return new BulkWriter(this, options);
103103
}
104104

105105
@Nonnull

google-cloud-firestore/src/main/java/com/google/cloud/firestore/RateLimiter.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,31 +38,48 @@ class RateLimiter {
3838
private final double multiplier;
3939
private final int multiplierMillis;
4040
private final long startTimeMillis;
41+
private final int maximumRate;
4142

4243
private int availableTokens;
4344
private long lastRefillTimeMillis;
4445

45-
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis) {
46-
this(initialCapacity, multiplier, multiplierMillis, new Date().getTime());
46+
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, int maximumRate) {
47+
this(initialCapacity, multiplier, multiplierMillis, maximumRate, new Date().getTime());
4748
}
4849

4950
/**
5051
* @param initialCapacity Initial maximum number of operations per second.
5152
* @param multiplier Rate by which to increase the capacity.
5253
* @param multiplierMillis How often the capacity should increase in milliseconds.
54+
* @param maximumRate Maximum number of allowed operations per second. The number of tokens added
55+
* per second will never exceed this number.
5356
* @param startTimeMillis The starting time in epoch milliseconds that the rate limit is based on.
5457
* Used for testing the limiter.
5558
*/
56-
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, long startTimeMillis) {
59+
RateLimiter(
60+
int initialCapacity,
61+
double multiplier,
62+
int multiplierMillis,
63+
int maximumRate,
64+
long startTimeMillis) {
5765
this.initialCapacity = initialCapacity;
5866
this.multiplier = multiplier;
5967
this.multiplierMillis = multiplierMillis;
68+
this.maximumRate = maximumRate;
6069
this.startTimeMillis = startTimeMillis;
6170

6271
this.availableTokens = initialCapacity;
6372
this.lastRefillTimeMillis = startTimeMillis;
6473
}
6574

75+
public int getInitialCapacity() {
76+
return initialCapacity;
77+
}
78+
79+
public int getMaximumRate() {
80+
return maximumRate;
81+
}
82+
6683
public boolean tryMakeRequest(int numOperations) {
6784
return tryMakeRequest(numOperations, new Date().getTime());
6885
}
@@ -132,7 +149,10 @@ private void refillTokens(long requestTimeMillis) {
132149
public int calculateCapacity(long requestTimeMillis) {
133150
long millisElapsed = requestTimeMillis - startTimeMillis;
134151
int operationsPerSecond =
135-
(int) (Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity);
152+
Math.min(
153+
(int)
154+
(Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity),
155+
maximumRate);
136156
return operationsPerSecond;
137157
}
138158
}

0 commit comments

Comments
 (0)