|
3 | 3 | #include "application.h"
|
4 | 4 | #include <async_wrap-inl.h>
|
5 | 5 | #include <debug_utils-inl.h>
|
| 6 | +#include <ngtcp2/ngtcp2.h> |
6 | 7 | #include <node_bob.h>
|
7 | 8 | #include <node_sockaddr-inl.h>
|
8 | 9 | #include <uv.h>
|
@@ -95,6 +96,20 @@ Maybe<Session::Application_Options> Session::Application_Options::From(
|
95 | 96 | return Just<Application_Options>(options);
|
96 | 97 | }
|
97 | 98 |
|
| 99 | +// ============================================================================ |
| 100 | + |
| 101 | +std::string Session::Application::StreamData::ToString() const { |
| 102 | + DebugIndentScope indent; |
| 103 | + auto prefix = indent.Prefix(); |
| 104 | + std::string res("{"); |
| 105 | + res += prefix + "count: " + std::to_string(count); |
| 106 | + res += prefix + "remaining: " + std::to_string(remaining); |
| 107 | + res += prefix + "id: " + std::to_string(id); |
| 108 | + res += prefix + "fin: " + std::to_string(fin); |
| 109 | + res += indent.Close(); |
| 110 | + return res; |
| 111 | +} |
| 112 | + |
98 | 113 | Session::Application::Application(Session* session, const Options& options)
|
99 | 114 | : session_(session) {}
|
100 | 115 |
|
@@ -189,7 +204,7 @@ Packet* Session::Application::CreateStreamDataPacket() {
|
189 | 204 | return Packet::Create(env(),
|
190 | 205 | session_->endpoint_.get(),
|
191 | 206 | session_->remote_address_,
|
192 |
| - ngtcp2_conn_get_max_tx_udp_payload_size(*session_), |
| 207 | + session_->max_packet_size(), |
193 | 208 | "stream data");
|
194 | 209 | }
|
195 | 210 |
|
@@ -221,141 +236,188 @@ void Session::Application::StreamReset(Stream* stream,
|
221 | 236 | }
|
222 | 237 |
|
223 | 238 | void Session::Application::SendPendingData() {
|
| 239 | + static constexpr size_t kMaxPackets = 32; |
224 | 240 | Debug(session_, "Application sending pending data");
|
225 | 241 | PathStorage path;
|
| 242 | + StreamData stream_data; |
226 | 243 |
|
227 |
| - Packet* packet = nullptr; |
228 |
| - uint8_t* pos = nullptr; |
229 |
| - int err = 0; |
| 244 | + // The maximum size of packet to create. |
| 245 | + const size_t max_packet_size = session_->max_packet_size(); |
230 | 246 |
|
231 |
| - size_t maxPacketCount = std::min(static_cast<size_t>(64000), |
232 |
| - ngtcp2_conn_get_send_quantum(*session_)); |
233 |
| - size_t packetSendCount = 0; |
| 247 | + // The maximum number of packets to send in this call to SendPendingData. |
| 248 | + const size_t max_packet_count = std::min( |
| 249 | + kMaxPackets, ngtcp2_conn_get_send_quantum(*session_) / max_packet_size); |
234 | 250 |
|
235 |
| - const auto updateTimer = [&] { |
236 |
| - Debug(session_, "Application updating the session timer"); |
237 |
| - ngtcp2_conn_update_pkt_tx_time(*session_, uv_hrtime()); |
238 |
| - session_->UpdateTimer(); |
239 |
| - }; |
| 251 | + // The number of packets that have been sent in this call to SendPendingData. |
| 252 | + size_t packet_send_count = 0; |
240 | 253 |
|
241 |
| - const auto congestionLimited = [&](auto packet) { |
242 |
| - auto len = pos - ngtcp2_vec(*packet).base; |
243 |
| - // We are either congestion limited or done. |
244 |
| - if (len) { |
245 |
| - // Some data was serialized into the packet. We need to send it. |
246 |
| - packet->Truncate(len); |
247 |
| - session_->Send(std::move(packet), path); |
248 |
| - } |
| 254 | + Packet* packet = nullptr; |
| 255 | + uint8_t* pos = nullptr; |
| 256 | + uint8_t* begin = nullptr; |
249 | 257 |
|
250 |
| - updateTimer(); |
| 258 | + auto ensure_packet = [&] { |
| 259 | + if (packet == nullptr) { |
| 260 | + packet = CreateStreamDataPacket(); |
| 261 | + if (packet == nullptr) return false; |
| 262 | + pos = begin = ngtcp2_vec(*packet).base; |
| 263 | + } |
| 264 | + DCHECK_NOT_NULL(packet); |
| 265 | + DCHECK_NOT_NULL(pos); |
| 266 | + DCHECK_NOT_NULL(begin); |
| 267 | + return true; |
251 | 268 | };
|
252 | 269 |
|
| 270 | + // We're going to enter a loop here to prepare and send no more than |
| 271 | + // max_packet_count packets. |
253 | 272 | for (;;) {
|
254 |
| - ssize_t ndatalen; |
255 |
| - StreamData stream_data; |
256 |
| - |
257 |
| - err = GetStreamData(&stream_data); |
| 273 | + // ndatalen is the amount of stream data that was accepted into the packet. |
| 274 | + ssize_t ndatalen = 0; |
258 | 275 |
|
259 |
| - if (err < 0) { |
| 276 | + // Make sure we have a packet to write data into. |
| 277 | + if (!ensure_packet()) { |
| 278 | + Debug(session_, "Failed to create packet for stream data"); |
| 279 | + // Doh! Could not create a packet. Time to bail. |
260 | 280 | session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
|
261 | 281 | return session_->Close(Session::CloseMethod::SILENT);
|
262 | 282 | }
|
263 | 283 |
|
264 |
| - if (packet == nullptr) { |
265 |
| - packet = CreateStreamDataPacket(); |
266 |
| - if (packet == nullptr) { |
267 |
| - session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL); |
268 |
| - return session_->Close(Session::CloseMethod::SILENT); |
269 |
| - } |
270 |
| - pos = ngtcp2_vec(*packet).base; |
| 284 | + // The stream_data is the next block of data from the application stream. |
| 285 | + if (GetStreamData(&stream_data) < 0) { |
| 286 | + Debug(session_, "Application failed to get stream data"); |
| 287 | + session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL); |
| 288 | + packet->Done(UV_ECANCELED); |
| 289 | + return session_->Close(Session::CloseMethod::SILENT); |
271 | 290 | }
|
272 | 291 |
|
273 |
| - ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, stream_data); |
| 292 | + // If we got here, we were at least successful in checking for stream data. |
| 293 | + // There might not be any stream data to send. |
| 294 | + Debug(session_, "Application using stream data: %s", stream_data); |
| 295 | + |
| 296 | + // Awesome, let's write our packet! |
| 297 | + ssize_t nwrite = |
| 298 | + WriteVStream(&path, pos, &ndatalen, max_packet_size, stream_data); |
| 299 | + Debug(session_, "Application accepted %zu bytes into packet", ndatalen); |
274 | 300 |
|
275 |
| - if (nwrite <= 0) { |
| 301 | + // A negative nwrite value indicates either an error or that there is more |
| 302 | + // data to write into the packet. |
| 303 | + if (nwrite < 0) { |
276 | 304 | switch (nwrite) {
|
277 |
| - case 0: |
278 |
| - if (stream_data.id >= 0) ResumeStream(stream_data.id); |
279 |
| - return congestionLimited(std::move(packet)); |
280 | 305 | case NGTCP2_ERR_STREAM_DATA_BLOCKED: {
|
281 |
| - session().StreamDataBlocked(stream_data.id); |
282 |
| - if (session().max_data_left() == 0) { |
283 |
| - if (stream_data.id >= 0) ResumeStream(stream_data.id); |
284 |
| - return congestionLimited(std::move(packet)); |
285 |
| - } |
286 |
| - CHECK_LE(ndatalen, 0); |
| 306 | + // We could not write any data for this stream into the packet because |
| 307 | + // the flow control for the stream itself indicates that the stream |
| 308 | + // is blocked. We'll skip and move on to the next stream. |
| 309 | + // ndatalen = -1 means that no stream data was accepted into the |
| 310 | + // packet, which is what we want here. |
| 311 | + DCHECK_EQ(ndatalen, -1); |
| 312 | + DCHECK(stream_data.stream); |
| 313 | + session_->StreamDataBlocked(stream_data.id); |
287 | 314 | continue;
|
288 | 315 | }
|
289 | 316 | case NGTCP2_ERR_STREAM_SHUT_WR: {
|
290 |
| - // Indicates that the writable side of the stream has been closed |
| 317 | + // Indicates that the writable side of the stream should be closed |
291 | 318 | // locally or the stream is being reset. In either case, we can't send
|
292 | 319 | // any stream data!
|
293 |
| - CHECK_GE(stream_data.id, 0); |
294 |
| - // We need to notify the stream that the writable side has been closed |
295 |
| - // and no more outbound data can be sent. |
296 |
| - CHECK_LE(ndatalen, 0); |
297 |
| - auto stream = session_->FindStream(stream_data.id); |
298 |
| - if (stream) stream->EndWritable(); |
| 320 | + Debug(session_, |
| 321 | + "Stream %" PRIi64 " should be closed for writing", |
| 322 | + stream_data.id); |
| 323 | + // ndatalen = -1 means that no stream data was accepted into the |
| 324 | + // packet, which is what we want here. |
| 325 | + DCHECK_EQ(ndatalen, -1); |
| 326 | + DCHECK(stream_data.stream); |
| 327 | + stream_data.stream->EndWritable(); |
299 | 328 | continue;
|
300 | 329 | }
|
301 | 330 | case NGTCP2_ERR_WRITE_MORE: {
|
302 |
| - CHECK_GT(ndatalen, 0); |
303 |
| - if (!StreamCommit(&stream_data, ndatalen)) return session_->Close(); |
304 |
| - pos += ndatalen; |
| 331 | + // This return value indicates that we should call into WriteVStream |
| 332 | + // again to write more data into the same packet. |
| 333 | + Debug(session_, "Application should write more to packet"); |
| 334 | + DCHECK_GE(ndatalen, 0); |
| 335 | + if (!StreamCommit(&stream_data, ndatalen)) { |
| 336 | + packet->Done(UV_ECANCELED); |
| 337 | + return session_->Close(CloseMethod::SILENT); |
| 338 | + } |
305 | 339 | continue;
|
306 | 340 | }
|
307 | 341 | }
|
308 | 342 |
|
309 |
| - packet->Done(UV_ECANCELED); |
310 |
| - session_->last_error_ = QuicError::ForNgtcp2Error(nwrite); |
311 |
| - return session_->Close(Session::CloseMethod::SILENT); |
312 |
| - } |
313 |
| - |
314 |
| - pos += nwrite; |
315 |
| - if (ndatalen > 0 && !StreamCommit(&stream_data, ndatalen)) { |
316 |
| - // Since we are closing the session here, we don't worry about updating |
317 |
| - // the pkt tx time. The failed StreamCommit should have updated the |
318 |
| - // last_error_ appropriately. |
| 343 | + // Some other type of error happened. |
| 344 | + DCHECK_EQ(ndatalen, -1); |
| 345 | + Debug(session_, |
| 346 | + "Application encountered error while writing packet: %s", |
| 347 | + ngtcp2_strerror(nwrite)); |
| 348 | + session_->SetLastError(QuicError::ForNgtcp2Error(nwrite)); |
319 | 349 | packet->Done(UV_ECANCELED);
|
320 | 350 | return session_->Close(Session::CloseMethod::SILENT);
|
| 351 | + } else if (ndatalen >= 0) { |
| 352 | + // We wrote some data into the packet. We need to update the flow control |
| 353 | + // by committing the data. |
| 354 | + if (!StreamCommit(&stream_data, ndatalen)) { |
| 355 | + packet->Done(UV_ECANCELED); |
| 356 | + return session_->Close(CloseMethod::SILENT); |
| 357 | + } |
321 | 358 | }
|
322 | 359 |
|
323 |
| - if (stream_data.id >= 0 && ndatalen < 0) ResumeStream(stream_data.id); |
| 360 | + // When nwrite is zero, it means we are congestion limited. |
| 361 | + // We should stop trying to send additional packets. |
| 362 | + if (nwrite == 0) { |
| 363 | + Debug(session_, "Congestion limited."); |
| 364 | + // There might be a partial packet already prepared. If so, send it. |
| 365 | + size_t datalen = pos - begin; |
| 366 | + if (datalen) { |
| 367 | + Debug(session_, "Packet has %zu bytes to send", datalen); |
| 368 | + // At least some data had been written into the packet. We should send |
| 369 | + // it. |
| 370 | + packet->Truncate(datalen); |
| 371 | + session_->Send(packet, path); |
| 372 | + } else { |
| 373 | + packet->Done(UV_ECANCELED); |
| 374 | + } |
324 | 375 |
|
325 |
| - packet->Truncate(nwrite); |
326 |
| - session_->Send(std::move(packet), path); |
| 376 | + // If there was stream data selected, we should reschedule it to try |
| 377 | + // sending again. |
| 378 | + if (stream_data.id >= 0) ResumeStream(stream_data.id); |
327 | 379 |
|
328 |
| - pos = nullptr; |
| 380 | + return session_->UpdatePacketTxTime(); |
| 381 | + } |
329 | 382 |
|
330 |
| - if (++packetSendCount == maxPacketCount) { |
331 |
| - break; |
| 383 | + // At this point we have a packet prepared to send. |
| 384 | + pos += nwrite; |
| 385 | + size_t datalen = pos - begin; |
| 386 | + Debug(session_, "Sending packet with %zu bytes", datalen); |
| 387 | + packet->Truncate(datalen); |
| 388 | + session_->Send(packet, path); |
| 389 | + |
| 390 | + // If we have sent the maximum number of packets, we're done. |
| 391 | + if (++packet_send_count == max_packet_count) { |
| 392 | + return session_->UpdatePacketTxTime(); |
332 | 393 | }
|
333 |
| - } |
334 | 394 |
|
335 |
| - updateTimer(); |
| 395 | + // Prepare to loop back around to prepare a new packet. |
| 396 | + packet = nullptr; |
| 397 | + pos = begin = nullptr; |
| 398 | + } |
336 | 399 | }
|
337 | 400 |
|
338 | 401 | ssize_t Session::Application::WriteVStream(PathStorage* path,
|
339 |
| - uint8_t* buf, |
| 402 | + uint8_t* dest, |
340 | 403 | ssize_t* ndatalen,
|
| 404 | + size_t max_packet_size, |
341 | 405 | const StreamData& stream_data) {
|
342 |
| - CHECK_LE(stream_data.count, kMaxVectorCount); |
343 |
| - uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_NONE; |
344 |
| - if (stream_data.remaining > 0) flags |= NGTCP2_WRITE_STREAM_FLAG_MORE; |
| 406 | + DCHECK_LE(stream_data.count, kMaxVectorCount); |
| 407 | + uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE; |
345 | 408 | if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
|
346 |
| - ssize_t ret = ngtcp2_conn_writev_stream( |
347 |
| - *session_, |
348 |
| - &path->path, |
349 |
| - nullptr, |
350 |
| - buf, |
351 |
| - ngtcp2_conn_get_max_tx_udp_payload_size(*session_), |
352 |
| - ndatalen, |
353 |
| - flags, |
354 |
| - stream_data.id, |
355 |
| - stream_data.buf, |
356 |
| - stream_data.count, |
357 |
| - uv_hrtime()); |
358 |
| - return ret; |
| 409 | + ngtcp2_pkt_info pi; |
| 410 | + return ngtcp2_conn_writev_stream(*session_, |
| 411 | + &path->path, |
| 412 | + &pi, |
| 413 | + dest, |
| 414 | + max_packet_size, |
| 415 | + ndatalen, |
| 416 | + flags, |
| 417 | + stream_data.id, |
| 418 | + stream_data.buf, |
| 419 | + stream_data.count, |
| 420 | + uv_hrtime()); |
359 | 421 | }
|
360 | 422 |
|
361 | 423 | // The DefaultApplication is the default implementation of Session::Application
|
|
0 commit comments