Skip to content

Add StreamMessage.endWith(finalizer)#5198

Merged
minwoox merged 10 commits intoline:mainfrom
injae-kim:stream-message-end-with
Oct 25, 2023
Merged

Add StreamMessage.endWith(finalizer)#5198
minwoox merged 10 commits intoline:mainfrom
injae-kim:stream-message-end-with

Conversation

@injae-kim
Copy link
Copy Markdown
Contributor

Related issue #4816

Motivation:

As a future work of #4727,
users might want to dynamically emit the last message depending on whether the upstream publisher completes successfully or exceptionally.

Modifications:

  • Add StreamMessage.endWith(finalizer)

Result:

@injae-kim injae-kim force-pushed the stream-message-end-with branch from 91f20b8 to 9557357 Compare September 21, 2023 15:17
@codecov
Copy link
Copy Markdown

codecov Bot commented Sep 21, 2023

Codecov Report

Attention: 7 lines in your changes are missing coverage. Please review.

Comparison is base (def2dd7) 0.00% compared to head (461b511) 73.96%.
Report is 49 commits behind head on main.

❗ Current head 461b511 differs from pull request most recent head 72bb48f. Consider uploading reports for the commit 72bb48f to get more accurate results

Additional details and impacted files
@@             Coverage Diff             @@
##             main    #5198       +/-   ##
===========================================
+ Coverage        0   73.96%   +73.96%     
- Complexity      0    20030    +20030     
===========================================
  Files           0     1721     +1721     
  Lines           0    73885    +73885     
  Branches        0     9402     +9402     
===========================================
+ Hits            0    54651    +54651     
- Misses          0    14788    +14788     
- Partials        0     4446     +4446     
Files Coverage Δ
...java/com/linecorp/armeria/common/HttpResponse.java 90.00% <100.00%> (ø)
...orp/armeria/common/PublisherBasedHttpResponse.java 100.00% <100.00%> (ø)
.../linecorp/armeria/common/stream/StreamMessage.java 77.01% <100.00%> (ø)
.../java/com/linecorp/armeria/common/HttpRequest.java 74.45% <0.00%> (ø)
...a/internal/common/stream/SurroundingPublisher.java 87.21% <71.42%> (ø)

... and 1716 files with indirect coverage changes

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@minwoox minwoox added this to the 1.26.0 milestone Sep 25, 2023
Copy link
Copy Markdown
Contributor

@minwoox minwoox left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Thanks!

