Add StreamMessage.endWith(finalizer)#5198
Conversation
91f20b8 to
9557357
Compare
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## main #5198 +/- ##
===========================================
+ Coverage 0 73.96% +73.96%
- Complexity 0 20030 +20030
===========================================
Files 0 1721 +1721
Lines 0 73885 +73885
Branches 0 9402 +9402
===========================================
+ Hits 0 54651 +54651
- Misses 0 14788 +14788
- Partials 0 4446 +4446
☔ View full report in Codecov by Sentry. |
| downstream.onNext(finalizer.apply(cause)); | ||
| close0(null); | ||
| } else { | ||
| close(cause); |
There was a problem hiding this comment.
I think we can also call close0(cause) right away.
| * Emits the last value depending on whether this {@link StreamMessage} | ||
| * completes successfully or exceptionally. |
There was a problem hiding this comment.
I think it doesn't emit the value depending on whether it is completed successfully or exceptionally. It always emits a value. 😉
How about removing depending on?
65358ba to
643b16d
Compare
643b16d to
1dabea7
Compare
| @Nullable | ||
| private final T tail; | ||
| @Nullable | ||
| private final Function<@Nullable Throwable, ? extends T> finalizer; |
There was a problem hiding this comment.
How about completely removing tail and using finalizer only?
If trailers are empty or null, we may set a special function such as
static final Function<@Nullable Throwable, ? extends T> nullTrailers = () -> null;
if (finalizer == nullTrailers) {
// complete immediately.
}There was a problem hiding this comment.
// T tail -> removed
Func<> finallizer
..
public onError() { // upstream error
if (finalizer != null) {
// If we use finalizer only, upstream error always replaced with finalizer.apply, can't close(cause);
downstream.onNext(finalizer.apply(cause));
close0(null); // downstream.onComplete() with finalizer
} else {
close0(cause); // downstream.onError() with tail
}
}I worry that if we remove tail and use finalizer only, upstream error always replaced with finalizer.
But on our current logic with tail, upstream error is propagated by close0(cause) WDYT? 🤔
There was a problem hiding this comment.
Good point. How about allowing finalizer not to handle the cause?
If finalizer.apply(cause) returns null, we can assume users do not want to recover the exception and finish the stream exceptionally.
There was a problem hiding this comment.
I applied this change myself. PTAL. 🙇♂️
There was a problem hiding this comment.
oh sorry for late check, thanks for your update! 🙇
trustin
left a comment
There was a problem hiding this comment.
What would be the expected behavior when finalizer.apply() throws an exception? Could you make sure that the stream is closed even if a finalizer throws an exception so that the stream is not stalled? (+ test case please)
| try { | ||
| tail = finalizer.apply(cause); | ||
| } catch (Throwable ex) { | ||
| close0(ex); |
There was a problem hiding this comment.
How about leaving a log and closing with the original cause?
f08874f to
ac75feb
Compare
| * <p>Note that if {@code null} is returned by the {@link Function}, the {@link StreamMessage} will complete | ||
| * successfully without emitting an additional value when this stream is complete successfully, | ||
| * or complete exceptionally when this stream complete exceptionally. |
There was a problem hiding this comment.
@Test
void finalizer_return_null_upstream_thrown() {
// given
final StreamMessage<Integer> streamMessage = StreamMessage.of(2, 3, 4, 5);
final StreamMessage<Integer> aborted = streamMessage
.peek(val -> {
if (val == 5) {
throw new RuntimeException();
}
});
final Function<Throwable, Integer> finalizer = unused -> null;
final StreamMessage<Integer> publisher = new SurroundingPublisher<>(1, aborted, finalizer);
// when & then
StepVerifier.create(publisher)
.expectNext(1, 2, 3, 4)
.expectError(RuntimeException.class) // 👈 👈 upstreamException is propagated when finalizer return null
.verify();
}@ikhoon , If finalizer returns null and upstream thrown, finalize() propagate exception to downstream with close(upstreamException).
So I updated the comment in more detail~!
027baf2 to
72bb48f
Compare
|
Thanks a lot for adding this nice feature, @injae-kim! 🚀 🚀 🚀 |
Related issue line#4816 ### Motivation: > As a future work of line#4727, users might want to dynamically emit the last message depending on whether the upstream publisher completes successfully or exceptionally. - On line#4816, it's good to add `StreamMessage.endWith(finalizer)` ### Modifications: - Add `StreamMessage.endWith(finalizer)` ### Result: - Closes line#4816 - Now user can dynamically emit the last value depending on whether the `StreamMessage` completes successfully or exceptionally by using `StreamMessage.endWith(finalizer)`
Related issue #4816
Motivation:
StreamMessage#4816, it's good to addStreamMessage.endWith(finalizer)Modifications:
StreamMessage.endWith(finalizer)Result:
StreamMessage#4816StreamMessagecompletes successfully or exceptionally by usingStreamMessage.endWith(finalizer)