Introducing auto/timed flush for new Batcher (#744)
igorbernstein2 merged 1 commit into googleapis:master
Conversation
@igorbernstein2 Please have a look
Codecov Report
@@ Coverage Diff @@
## master #744 +/- ##
==========================================
+ Coverage 77.72% 78% +0.27%
- Complexity 1094 1100 +6
==========================================
Files 198 198
Lines 4799 4823 +24
Branches 377 382 +5
==========================================
+ Hits 3730 3762 +32
+ Misses 898 889 -9
- Partials 171 172 +1
Continue to review full report at Codecov.
igorbernstein2 left a comment
This needs a bit of work. Before updating the code, let's talk through how to address the issues.
The biggest issue here is the race condition between the scheduled flush thread and the caller's thread. The scheduled flush thread can interleave with the caller adding an element and the caller triggering a flush. This can lead to data loss.
Another issue is that scheduling a flush for each element adds unnecessary overhead. I think it might be better to track the last flush time and have a single recurring task per batcher rather than per element.
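The suggested design can be sketched roughly as follows. This is a hypothetical illustration, not the gax code: the names (`TimedBatcher`, `maybeFlush`, `lastFlushMillis`) are invented, and a real implementation would send batches over an RPC rather than collect them in a list. The point is the shape: one recurring task per batcher, a last-flush timestamp, and a single lock so adds and flushes never interleave.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class TimedBatcher<T> {
  private final Object lock = new Object();
  private final long delayMillis;
  private final List<List<T>> sentBatches = new ArrayList<>(); // stand-in for the real RPC
  private List<T> currentBatch = new ArrayList<>();
  private long lastFlushMillis = System.currentTimeMillis();

  TimedBatcher(long delayMillis, ScheduledExecutorService scheduler) {
    this.delayMillis = delayMillis;
    // One recurring task per batcher, instead of scheduling a flush per element.
    scheduler.scheduleWithFixedDelay(
        this::maybeFlush, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
  }

  public void add(T element) {
    synchronized (lock) { // adds and flushes never interleave
      currentBatch.add(element);
    }
  }

  // Called by the recurring task: flush only if the delay has elapsed
  // since the last flush (manual or timed).
  private void maybeFlush() {
    synchronized (lock) {
      if (System.currentTimeMillis() - lastFlushMillis >= delayMillis) {
        flushLocked();
      }
    }
  }

  public void flush() {
    synchronized (lock) {
      flushLocked();
    }
  }

  private void flushLocked() {
    lastFlushMillis = System.currentTimeMillis();
    if (currentBatch.isEmpty()) {
      return; // never send empty batches
    }
    sentBatches.add(currentBatch);
    currentBatch = new ArrayList<>();
  }

  public int sentBatchCount() {
    synchronized (lock) {
      return sentBatches.size();
    }
  }
}
```

Because both the caller's `flush()` and the timer's `maybeFlush()` take the same lock before touching `currentBatch`, an element can never land in a batch that is concurrently being swapped out, which is the data-loss interleaving described above.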
Codecov Report
@@ Coverage Diff @@
## master #744 +/- ##
===========================================
+ Coverage 77.94% 78.14% +0.2%
- Complexity 1099 1101 +2
===========================================
Files 198 198
Lines 4806 4832 +26
Branches 379 383 +4
===========================================
+ Hits 3746 3776 +30
+ Misses 889 886 -3
+ Partials 171 170 -1
igorbernstein2 left a comment
A few nits, but LGTM overall. @vam-google, can you take a pass?
This PR adds DelayThreshold in Batcher without any default delay value. DelayThreshold cannot be less than 1, as a value of 0 or less throws IllegalStateException.

- Accepts a user-provided executor to run the auto-flush.
- The ongoing Runnable (PushCurrentBatchRunnable) is unscheduled after the Batcher is GCed.
- Added `elementLock` to avoid the race condition between the user thread and the gax thread.
- Empty batches are never sent to UnaryCallable.
- Added Javadocs for the delay threshold.

**Tests to verify:**

- Auto flush and blocking flush.
- Elements do not leak into more than one batch even if concurrent flushing happens.
- Empty batches are never sent.
- Refactored the Mockito-based test for an exception in the descriptor.

**Feedback updates:**

- Fixed the test case for blocking flush.
- Removed testBlockingFlush by splitting it into two individual test cases.
- Updated Javadocs according to feedback.
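The GC-unscheduling point above can be illustrated with a small sketch. This is not the actual gax PushCurrentBatchRunnable; the class and method names here are invented, and it only shows the pattern: the recurring task holds a weak reference to its target, so it cannot keep the batcher alive, and once the referent is collected the task cancels its own future so the executor stops invoking it.

```java
import java.lang.ref.WeakReference;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative version of the pattern: the scheduled task keeps only a weak
// reference to the flush action and cancels itself once the referent is gone.
class WeaklyScheduledFlush implements Runnable {
  private final WeakReference<Runnable> flushRef;
  private volatile ScheduledFuture<?> self;

  WeaklyScheduledFlush(WeakReference<Runnable> flushRef) {
    this.flushRef = flushRef;
  }

  void setFuture(ScheduledFuture<?> self) {
    this.self = self;
  }

  @Override
  public void run() {
    Runnable flush = flushRef.get();
    if (flush == null) {
      // Referent was garbage collected: stop the recurring schedule.
      if (self != null) {
        self.cancel(false);
      }
      return;
    }
    flush.run();
  }
}
```

Without the weak reference, the executor's queue would hold a strong reference to the batcher through the task, so an abandoned batcher (and its pending elements) could never be collected.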
Force-pushed from 475f431 to 7c6462e
igorbernstein2 left a comment
This looks good to me
@Override
public void close() throws InterruptedException {
  isClosed = true;
  if (isClosed) return;
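In the snippet above, `isClosed` is set before it is checked, so `close()` returns immediately on every call and any cleanup below the guard never runs. A hedged sketch of the intended guard order, as a minimal stand-in class rather than the actual BatcherImpl code (the `flush()` counter here is invented for illustration):

```java
// Minimal illustration of the guard-order fix: test the flag before setting
// it, so the first close() still performs the final flush and later calls
// become no-ops.
class ClosableBatcher implements AutoCloseable {
  private boolean isClosed = false;
  int flushCount = 0;

  private void flush() {
    flushCount++; // stand-in for pushing the remaining batch
  }

  @Override
  public void close() throws InterruptedException {
    if (isClosed) {
      return; // already closed: do nothing
    }
    isClosed = true;
    flush(); // final flush runs exactly once
  }
}
```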
I'm going to merge this for now to unblock further work.
This is related to the new Batching API and is a follow-up PR after #734.
This change contains:
- DelayThreshold in BatchingSettings without any default delay value.
- BatcherImpl now accepts a user-provided scheduledExecutor, which automatically triggers sendBatch() after the given delay.
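At the call site, the delay threshold is surfaced through the batching settings builder. A rough configuration sketch, assuming gax-java's `com.google.api.gax.batching.BatchingSettings` and `org.threeten.bp.Duration` (the threshold values are arbitrary examples, not defaults from this PR):

```java
BatchingSettings batchingSettings =
    BatchingSettings.newBuilder()
        .setDelayThreshold(Duration.ofSeconds(1)) // auto-trigger sendBatch() after the given delay
        .setElementCountThreshold(100L)           // or when this many elements accumulate
        .setRequestByteThreshold(10_000L)         // or when the batch reaches this size
        .build();
```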