Skip to content

Commit 22dffdf

Browse files
authored
Implement QuicStreamChannel.bytesBeforeUnwritable() (java-native-access#264)
Motivation: Some users may depend on QuicStreamChannel.bytesBeforeUnwritable() to make decisions on how much they will try to write. Modifications: - Add implementation of QuicStreamChannel.bytesBeforeUnwritable() by keep track of the stream capacity - Add unit test Result: Be able to depend on QuicStreamChannel.bytesBeforeUnwritable()
1 parent d08d177 commit 22dffdf

5 files changed

Lines changed: 219 additions & 49 deletions

File tree

src/main/c/netty_quic_quiche.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,14 @@ static jlong netty_quiche_conn_readable(JNIEnv* env, jclass clazz, jlong conn) {
303303
return (jlong) iter;
304304
}
305305

306+
static jlong netty_quiche_conn_writable(JNIEnv* env, jclass clazz, jlong conn) {
307+
quiche_stream_iter* iter = quiche_conn_writable((quiche_conn *) conn);
308+
if (iter == NULL) {
309+
return -1;
310+
}
311+
return (jlong) iter;
312+
}
313+
306314
static void netty_quiche_stream_iter_free(JNIEnv* env, jclass clazz, jlong iter) {
307315
quiche_stream_iter_free((quiche_stream_iter*) iter);
308316
}
@@ -500,6 +508,7 @@ static const JNINativeMethod fixed_method_table[] = {
500508
{ "quiche_conn_timeout_as_nanos", "(J)J", (void *) netty_quiche_conn_timeout_as_nanos },
501509
{ "quiche_conn_on_timeout", "(J)V", (void *) netty_quiche_conn_on_timeout },
502510
{ "quiche_conn_readable", "(J)J", (void *) netty_quiche_conn_readable },
511+
{ "quiche_conn_writable", "(J)J", (void *) netty_quiche_conn_writable },
503512
{ "quiche_stream_iter_free", "(J)V", (void *) netty_quiche_stream_iter_free },
504513
{ "quiche_stream_iter_next", "(J[J)I", (void *) netty_quiche_stream_iter_next },
505514
{ "quiche_conn_dgram_max_writable_len", "(J)I", (void* ) netty_quiche_conn_dgram_max_writable_len },

src/main/java/io/netty/incubator/codec/quic/Quiche.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,12 @@ static native int quiche_conn_stream_priority(
372372
*/
373373
static native long quiche_conn_readable(long connAddr);
374374

375+
/**
376+
* See
377+
* <a href="https://github.com/cloudflare/quiche/blob/0.6.0/include/quiche.h#L285">quiche_conn_writable</a>.
378+
*/
379+
static native long quiche_conn_writable(long connAddr);
380+
375381
/**
376382
* See
377383
* <a href="https://github.com/cloudflare/quiche/blob/0.6.0/include/quiche.h#L329">quiche_stream_iter_next</a>.

src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,9 @@ public void operationComplete(ChannelFuture future) {
9999

100100
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
101101
private final long[] readableStreams = new long[128];
102+
private final long[] writableStreams = new long[128];
103+
102104
private final LongObjectMap<QuicheQuicStreamChannel> streams = new LongObjectHashMap<>();
103-
private final Queue<Long> flushPendingQueue = new ArrayDeque<>();
104105
private final QuicheQuicChannelConfig config;
105106
private final boolean server;
106107
private final QuicStreamIdGenerator idGenerator;
@@ -316,7 +317,6 @@ void forceClose() {
316317
state = CLOSED;
317318

318319
closeStreams();
319-
flushPendingQueue.clear();
320320

321321
if (finBuffer != null) {
322322
finBuffer.release();
@@ -782,53 +782,56 @@ void writable() {
782782
}
783783
}
784784

785-
void streamHasPendingWrites(long streamId) {
786-
flushPendingQueue.add(streamId);
785+
int streamCapacity(long streamId) {
786+
if (connection.isClosed()) {
787+
return 0;
788+
}
789+
return Quiche.quiche_conn_stream_capacity(connection.address(), streamId);
787790
}
788791

789792
private boolean handleWritableStreams() {
790-
int pending = flushPendingQueue.size();
791-
if (isConnDestroyed() || pending == 0) {
793+
if (isConnDestroyed()) {
792794
return false;
793795
}
794796
inHandleWritableStreams = true;
795797
try {
796798
long connAddr = connection.address();
797799
boolean mayNeedWrite = false;
800+
798801
if (Quiche.quiche_conn_is_established(connAddr) ||
799802
Quiche.quiche_conn_is_in_early_data(connAddr)) {
800-
// We only want to process the number of channels that were in the queue when we entered
801-
// handleWritableStreams(). Otherwise we may would loop forever as a channel may add itself again
802-
// if the write was again partial.
803-
for (int i = 0; i < pending; i++) {
804-
Long streamId = flushPendingQueue.poll();
805-
if (streamId == null) {
806-
break;
807-
}
808-
// Checking quiche_conn_stream_capacity(...) is cheaper then calling channel.writable() just
809-
// to notice that we can not write again.
810-
int capacity = Quiche.quiche_conn_stream_capacity(connAddr, streamId);
811-
if (capacity == 0) {
812-
// Still not writable, put back in the queue.
813-
flushPendingQueue.add(streamId);
814-
} else {
815-
long sid = streamId;
816-
QuicheQuicStreamChannel channel = streams.get(sid);
817-
if (channel != null) {
818-
if (capacity > 0) {
819-
mayNeedWrite = true;
820-
channel.writable(capacity);
821-
} else {
822-
if (!Quiche.quiche_conn_stream_finished(connAddr, sid)) {
823-
// Only fire an exception if the error was not caused because the stream is
824-
// considered finished.
825-
channel.pipeline().fireExceptionCaught(Quiche.newException(capacity));
803+
long writableIterator = Quiche.quiche_conn_writable(connAddr);
804+
805+
try {
806+
// For streams we always process all streams when at least on read was requested.
807+
for (;;) {
808+
int writable = Quiche.quiche_stream_iter_next(
809+
writableIterator, writableStreams);
810+
for (int i = 0; i < writable; i++) {
811+
long streamId = writableStreams[i];
812+
QuicheQuicStreamChannel streamChannel = streams.get(streamId);
813+
if (streamChannel != null) {
814+
int capacity = Quiche.quiche_conn_stream_capacity(connAddr, streamId);
815+
if (capacity < 0) {
816+
if (!Quiche.quiche_conn_stream_finished(connAddr, streamId)) {
817+
// Only fire an exception if the error was not caused because the stream is
818+
// considered finished.
819+
streamChannel.pipeline().fireExceptionCaught(Quiche.newException(capacity));
820+
}
821+
// Let's close the channel if quiche_conn_stream_capacity(...) returns an error.
822+
streamChannel.forceClose();
823+
} else if (streamChannel.writable(capacity)) {
824+
mayNeedWrite = true;
826825
}
827-
// Let's close the channel if quiche_conn_stream_capacity(...) returns an error.
828-
channel.forceClose();
829826
}
830827
}
828+
if (writable < writableStreams.length) {
829+
// We did handle all writable streams.
830+
break;
831+
}
831832
}
833+
} finally {
834+
Quiche.quiche_stream_iter_free(writableIterator);
832835
}
833836
}
834837
return mayNeedWrite;
@@ -1343,6 +1346,7 @@ private QuicheQuicStreamChannel addNewStreamChannel(long streamId) {
13431346
QuicheQuicChannel.this, streamId);
13441347
QuicheQuicStreamChannel old = streams.put(streamId, streamChannel);
13451348
assert old == null;
1349+
streamChannel.writable(streamCapacity(streamId));
13461350
return streamChannel;
13471351
}
13481352
}

src/main/java/io/netty/incubator/codec/quic/QuicheQuicStreamChannel.java

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ final class QuicheQuicStreamChannel extends DefaultAttributeMap implements QuicS
7474
private volatile boolean inputShutdown;
7575
private volatile boolean outputShutdown;
7676
private volatile QuicStreamPriority priority;
77+
private volatile int capacity;
7778

7879
QuicheQuicStreamChannel(QuicheQuicChannel parent, long streamId) {
7980
this.parent = parent;
@@ -328,12 +329,16 @@ public boolean isWritable() {
328329

329330
@Override
330331
public long bytesBeforeUnwritable() {
331-
return 0;
332+
return capacity;
332333
}
333334

334335
@Override
335336
public long bytesBeforeWritable() {
336-
return 0;
337+
if (writable) {
338+
return 0;
339+
}
340+
// Just return something positive for now
341+
return 8;
337342
}
338343

339344
@Override
@@ -359,8 +364,18 @@ public int compareTo(Channel o) {
359364
/**
360365
* Stream is writable.
361366
*/
362-
void writable(@SuppressWarnings("unused") int capacity) {
363-
((QuicStreamChannelUnsafe) unsafe()).writeQueued();
367+
boolean writable(@SuppressWarnings("unused") int capacity) {
368+
this.capacity = capacity;
369+
boolean mayNeedWrite = ((QuicStreamChannelUnsafe) unsafe()).writeQueued();
370+
updateWritabilityIfNeeded(capacity > 0);
371+
return mayNeedWrite;
372+
}
373+
374+
private void updateWritabilityIfNeeded(boolean newWritable) {
375+
if (writable != newWritable) {
376+
writable = newWritable;
377+
pipeline.fireChannelWritabilityChanged();
378+
}
364379
}
365380

366381
/**
@@ -559,29 +574,32 @@ private void closeIfNeeded(boolean wasFinSent) {
559574
}
560575
}
561576

562-
void writeQueued() {
577+
boolean writeQueued() {
563578
boolean wasFinSent = QuicheQuicStreamChannel.this.finSent;
564579
inWriteQueued = true;
565580
try {
581+
if (queue.isEmpty()) {
582+
return false;
583+
}
584+
boolean written = false;
566585
for (;;) {
567586
Object msg = queue.current();
568587
if (msg == null) {
569588
break;
570589
}
571590
try {
572591
if (!write0(msg)) {
573-
return;
592+
return written;
574593
}
575594
} catch (Exception e) {
576595
queue.remove().setFailure(e);
577596
continue;
578597
}
579598
queue.remove().setSuccess();
599+
written = true;
580600
}
581-
if (!writable) {
582-
writable = true;
583-
pipeline.fireChannelWritabilityChanged();
584-
}
601+
updateWritabilityIfNeeded(true);
602+
return written;
585603
} finally {
586604
closeIfNeeded(wasFinSent);
587605
inWriteQueued = false;
@@ -626,21 +644,24 @@ public void write(Object msg, ChannelPromise promise) {
626644
}
627645

628646
boolean wasFinSent = QuicheQuicStreamChannel.this.finSent;
647+
boolean mayNeedWritabilityUpdate = false;
629648
try {
630649
if (write0(msg)) {
631650
ReferenceCountUtil.release(msg);
632651
promise.setSuccess();
652+
mayNeedWritabilityUpdate = capacity == 0;
633653
} else {
634654
queue.add(msg, promise);
635-
if (writable) {
636-
writable = false;
637-
pipeline.fireChannelWritabilityChanged();
638-
}
655+
mayNeedWritabilityUpdate = true;
639656
}
640657
} catch (Exception e) {
641658
ReferenceCountUtil.release(msg);
642659
promise.setFailure(e);
660+
mayNeedWritabilityUpdate = capacity == 0;
643661
} finally {
662+
if (mayNeedWritabilityUpdate) {
663+
updateWritabilityIfNeeded(false);
664+
}
644665
closeIfNeeded(wasFinSent);
645666
}
646667
}
@@ -673,8 +694,13 @@ private boolean write0(Object msg) throws Exception {
673694
try {
674695
do {
675696
int res = parent().streamSend(streamId(), buffer, fin);
697+
698+
// Update the capacity as well.
699+
int cap = parent.streamCapacity(streamId());
700+
if (cap >= 0) {
701+
capacity = cap;
702+
}
676703
if (Quiche.throwIfError(res) || res == 0) {
677-
parent.streamHasPendingWrites(streamId());
678704
return false;
679705
}
680706
sendSomething = true;

0 commit comments

Comments
 (0)