Skip to content

Commit 57186e5

Browse files
committed
quic: fix a handful of bugs and missing functionality
Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-by: Opencode/Opus 4.6 PR-URL: #62387 Reviewed-By: Stephen Belanger <admin@stephenbelanger.com>
1 parent e9b5214 commit 57186e5

31 files changed

+597
-282
lines changed

lib/internal/blob.js

Lines changed: 63 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -439,63 +439,76 @@ function createBlobReaderStream(reader) {
439439
// There really should only be one read at a time so using an
440440
// array here is purely defensive.
441441
this.pendingPulls = [];
442+
// Register a wakeup callback that the C++ side can invoke
443+
// when new data is available after a STATUS_BLOCK.
444+
reader.setWakeup(() => {
445+
if (this.pendingPulls.length > 0) {
446+
this.readNext(c);
447+
}
448+
});
442449
},
443450
pull(c) {
444451
const { promise, resolve, reject } = PromiseWithResolvers();
445452
this.pendingPulls.push({ resolve, reject });
446-
const readNext = () => {
447-
reader.pull((status, buffer) => {
448-
// If pendingPulls is empty here, the stream had to have
449-
// been canceled, and we don't really care about the result.
450-
// We can simply exit.
451-
if (this.pendingPulls.length === 0) {
452-
return;
453-
}
454-
if (status === 0) {
455-
// EOS
456-
c.close();
457-
// This is to signal the end for byob readers
458-
// see https://streams.spec.whatwg.org/#example-rbs-pull
459-
c.byobRequest?.respond(0);
460-
const pending = this.pendingPulls.shift();
461-
pending.resolve();
462-
return;
463-
} else if (status < 0) {
464-
// The read could fail for many different reasons when reading
465-
// from a non-memory resident blob part (e.g. file-backed blob).
466-
// The error details the system error code.
467-
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
468-
const pending = this.pendingPulls.shift();
469-
c.error(error);
470-
pending.reject(error);
453+
this.readNext(c);
454+
return promise;
455+
},
456+
readNext(c) {
457+
reader.pull((status, buffer) => {
458+
// If pendingPulls is empty here, the stream had to have
459+
// been canceled, and we don't really care about the result.
460+
// We can simply exit.
461+
if (this.pendingPulls.length === 0) {
462+
return;
463+
}
464+
if (status === 0) {
465+
// EOS
466+
c.close();
467+
// This is to signal the end for byob readers
468+
// see https://streams.spec.whatwg.org/#example-rbs-pull
469+
c.byobRequest?.respond(0);
470+
const pending = this.pendingPulls.shift();
471+
pending.resolve();
472+
return;
473+
} else if (status < 0) {
474+
// The read could fail for many different reasons when reading
475+
// from a non-memory resident blob part (e.g. file-backed blob).
476+
// The error details the system error code.
477+
const error =
478+
lazyDOMException('The blob could not be read',
479+
'NotReadableError');
480+
const pending = this.pendingPulls.shift();
481+
c.error(error);
482+
pending.reject(error);
483+
return;
484+
} else if (status === 2) {
485+
// STATUS_BLOCK: No data available yet. The wakeup callback
486+
// registered in start() will re-invoke readNext when data
487+
// arrives.
488+
return;
489+
}
490+
// ReadableByteStreamController.enqueue errors if we submit a
491+
// 0-length buffer. We need to check for that here.
492+
if (buffer !== undefined && buffer.byteLength !== 0) {
493+
c.enqueue(new Uint8Array(buffer));
494+
}
495+
// We keep reading until we either reach EOS, some error, or
496+
// we hit the flow rate of the stream (c.desiredSize).
497+
// We use setImmediate here because we have to allow the event
498+
// loop to turn in order to process any pending i/o. Using
499+
// queueMicrotask won't allow the event loop to turn.
500+
setImmediate(() => {
501+
if (c.desiredSize < 0) {
502+
// A manual backpressure check.
503+
if (this.pendingPulls.length !== 0) {
504+
const pending = this.pendingPulls.shift();
505+
pending.resolve();
506+
}
471507
return;
472508
}
473-
// ReadableByteStreamController.enqueue errors if we submit a 0-length
474-
// buffer. We need to check for that here.
475-
if (buffer !== undefined && buffer.byteLength !== 0) {
476-
c.enqueue(new Uint8Array(buffer));
477-
}
478-
// We keep reading until we either reach EOS, some error, or we
479-
// hit the flow rate of the stream (c.desiredSize).
480-
// We use set immediate here because we have to allow the event
481-
// loop to turn in order to process any pending i/o. Using
482-
// queueMicrotask won't allow the event loop to turn.
483-
setImmediate(() => {
484-
if (c.desiredSize < 0) {
485-
// A manual backpressure check.
486-
if (this.pendingPulls.length !== 0) {
487-
// A case of waiting pull finished (= not yet canceled)
488-
const pending = this.pendingPulls.shift();
489-
pending.resolve();
490-
}
491-
return;
492-
}
493-
readNext();
494-
});
509+
this.readNext(c);
495510
});
496-
};
497-
readNext();
498-
return promise;
511+
});
499512
},
500513
cancel(reason) {
501514
// Reject any currently pending pulls here.

lib/internal/quic/quic.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,18 @@ setCallbacks({
477477
this[kOwner][kSessionTicket](ticket);
478478
},
479479

480+
/**
481+
* Called when the client receives a NEW_TOKEN frame from the server.
482+
* The token can be used for future connections to the same server
483+
* address to skip address validation.
484+
* @param {Buffer} token The opaque token data
485+
* @param {SocketAddress} address The remote server address
486+
*/
487+
onSessionNewToken(token, address) {
488+
debug('session new token callback', this[kOwner]);
489+
// TODO(@jasnell): Emit to JS for storage and future reconnection use
490+
},
491+
480492
/**
481493
* Called when the session receives a session version negotiation request
482494
* @param {*} version

lib/internal/quic/state.js

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ const {
7777
IDX_STATE_STREAM_FIN_RECEIVED,
7878
IDX_STATE_STREAM_READ_ENDED,
7979
IDX_STATE_STREAM_WRITE_ENDED,
80-
IDX_STATE_STREAM_PAUSED,
8180
IDX_STATE_STREAM_RESET,
8281
IDX_STATE_STREAM_HAS_OUTBOUND,
8382
IDX_STATE_STREAM_HAS_READER,
@@ -113,7 +112,6 @@ assert(IDX_STATE_STREAM_FIN_SENT !== undefined);
113112
assert(IDX_STATE_STREAM_FIN_RECEIVED !== undefined);
114113
assert(IDX_STATE_STREAM_READ_ENDED !== undefined);
115114
assert(IDX_STATE_STREAM_WRITE_ENDED !== undefined);
116-
assert(IDX_STATE_STREAM_PAUSED !== undefined);
117115
assert(IDX_STATE_STREAM_RESET !== undefined);
118116
assert(IDX_STATE_STREAM_HAS_OUTBOUND !== undefined);
119117
assert(IDX_STATE_STREAM_HAS_READER !== undefined);
@@ -475,11 +473,6 @@ class QuicStreamState {
475473
return !!DataViewPrototypeGetUint8(this.#handle, IDX_STATE_STREAM_WRITE_ENDED);
476474
}
477475

478-
/** @type {boolean} */
479-
get paused() {
480-
if (this.#handle.byteLength === 0) return undefined;
481-
return !!DataViewPrototypeGetUint8(this.#handle, IDX_STATE_STREAM_PAUSED);
482-
}
483476

484477
/** @type {boolean} */
485478
get reset() {
@@ -561,7 +554,6 @@ class QuicStreamState {
561554
finReceived: this.finReceived,
562555
readEnded: this.readEnded,
563556
writeEnded: this.writeEnded,
564-
paused: this.paused,
565557
reset: this.reset,
566558
hasOutbound: this.hasOutbound,
567559
hasReader: this.hasReader,
@@ -590,7 +582,6 @@ class QuicStreamState {
590582
finReceived: this.finReceived,
591583
readEnded: this.readEnded,
592584
writeEnded: this.writeEnded,
593-
paused: this.paused,
594585
reset: this.reset,
595586
hasOutbound: this.hasOutbound,
596587
hasReader: this.hasReader,

src/node_blob.cc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ Local<FunctionTemplate> Blob::Reader::GetConstructorTemplate(Environment* env) {
320320
Blob::Reader::kInternalFieldCount);
321321
tmpl->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "BlobReader"));
322322
SetProtoMethod(env->isolate(), tmpl, "pull", Pull);
323+
SetProtoMethod(env->isolate(), tmpl, "setWakeup", SetWakeup);
323324
env->set_blob_reader_constructor_template(tmpl);
324325
}
325326
return tmpl;
@@ -410,6 +411,21 @@ void Blob::Reader::Pull(const FunctionCallbackInfo<Value>& args) {
410411
std::move(next), node::bob::OPTIONS_END, nullptr, 0));
411412
}
412413

414+
void Blob::Reader::SetWakeup(
415+
const FunctionCallbackInfo<Value>& args) {
416+
Blob::Reader* reader;
417+
ASSIGN_OR_RETURN_UNWRAP(&reader, args.This());
418+
CHECK(args[0]->IsFunction());
419+
reader->wakeup_.Reset(args.GetIsolate(), args[0].As<Function>());
420+
}
421+
422+
void Blob::Reader::NotifyPull() {
423+
if (wakeup_.IsEmpty() || !env()->can_call_into_js()) return;
424+
HandleScope handle_scope(env()->isolate());
425+
Local<Function> fn = wakeup_.Get(env()->isolate());
426+
MakeCallback(fn, 0, nullptr);
427+
}
428+
413429
BaseObjectPtr<BaseObject>
414430
Blob::BlobTransferData::Deserialize(
415431
Environment* env,
@@ -590,6 +606,7 @@ void Blob::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
590606
registry->Register(Blob::GetDataObject);
591607
registry->Register(Blob::RevokeObjectURL);
592608
registry->Register(Blob::Reader::Pull);
609+
registry->Register(Blob::Reader::SetWakeup);
593610
registry->Register(Concat);
594611
registry->Register(BlobFromFilePath);
595612
}

src/node_blob.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ class Blob : public BaseObject {
8282
static BaseObjectPtr<Reader> Create(Environment* env,
8383
BaseObjectPtr<Blob> blob);
8484
static void Pull(const v8::FunctionCallbackInfo<v8::Value>& args);
85+
static void SetWakeup(const v8::FunctionCallbackInfo<v8::Value>& args);
86+
void NotifyPull();
8587

8688
explicit Reader(Environment* env,
8789
v8::Local<v8::Object> obj,
@@ -95,6 +97,7 @@ class Blob : public BaseObject {
9597
std::shared_ptr<DataQueue::Reader> inner_;
9698
BaseObjectPtr<Blob> strong_ptr_;
9799
bool eos_ = false;
100+
v8::Global<v8::Function> wakeup_;
98101
};
99102

100103
BaseObject::TransferMode GetTransferMode() const override;

src/quic/application.cc

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,7 @@ ssize_t Session::Application::WriteVStream(PathStorage* path,
452452
if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
453453
return ngtcp2_conn_writev_stream(*session_,
454454
&path->path,
455+
// TODO(@jasnell): ECN blocked on libuv
455456
nullptr,
456457
dest,
457458
max_packet_size,
@@ -511,10 +512,13 @@ class DefaultApplication final : public Session::Application {
511512
stream_data->count = 0;
512513
stream_data->fin = 0;
513514
stream_data->stream.reset();
514-
stream_data->remaining = 0;
515515
Debug(&session(), "Default application getting stream data");
516516
DCHECK_NOT_NULL(stream_data);
517517
// If the queue is empty, there aren't any streams with data yet
518+
519+
// If the connection-level flow control window is exhausted,
520+
// there is no point in pulling stream data.
521+
if (!session().max_data_left()) return 0;
518522
if (stream_queue_.IsEmpty()) return 0;
519523

520524
const auto get_length = [](auto vec, size_t count) {
@@ -554,9 +558,7 @@ class DefaultApplication final : public Session::Application {
554558

555559
if (count > 0) {
556560
stream->Schedule(&stream_queue_);
557-
stream_data->remaining = get_length(data, count);
558561
} else {
559-
stream_data->remaining = 0;
560562
}
561563

562564
// Not calling done here because we defer committing
@@ -581,15 +583,6 @@ class DefaultApplication final : public Session::Application {
581583

582584
void ResumeStream(int64_t id) override { ScheduleStream(id); }
583585

584-
bool ShouldSetFin(const StreamData& stream_data) override {
585-
auto const is_empty = [](const ngtcp2_vec* vec, size_t cnt) {
586-
size_t i = 0;
587-
for (size_t n = 0; n < cnt; n++) i += vec[n].len;
588-
return i > 0;
589-
};
590-
591-
return stream_data.stream && is_empty(stream_data, stream_data.count);
592-
}
593586

594587
void BlockStream(int64_t id) override {
595588
if (auto stream = session().FindStream(id)) [[likely]] {
@@ -598,10 +591,9 @@ class DefaultApplication final : public Session::Application {
598591
}
599592

600593
bool StreamCommit(StreamData* stream_data, size_t datalen) override {
601-
if (datalen == 0) return true;
602594
DCHECK_NOT_NULL(stream_data);
603595
CHECK(stream_data->stream);
604-
stream_data->stream->Commit(datalen);
596+
stream_data->stream->Commit(datalen, stream_data->fin);
605597
return true;
606598
}
607599

@@ -616,11 +608,6 @@ class DefaultApplication final : public Session::Application {
616608
}
617609
}
618610

619-
void UnscheduleStream(int64_t id) {
620-
if (auto stream = session().FindStream(id)) [[likely]] {
621-
stream->Unschedule();
622-
}
623-
}
624611

625612
Stream::Queue stream_queue_;
626613
};

src/quic/application.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ class Session::Application : public MemoryRetainer {
120120

121121
virtual int GetStreamData(StreamData* data) = 0;
122122
virtual bool StreamCommit(StreamData* data, size_t datalen) = 0;
123-
virtual bool ShouldSetFin(const StreamData& data) = 0;
124123

125124
inline Environment* env() const { return session().env(); }
126125
inline Session& session() {
@@ -148,14 +147,18 @@ class Session::Application : public MemoryRetainer {
148147
struct Session::Application::StreamData final {
149148
// The actual number of vectors in the struct, up to kMaxVectorCount.
150149
size_t count = 0;
151-
size_t remaining = 0;
152150
// The stream identifier. If this is a negative value then no stream is
153151
// identified.
154152
int64_t id = -1;
155153
int fin = 0;
156154
ngtcp2_vec data[kMaxVectorCount]{};
157155
BaseObjectPtr<Stream> stream;
158156

157+
static_assert(sizeof(ngtcp2_vec) == sizeof(nghttp3_vec) &&
158+
alignof(ngtcp2_vec) == alignof(nghttp3_vec) &&
159+
offsetof(ngtcp2_vec, base) == offsetof(nghttp3_vec, base) &&
160+
offsetof(ngtcp2_vec, len) == offsetof(nghttp3_vec, len),
161+
"ngtcp2_vec and nghttp3_vec must have identical layout");
159162
inline operator nghttp3_vec*() {
160163
return reinterpret_cast<nghttp3_vec*>(data);
161164
}

src/quic/bindingdata.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class Packet;
4343
V(session_datagram_status, SessionDatagramStatus) \
4444
V(session_handshake, SessionHandshake) \
4545
V(session_new, SessionNew) \
46+
V(session_new_token, SessionNewToken) \
4647
V(session_path_validation, SessionPathValidation) \
4748
V(session_ticket, SessionTicket) \
4849
V(session_version_negotiation, SessionVersionNegotiation) \
@@ -70,6 +71,7 @@ class Packet;
7071
V(cubic, "cubic") \
7172
V(disable_stateless_reset, "disableStatelessReset") \
7273
V(enable_connect_protocol, "enableConnectProtocol") \
74+
V(enable_early_data, "enableEarlyData") \
7375
V(enable_datagrams, "enableDatagrams") \
7476
V(enable_tls_trace, "tlsTrace") \
7577
V(endpoint, "Endpoint") \
@@ -121,6 +123,7 @@ class Packet;
121123
V(stream, "Stream") \
122124
V(success, "success") \
123125
V(tls_options, "tls") \
126+
V(token, "token") \
124127
V(token_expiration, "tokenExpiration") \
125128
V(token_secret, "tokenSecret") \
126129
V(transport_params, "transportParams") \

src/quic/cid.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,14 @@ const CID CID::kInvalid{};
8585
// CID::Hash
8686

8787
size_t CID::Hash::operator()(const CID& cid) const {
88+
// Uses the Boost hash_combine strategy: XOR each byte with the golden
89+
// ratio constant 0x9e3779b9 (derived from the fractional part of the
90+
// golden ratio, (sqrt(5)-1)/2 * 2^32) plus bit-shifted accumulator
91+
// state. This provides good avalanche properties for short byte
92+
// sequences like connection IDs (1-20 bytes).
8893
size_t hash = 0;
8994
for (size_t n = 0; n < cid.length(); n++) {
90-
hash ^= std::hash<uint8_t>{}(cid.ptr_->data[n] + 0x9e3779b9 + (hash << 6) +
91-
(hash >> 2));
95+
hash ^= cid.ptr_->data[n] + 0x9e3779b9 + (hash << 6) + (hash >> 2);
9296
}
9397
return hash;
9498
}

src/quic/data.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ using v8::Undefined;
2828
using v8::Value;
2929

3030
namespace quic {
31-
int DebugIndentScope::indent_ = 0;
31+
thread_local int DebugIndentScope::indent_ = 0;
3232

3333
Path::Path(const SocketAddress& local, const SocketAddress& remote) {
3434
ngtcp2_addr_init(&this->local, local.data(), local.length());

0 commit comments

Comments
 (0)