Batcher implementation with Synchronized Batcher#flush(without triggers)#716
Batcher implementation with Synchronized Batcher#flush(without triggers)#716igorbernstein2 merged 9 commits intogoogleapis:masterfrom
Batcher#flush(without triggers)#716Conversation
|
@igorbernstein2 Please have a look. |
Codecov Report
@@ Coverage Diff @@
## master #716 +/- ##
============================================
+ Coverage 75.74% 77.65% +1.91%
- Complexity 1041 1091 +50
============================================
Files 196 197 +1
Lines 4679 4776 +97
Branches 363 371 +8
============================================
+ Hits 3544 3709 +165
+ Misses 975 898 -77
- Partials 160 169 +9
Continue to review full report at Codecov.
|
igorbernstein2
left a comment
There was a problem hiding this comment.
I add some first pass comments. Will take a deeper look in the next couple of days
| sendBatch(); | ||
| while (numOfRpcs.get() > 0) { | ||
| LockSupport.parkNanos(DEFAULT_FINISH_WAIT_NANOS); | ||
| } |
There was a problem hiding this comment.
I don't love this. Can we use a semaphore instead?
There was a problem hiding this comment.
I have added a semaphore implementation 0 permits. Please let me know if this impl looks good to you
private final AtomicInteger numOfRpcs = new AtomicInteger(0);
There was a problem hiding this comment.
Hmm, semaphore doesn't seem quite right for this. Sorry for misleading you on that
Maybe something like this:
AtomicInteger numOutstandingBatches = new AtomicInteger();
Object flushLock = new Object();
void sendBatch() {
numOutstandingBatches.increment();
}
void onBatchCompletion() {
if (numOutstandingBatches.decrementAndGet() == 0) {
flushLock.notifyAll();
}
}
void flush() {
sendBatch();
awaitAllOutstandingBatches();
}
void awaitAllOutstandingBatches() {
while(numOutstandingBatches.get() > 0) {
synchronized(lock) {
flushLock.wait();
}
}
}There was a problem hiding this comment.
Thanks for the detailed steps, I have implemented with the same for Batcher#flush(). Please have a look.
| * This class represent one logical Batch. It accumulates all the elements and it's corresponding | ||
| * future element results for one batch. | ||
| */ | ||
| class Batch { |
There was a problem hiding this comment.
My intention was not to declare type params for inner class... but Have updated it to private static class.
There was a problem hiding this comment.
Please add a comment to make that clear
…gers) - This implementation does not include any triggers for sending received elements for batching.(This will be added in followup PRs). - User has to explicitly call the `Batcher#flush()` to send the accumulated request for batching. - Here `BatcherImpl` expects `v2.BatchingDescriptor`, `v2.BatchingCallSetting`, UnaryCallable and a request prototype containing repetitive data which would be copied over to each batch along with the elements. - `v2.BatchingCallSettings` is an extension from existing `BatchingCallSettings`.
…chingCallSettings.java`
|
@igorbernstein2 Also, I decided to remove |
| sendBatch(); | ||
| while (numOfRpcs.get() > 0) { | ||
| LockSupport.parkNanos(DEFAULT_FINISH_WAIT_NANOS); | ||
| } |
There was a problem hiding this comment.
Hmm, semaphore doesn't seem quite right for this. Sorry for misleading you on that
Maybe something like this:
AtomicInteger numOutstandingBatches = new AtomicInteger();
Object flushLock = new Object();
void sendBatch() {
numOutstandingBatches.increment();
}
void onBatchCompletion() {
if (numOutstandingBatches.decrementAndGet() == 0) {
flushLock.notifyAll();
}
}
void flush() {
sendBatch();
awaitAllOutstandingBatches();
}
void awaitAllOutstandingBatches() {
while(numOutstandingBatches.get() > 0) {
synchronized(lock) {
flushLock.wait();
}
}
}
igorbernstein2
left a comment
There was a problem hiding this comment.
LGTM, except for the timeout.
I'll assume that you will deal with synchronization when implementing timed flush
@vam-google can you take a look as well?
| private void awaitAllOutstandingBatches() throws InterruptedException { | ||
| while (numOfOutstandingBatches.get() > 0) { | ||
| synchronized (flushLock) { | ||
| flushLock.wait(DEFAULT_WAIT_TIME_MS); |
There was a problem hiding this comment.
I'm not sure I understand why we need a timeout
There was a problem hiding this comment.
I added this as a defense, Also inspired by existing CBT-OperationalAccountant
There was a problem hiding this comment.
What is it defending against?
There was a problem hiding this comment.
I added this for the case of RPC taking too long, which would enable this thread to keep on checking the numOfOutStandingBatch.
I now realized that this would not add any meaning. The only update we are waiting here would be from onBatchComplete's notifyAll(), which will execute on all cases.
Thanks a lot for pointing it out. I will remove this timeout.
| private boolean isClosed = false; | ||
|
|
||
| private BatcherImpl(Builder<ElementT, ElementResultT, RequestT, ResponseT> builder) { | ||
| this.prototype = checkNotNull(builder.prototype, "RequestPrototype cannot be null."); |
There was a problem hiding this comment.
Please do not put period (.) at the end of messages like this to stay consistent with the rest of the codebase (applies to other places as well, like line 113).
Also the message probably should not use the class names to indicate what is null (either use field names i.e prototype instead of RequestPrototype, or regular langugae to describe what is null, otherwise it looks like the "class" itself is null).
| } | ||
|
|
||
| /** Builder for a BatcherImpl. */ | ||
| public static class Builder<ElementT, ElementResultT, RequestT, ResponseT> { |
There was a problem hiding this comment.
can this be @Autovalue? I'm not a big fan of @Autovalue but we already use it extensively in gax-java, so lets reuse it here if possible to reduce boilerplate code typing.
There was a problem hiding this comment.
I don't consider BatcherImpl is a value object. So i'm not sure how AutoValue can help here
There was a problem hiding this comment.
I don't think it has to be a value object, it can be anything, which can be constructed with a builder.
Now when I look at it, do we even need a builder here? I'm just trying to figure out if we can reduce boilerplate code here (either by generating builder with the autovalue annotation or not having it at all).
There was a problem hiding this comment.
I dont think a builder is necessary here. Given the choice between AutoValue and removing the builder I would prefer removing the builder
There was a problem hiding this comment.
I have removed the Builder from the current change and annotated this class with @AutoValue.
Just a side note, We may need to come back on this when introducing flow controllers. As after implementing triggers, we would be expecting 6 arguments(i.e. existing ones + BatchingSettings, FlowController, and an Executor).
There was a problem hiding this comment.
TLDR version: please remove AutoValue for the time being.
Longer version:
I've given this more thought, and I really think that AutoValue is really the wrong tool for this. In the best practices it states
Don't use AutoValue to implement value semantics unless you really want value semantics. In particular, you should never care about the difference between two equal instances.
This is definitely not the case for BatcherImpl since a Batcher has mutable state and we do care which instance we call flush on. Also, using AutoValue here implies that equality will use value semantics and be determined by the prototype, callable & batchingDescriptor:
BatcherImpl.Builder builder = ...;
Batcher b1 = builder.build();
Batcher b2 = builder.build();
b2.add(...)
b1.equals(b2) == true; Which makes no sense. I think the only meaningful equality here is referential and goes against the best practices of AutoValue.
Since you already removed the Builder until a later PR, please remove the use of AutoValue here since it buys you very little. Once that PR comes up I'll chat with vam about the best path forward.
|
|
||
| private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0); | ||
| private final Object flushLock = new Object(); | ||
| private boolean isClosed = false; |
There was a problem hiding this comment.
This is only used by the single caller thread, so it doesn't need to be volatile
There was a problem hiding this comment.
Ok, but it is practically free to make it such, ans feels safer (a little bit of defensive programming never hurts in concurrent programming).
There was a problem hiding this comment.
I generally prefer to avoid defensive programming because you end up hiding the intention of the code. But I agree with you that volatile is free here. So if you feel strongly, then I won't object
There was a problem hiding this comment.
I would still add volatile
There was a problem hiding this comment.
@rahulKQL can you add the volatile and I will merge this PR
There was a problem hiding this comment.
I am sorry, I haven't marked this ongoing conversation as closed.
I have already added a volatile for isClosed flag.
|
@vam-google, @igorbernstein2, I have tried to address your feedback, Please have a fresh look at this change. |
Now BatcherImpl is created with @autovalue. Also made `isClosed` field as volatile.
|
@vam-google Thanks for the review! |
This is a follow-up PR for #692
What this PR contains
Batcher#flush()to send the accumulated request for batching.BatcherImplexpectsv2.BatchingDescriptor,UnaryCallableand arequest prototypecontaining repetitive data which would be copied over to each batch along with the bundle of elements.v2.BatchingCallSettingsis an extension from existingBatchingCallSettings.250ms.Follow-ups
Once this change is merged I will raise a separate PR with triggers, thresholds, and flowController for
BatcherImpl.