Skip to content

Commit 39114a4

Browse files
authored
Move streaming reader out of Blob into a dedicated BufReader class (#197)
1 parent a0d8515 commit 39114a4

File tree

9 files changed

+296
-192
lines changed

9 files changed

+296
-192
lines changed

builtins/web/blob.cpp

Lines changed: 36 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -2,44 +2,19 @@
22
#include "file.h"
33
#include "builtin.h"
44
#include "encode.h"
5-
#include "extension-api.h"
65
#include "rust-encoding.h"
6+
#include "streams/buf-reader.h"
77
#include "streams/native-stream-source.h"
88

99
#include "js/UniquePtr.h"
1010
#include "js/ArrayBuffer.h"
1111
#include "js/Conversions.h"
1212
#include "js/experimental/TypedData.h"
13-
#include "js/HashTable.h"
14-
#include "js/Stream.h"
1513
#include "js/TypeDecls.h"
1614
#include "js/Value.h"
1715

1816
namespace {
1917

20-
static api::Engine *ENGINE;
21-
22-
JSObject *new_array_buffer_from_span(JSContext *cx, std::span<const uint8_t> span) {
23-
auto buf = mozilla::MakeUnique<uint8_t[]>(span.size());
24-
if (!buf) {
25-
JS_ReportOutOfMemory(cx);
26-
return nullptr;
27-
}
28-
29-
std::copy(span.begin(), span.end(), buf.get());
30-
31-
auto array_buffer = JS::NewArrayBufferWithContents(
32-
cx, span.size(), buf.get(), JS::NewArrayBufferOutOfMemory::CallerMustFreeMemory);
33-
if (!array_buffer) {
34-
JS_ReportOutOfMemory(cx);
35-
return nullptr;
36-
}
37-
38-
// `array_buffer` now owns `buf`
39-
std::ignore = (buf.release());
40-
return array_buffer;
41-
}
42-
4318
template <typename T> bool validate_type(T *chars, size_t strlen) {
4419
for (size_t i = 0; i < strlen; i++) {
4520
T c = chars[i];
@@ -153,6 +128,8 @@ namespace blob {
153128

154129
using js::Vector;
155130
using file::File;
131+
using streams::BufReader;
132+
using streams::NativeStreamSource;
156133

157134
#define DEFINE_BLOB_METHOD(name) \
158135
bool Blob::name(JSContext *cx, unsigned argc, JS::Value *vp) { \
@@ -190,94 +167,45 @@ const JSPropertySpec Blob::properties[] = {
190167
JS_PS_END,
191168
};
192169

193-
class StreamTask final : public api::AsyncTask {
194-
Heap<JSObject *> source_;
195-
196-
static constexpr size_t CHUNK_SIZE = 8192;
197-
198-
public:
199-
explicit StreamTask(const HandleObject source, api::Engine *engine) : source_(source) {
200-
handle_ = IMMEDIATE_TASK_HANDLE;
201-
}
202-
203-
[[nodiscard]] bool run(api::Engine *engine) override {
204-
JSContext *cx = engine->cx();
205-
RootedObject owner(cx, streams::NativeStreamSource::owner(source_));
206-
RootedObject stream(cx, streams::NativeStreamSource::stream(source_));
207-
RootedValue ret(cx);
208-
209-
auto readers = Blob::readers(owner);
210-
auto rdr = readers->lookup(source_);
211-
212-
if (!rdr) {
213-
return false;
214-
}
215-
216-
auto chunk = rdr->value().read(CHUNK_SIZE);
217-
auto chunk_size = chunk.size();
218-
219-
if (chunk.empty()) {
220-
if (!JS::ReadableStreamClose(cx, stream)) {
221-
return false;
222-
}
223-
224-
readers->remove(source_);
225-
return cancel(engine);
226-
}
227-
228-
RootedObject array_buffer(cx, new_array_buffer_from_span(cx, chunk));
229-
if (!array_buffer) {
230-
return false;
231-
}
232-
233-
RootedObject bytes_buffer(cx, JS_NewUint8ArrayWithBuffer(cx, array_buffer, 0, chunk_size));
234-
if (!bytes_buffer) {
235-
return false;
236-
}
237-
238-
RootedValue enqueue_val(cx);
239-
enqueue_val.setObject(*bytes_buffer);
240-
if (!JS::ReadableStreamEnqueue(cx, stream, enqueue_val)) {
241-
return false;
242-
}
170+
JSObject *Blob::data_to_owned_array_buffer(JSContext *cx, HandleObject self) {
171+
auto src = Blob::blob(self);
172+
auto size = src->length();
243173

244-
return cancel(engine);
174+
auto buf = mozilla::MakeUnique<uint8_t[]>(size);
175+
if (!buf) {
176+
JS_ReportOutOfMemory(cx);
177+
return nullptr;
245178
}
246179

247-
[[nodiscard]] bool cancel(api::Engine *engine) override {
248-
handle_ = INVALID_POLLABLE_HANDLE;
249-
return true;
180+
auto array_buffer = JS::NewArrayBufferWithContents(
181+
cx, size, src->begin(), JS::NewArrayBufferOutOfMemory::CallerMustFreeMemory);
182+
if (!array_buffer) {
183+
JS_ReportOutOfMemory(cx);
184+
return nullptr;
250185
}
251186

252-
void trace(JSTracer *trc) override { TraceEdge(trc, &source_, "Source for Blob StreamTask"); }
253-
};
254-
255-
JSObject *Blob::data_to_owned_array_buffer(JSContext *cx, HandleObject self) {
256-
size_t total_size = blob_size(self);
257-
size_t bytes_read = 0;
258-
259-
return Blob::data_to_owned_array_buffer(cx, self, 0, total_size, &bytes_read);
187+
// `array_buffer` now owns `buf`
188+
std::ignore = (buf.release());
189+
return array_buffer;
260190
}
261191

262-
JSObject *Blob::data_to_owned_array_buffer(JSContext *cx, HandleObject self, size_t offset,
263-
size_t size, size_t *bytes_read) {
264-
auto blob = Blob::blob(self);
265-
auto blob_size = blob->length();
266-
*bytes_read = 0;
192+
bool Blob::read_blob_slice(JSContext *cx, HandleObject self, std::span<uint8_t> buf,
193+
size_t start, size_t *read, bool *done) {
194+
auto src = Blob::blob(self);
267195

268-
MOZ_ASSERT(offset <= blob_size);
196+
if (start >= src->length()) {
197+
*read = 0;
198+
*done = true;
199+
return true;
200+
}
269201

270-
size_t available_bytes = blob_size - offset;
271-
size_t read_size = std::min(size, available_bytes);
202+
size_t available = src->length() - start;
203+
size_t to_read = std::min(buf.size(), available);
272204

273-
auto span = std::span<uint8_t>(blob->begin() + offset, read_size);
274-
auto array_buffer = new_array_buffer_from_span(cx, span);
275-
if (!array_buffer) {
276-
return nullptr;
277-
}
205+
std::copy_n(src->begin() + start, to_read, buf.data());
206+
*read = to_read;
278207

279-
*bytes_read = read_size;
280-
return array_buffer;
208+
return true;
281209
}
282210

283211
DEFINE_BLOB_METHOD(arrayBuffer)
@@ -382,28 +310,15 @@ bool Blob::slice(JSContext *cx, HandleObject self, const CallArgs &args, Mutable
382310
}
383311

384312
bool Blob::stream(JSContext *cx, HandleObject self, MutableHandleValue rval) {
385-
auto native_stream = streams::NativeStreamSource::create(cx, self, JS::UndefinedHandleValue,
386-
stream_pull, stream_cancel);
387-
388-
JS::RootedObject source(cx, native_stream);
389-
if (!source) {
313+
RootedObject reader(cx, BufReader::create(cx, self, read_blob_slice));
314+
if (!reader) {
390315
return false;
391316
}
392317

393-
auto readers = Blob::readers(self);
394-
auto blob = Blob::blob(self);
395-
auto span = std::span<uint8_t>(blob->begin(), blob->length());
318+
RootedObject native_stream(cx, BufReader::stream(reader));
319+
RootedObject default_stream(cx, NativeStreamSource::stream(native_stream));
396320

397-
if (!readers->put(source, BlobReader(span))) {
398-
return false;
399-
}
400-
401-
JS::RootedObject stream(cx, streams::NativeStreamSource::stream(native_stream));
402-
if (!stream) {
403-
return false;
404-
}
405-
406-
rval.setObject(*stream);
321+
rval.setObject(*default_stream);
407322
return true;
408323
}
409324

@@ -476,21 +391,6 @@ bool Blob::type_get(JSContext *cx, unsigned argc, JS::Value *vp) {
476391
return true;
477392
}
478393

479-
bool Blob::stream_cancel(JSContext *cx, JS::CallArgs args, JS::HandleObject stream,
480-
JS::HandleObject owner, JS::HandleValue reason) {
481-
args.rval().setUndefined();
482-
return true;
483-
}
484-
485-
bool Blob::stream_pull(JSContext *cx, JS::CallArgs args, JS::HandleObject source,
486-
JS::HandleObject owner, JS::HandleObject controller) {
487-
488-
ENGINE->queue_async_task(new StreamTask(source, ENGINE));
489-
490-
args.rval().setUndefined();
491-
return true;
492-
}
493-
494394
Blob::ByteBuffer *Blob::blob(JSObject *self) {
495395
MOZ_ASSERT(is_instance(self));
496396
auto blob = static_cast<ByteBuffer *>(
@@ -509,15 +409,6 @@ JSString *Blob::type(JSObject *self) {
509409
return JS::GetReservedSlot(self, static_cast<size_t>(Blob::Slots::Type)).toString();
510410
}
511411

512-
Blob::ReadersMap *Blob::readers(JSObject *self) {
513-
MOZ_ASSERT(is_instance(self));
514-
auto readers = static_cast<ReadersMap *>(
515-
JS::GetReservedSlot(self, static_cast<size_t>(Blob::Slots::Readers)).toPrivate());
516-
517-
MOZ_ASSERT(readers);
518-
return readers;
519-
}
520-
521412
Blob::LineEndings Blob::line_endings(JSObject *self) {
522413
MOZ_ASSERT(is_instance(self));
523414
return static_cast<LineEndings>(
@@ -687,15 +578,13 @@ JSObject *Blob::create(JSContext *cx, UniqueChars data, size_t data_len, HandleS
687578
SetReservedSlot(self, static_cast<uint32_t>(Slots::Data), JS::PrivateValue(blob));
688579
SetReservedSlot(self, static_cast<uint32_t>(Slots::Type), JS::StringValue(type));
689580
SetReservedSlot(self, static_cast<uint32_t>(Slots::Endings), JS::Int32Value(LineEndings::Transparent));
690-
SetReservedSlot(self, static_cast<uint32_t>(Slots::Readers), JS::PrivateValue(new ReadersMap));
691581
return self;
692582
}
693583

694584
bool Blob::init(JSContext *cx, HandleObject self, HandleValue blobParts, HandleValue opts) {
695585
SetReservedSlot(self, static_cast<uint32_t>(Slots::Type), JS_GetEmptyStringValue(cx));
696586
SetReservedSlot(self, static_cast<uint32_t>(Slots::Endings), JS::Int32Value(LineEndings::Transparent));
697587
SetReservedSlot(self, static_cast<uint32_t>(Slots::Data), JS::PrivateValue(new ByteBuffer));
698-
SetReservedSlot(self, static_cast<uint32_t>(Slots::Readers), JS::PrivateValue(new ReadersMap));
699588

700589
// Walk the blob parts and append them to the blob's buffer.
701590
if (blobParts.isNull()) {
@@ -742,21 +631,9 @@ void Blob::finalize(JS::GCContext *gcx, JSObject *self) {
742631
if (blob) {
743632
free(blob);
744633
}
745-
746-
auto readers = Blob::readers(self);
747-
if (readers) {
748-
free(readers);
749-
}
750-
}
751-
752-
void Blob::trace(JSTracer *trc, JSObject *self) {
753-
MOZ_ASSERT(is_instance(self));
754-
auto readers = Blob::readers(self);
755-
readers->trace(trc);
756634
}
757635

758636
bool install(api::Engine *engine) {
759-
ENGINE = engine;
760637
return Blob::init_class(engine->cx(), engine->global());
761638
}
762639

builtins/web/blob.h

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,35 +4,14 @@
44
#include "builtin.h"
55
#include "extension-api.h"
66
#include "js/AllocPolicy.h"
7-
#include "js/GCHashTable.h"
87
#include "js/TypeDecls.h"
98
#include "js/Vector.h"
109

1110
namespace builtins {
1211
namespace web {
1312
namespace blob {
1413

15-
class BlobReader {
16-
std::span<const uint8_t> data_;
17-
std::size_t position_;
18-
19-
public:
20-
explicit BlobReader(std::span<const uint8_t> data) : data_(data), position_(0) {}
21-
22-
std::size_t remaining() const { return data_.size() - position_; }
23-
24-
std::span<const uint8_t> read(std::size_t size) {
25-
size = std::min(size, remaining());
26-
auto result = data_.subspan(position_, size);
27-
28-
position_ += size;
29-
return result;
30-
}
31-
32-
void trace(JSTracer *trc) {}
33-
};
34-
35-
class Blob : public TraceableBuiltinImpl<Blob> {
14+
class Blob : public FinalizableBuiltinImpl<Blob> {
3615
static bool arrayBuffer(JSContext *cx, unsigned argc, JS::Value *vp);
3716
static bool bytes(JSContext *cx, unsigned argc, JS::Value *vp);
3817
static bool slice(JSContext *cx, unsigned argc, JS::Value *vp);
@@ -54,17 +33,14 @@ class Blob : public TraceableBuiltinImpl<Blob> {
5433
enum Slots { Data, Type, Endings, Readers, Count };
5534
enum LineEndings { Transparent, Native };
5635

57-
using HeapObj = Heap<JSObject *>;
5836
using ByteBuffer = js::Vector<uint8_t, 0, js::SystemAllocPolicy>;
59-
using ReadersMap = JS::GCHashMap<HeapObj, BlobReader, js::StableCellHasher<HeapObj>, js::SystemAllocPolicy>;
6037

6138
static bool arrayBuffer(JSContext *cx, HandleObject self, MutableHandleValue rval);
6239
static bool bytes(JSContext *cx, HandleObject self, MutableHandleValue rval);
6340
static bool stream(JSContext *cx, HandleObject self, MutableHandleValue rval);
6441
static bool text(JSContext *cx, HandleObject self, MutableHandleValue rval);
6542
static bool slice(JSContext *cx, HandleObject self, const CallArgs &args, MutableHandleValue rval);
6643

67-
static ReadersMap *readers(JSObject *self);
6844
static ByteBuffer *blob(JSObject *self);
6945
static size_t blob_size(JSObject *self);
7046
static JSString *type(JSObject *self);
@@ -77,21 +53,15 @@ class Blob : public TraceableBuiltinImpl<Blob> {
7753
static bool init_options(JSContext *cx, HandleObject self, HandleValue opts);
7854
static bool init(JSContext *cx, HandleObject self, HandleValue blobParts, HandleValue opts);
7955

80-
static bool stream_cancel(JSContext *cx, JS::CallArgs args, JS::HandleObject stream,
81-
JS::HandleObject owner, JS::HandleValue reason);
82-
static bool stream_pull(JSContext *cx, JS::CallArgs args, JS::HandleObject source,
83-
JS::HandleObject body_owner, JS::HandleObject controller);
84-
8556
static JSObject *data_to_owned_array_buffer(JSContext *cx, HandleObject self);
86-
static JSObject *data_to_owned_array_buffer(JSContext *cx, HandleObject self, size_t offset,
87-
size_t size, size_t *bytes_read);
57+
static bool read_blob_slice(JSContext *cx, HandleObject self, std::span<uint8_t>,
58+
size_t start, size_t *read, bool *done);
8859

8960
static JSObject *create(JSContext *cx, UniqueChars data, size_t data_len, HandleString type);
9061

9162
static bool init_class(JSContext *cx, HandleObject global);
9263
static bool constructor(JSContext *cx, unsigned argc, Value *vp);
9364
static void finalize(JS::GCContext *gcx, JSObject *self);
94-
static void trace(JSTracer *trc, JSObject *self);
9565
};
9666

9767
bool install(api::Engine *engine);

0 commit comments

Comments
 (0)