downstream.onNext(finalizer.apply(cause));
close0(null);
} else {
close(cause);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can also call close0(cause) right away.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ updated!

Comment on lines +1079 to +1080
* Emits the last value depending on whether this {@link StreamMessage}
* completes successfully or exceptionally.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it doesn't emit the value depending on whether it is completed successfully or exceptionally. It always emits a value. 😉
How about removing depending on?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ updated!

Copy link
Copy Markdown
Contributor

@jrhee17 jrhee17 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good once @minwoox 's comments are addressed. Thanks! 🙇 👍 🙇

@injae-kim injae-kim force-pushed the stream-message-end-with branch from 65358ba to 643b16d Compare October 9, 2023 05:34
@injae-kim injae-kim force-pushed the stream-message-end-with branch from 643b16d to 1dabea7 Compare October 9, 2023 05:57
@injae-kim injae-kim requested a review from ikhoon October 10, 2023 09:40
Comment on lines +57 to +60
@Nullable
private final T tail;
@Nullable
private final Function<@Nullable Throwable, ? extends T> finalizer;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about completely removing tail and using finalizer only?
If trailers are empty or null, we may set a special function such as

static final Function<@Nullable Throwable, ? extends T> nullTrailers = () -> null;

if (finalizer == nullTrailers) {
   // complete immediately. 
}

Copy link
Copy Markdown
Contributor Author

@injae-kim injae-kim Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// T tail -> removed
Func<> finallizer
..

public onError() { // upstream error
    if (finalizer != null) {
        // If we use finalizer only, upstream error always replaced with finalizer.apply, can't close(cause);
        downstream.onNext(finalizer.apply(cause));
        close0(null); // downstream.onComplete() with finalizer
    } else {
        close0(cause); // downstream.onError() with tail
    }
}

I worry that if we remove tail and use finalizer only, upstream error always replaced with finalizer.
But on our current logic with tail, upstream error is propagated by close0(cause) WDYT? 🤔

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. How about allowing finalizer not to handle the cause?
If finalizer.apply(cause) returns null, we can assume users do not want to recover the exception and finish the stream exceptionally.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I applied this change myself. PTAL. 🙇‍♂️

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh sorry for late check, thanks for your update! 🙇

Comment thread core/src/main/java/com/linecorp/armeria/common/PublisherBasedHttpResponse.java Outdated
Copy link
Copy Markdown
Member

@trustin trustin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the expected behavior when finalizer.apply() throws an exception? Could you make sure that the stream is closed even if a finalizer throws an exception so that the stream is not stalled? (+ test case please)

Comment thread core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java Outdated
try {
tail = finalizer.apply(cause);
} catch (Throwable ex) {
close0(ex);
Copy link
Copy Markdown
Contributor

@minwoox minwoox Oct 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about leaving a log and closing with the original cause?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.

@injae-kim injae-kim force-pushed the stream-message-end-with branch from f08874f to ac75feb Compare October 24, 2023 14:32
Comment on lines +1102 to +1104
* <p>Note that if {@code null} is returned by the {@link Function}, the {@link StreamMessage} will complete
* successfully without emitting an additional value when this stream is complete successfully,
* or complete exceptionally when this stream complete exceptionally.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    @Test
    void finalizer_return_null_upstream_thrown() {
        // given
        final StreamMessage<Integer> streamMessage = StreamMessage.of(2, 3, 4, 5);
        final StreamMessage<Integer> aborted = streamMessage
                .peek(val -> {
                    if (val == 5) {
                        throw new RuntimeException();
                    }
                });
        final Function<Throwable, Integer> finalizer = unused -> null;
        final StreamMessage<Integer> publisher = new SurroundingPublisher<>(1, aborted, finalizer);

        // when & then
        StepVerifier.create(publisher)
                    .expectNext(1, 2, 3, 4)
                    .expectError(RuntimeException.class) // 👈 👈 upstreamException is propagated when finalizer return null
                    .verify();
    }

@ikhoon , If finalizer returns null and upstream thrown, finalize() propagate exception to downstream with close(upstreamException).

So I updated the comment in more detail~!

@injae-kim injae-kim force-pushed the stream-message-end-with branch from 027baf2 to 72bb48f Compare October 24, 2023 15:03
Copy link
Copy Markdown
Contributor

@ikhoon ikhoon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @injae-kim! ❤️❤️

@minwoox minwoox merged commit dd5b086 into line:main Oct 25, 2023
@minwoox
Copy link
Copy Markdown
Contributor

minwoox commented Oct 25, 2023

Thanks a lot for adding this nice feature, @injae-kim! 🚀 🚀 🚀

Bue-von-hon pushed a commit to Bue-von-hon/armeria that referenced this pull request Nov 10, 2023
Related issue line#4816 

### Motivation:
> As a future work of line#4727,
users might want to dynamically emit the last message depending on whether the upstream publisher completes successfully or exceptionally.

- On line#4816, it's good to add `StreamMessage.endWith(finalizer)`

### Modifications:

- Add `StreamMessage.endWith(finalizer)`

### Result:

- Closes line#4816 
- Now user can dynamically emit the last value depending on whether the `StreamMessage` completes successfully or exceptionally by using `StreamMessage.endWith(finalizer)`
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Conditionally emit the last message of a StreamMessage

5 participants