Skip to content

Commit e7d57af

Browse files
committed
Update gzip-stream
1 parent 13b8d63 commit e7d57af

3 files changed

Lines changed: 55 additions & 208 deletions

File tree

examples/gzip-stream/README.md

Lines changed: 15 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -25,82 +25,43 @@ The `flush` method allows any internally buffered data to be processed before co
2525

2626
```js
2727
function compress() {
28-
const compressor = compressNew();
28+
const stream = new CompressStream();
2929

3030
return new Transform({
3131
transform(chunk, encoding, callback) {
32-
compressChunk(compressor, encoding, chunk)
32+
stream
33+
.compress(encoding, chunk)
3334
.then(data => callback(null, data))
3435
.catch(callback);
3536
},
3637

3738
flush(callback) {
38-
compressFinish(compressor)
39+
stream
40+
.finish()
3941
.then(data => callback(null, data))
4042
.catch(callback);
4143
}
4244
});
4345
}
4446
```
4547

46-
The glue code exports a single function `compress` that creates a `Transform` stream delegating the implementation to Neon functions. Since these functions return promises, they are adapted to the `callback` style continuation that `Transform` expects.
48+
The glue code exports a single function `compress` responsible for creating a `Transform` stream, delegating to the `CompressStream` class and adapting [`Promise`s](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise) to callbacks.
4749

4850
## Neon
4951

50-
The Neon module exports three functions:
52+
The Neon module exports a `CompressSteam` class with two methods:
5153

52-
* [`compressNew`](#compressnew)
53-
* [`compressChunk`](#compresschunkcompressstream-chunk-encoding-callback)
54-
* [`compressFinish`](#compressfinishcompressstream-callback)
54+
* [`compress`](#compresschunk-encoding)
55+
* [`finish`](#finish)
5556

56-
### `compressNew()`
57+
### `new CompressStream(level)`
5758

58-
```rust
59-
fn compress_new(mut cx: FunctionContext) -> JsResult<JsBox<CompressStream>> {
60-
let stream = CompressStream::new(Compression::best());
59+
Creates a new instance of the `CompressStream` class with an optional gzip level.
6160

62-
Ok(cx.boxed(stream))
63-
}
64-
```
65-
66-
`compressNew` creates an instance of the stateful Rust struct, `CompressStream`, and returns it wrapped in a [`JsBox`](https://docs.rs/neon/latest/neon/types/struct.JsBox.html). Each of the other two methods expects `CompressStream` as the first argument. This pattern is similar to using [`Function.prototype.call`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Function/call) on a class method to manually bind `this`.
67-
68-
### `compressChunk(compressStream, chunk, encoding, callback)`
69-
70-
```rust
71-
fn compress_chunk(mut cx: FunctionContext) -> JsResult<JsPromise> {
72-
let stream = (&**cx.argument::<JsBox<CompressStream>>(0)?).clone();
73-
let chunk = cx.argument::<JsTypedArray<u8>>(2)?
74-
.as_slice(&cx)
75-
.to_vec();
76-
77-
let promise = cx
78-
.task(move || stream.write(chunk))
79-
.promise(CompressStream::and_buffer);
80-
81-
Ok(promise)
82-
}
83-
```
84-
85-
`compressChunk` accepts the instance of the `CompressStream` struct and the other arguments to the [`transform`](#transformchunk-encoding-callback) function. The chunk is cloned to a `Vec<u8>` and passed to a task to execute on the Node worker pool. The asynchronous task compresses the data and passes the compressed data to the `.promise(|cx, result| { ... })` callback. The callback to `promise` is executed on the JavaScript main thread and converts the compressed `Vec<u8>` to a `JsBuffer` and resolves the promise.
86-
87-
`CompressChunk::and_buffer` is used to create a `Buffer`. `ArrayBuffer` cannot be used because stream chunks are required to be an instance of `Uint8Array`. `Buffer` is a subclass of `Uint8Array`.
88-
89-
### `compressFinish(compressStream, callback)`
61+
### `compress(chunk, encoding)`
9062

91-
fn compress_finish(mut cx: FunctionContext) -> JsResult<JsPromise> {
92-
let stream = (&**cx.argument::<JsBox<CompressStream>>(0)?).clone();
63+
Compresses a chunk, returning flushed data as an `ArrayBuffer`.
9364

94-
```rust
95-
fn compress_finish(mut cx: FunctionContext) -> JsResult<JsPromise> {
96-
let stream = (&**cx.argument::<JsBox<CompressStream>>(0)?).clone();
97-
98-
let promise = cx
99-
.task(move || stream.finish())
100-
.promise(CompressStream::and_buffer);
101-
102-
Ok(promise)
103-
}
104-
```
65+
### `finish()`
10566

106-
`compressFinish` works very similar to [`compressChunkl`](#compresschunkcompressstream-chunk-encoding-callback), except it is provided the arguments to [`flush`](#flushcallback) which does not include any data. Instead, the remaining buffered data is compressed, a CRC is calculated, and the compressed gzip data is completed.
67+
`finish` works very similar to [`compress`](#compresschunk-encoding), except it is provided the arguments to [`flush`](#flushcallback) which does not include any data. Instead, the remaining buffered data is compressed, a CRC is calculated, and the compressed gzip data is completed.

examples/gzip-stream/index.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,26 @@
44
// source and produces new bytes for a destination sink.
55
const { Transform } = require("stream");
66

7-
const { compressNew, compressChunk, compressFinish } = require("./index.node");
7+
const { CompressStream } = require("./index.node");
88

99
// Creates a gzip compression transform stream, implemented asynchronously in Rust
1010
function compress() {
1111
// Create a native streaming gzip compressing with Neon
12-
const compressor = compressNew();
12+
const stream = new CompressStream();
1313

1414
return new Transform({
1515
// Compress a chunk of data by delegating to `compressChunk`
1616
transform(chunk, encoding, callback) {
17-
compressChunk(compressor, encoding, chunk)
17+
stream
18+
.compress(encoding, chunk)
1819
.then(data => callback(null, data))
1920
.catch(callback);
2021
},
2122

2223
// Complete the compression by delegating to `compressFinish`
2324
flush(callback) {
24-
compressFinish(compressor)
25+
stream
26+
.finish()
2527
.then(data => callback(null, data))
2628
.catch(callback);
2729
}

examples/gzip-stream/src/lib.rs

Lines changed: 34 additions & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
1-
use std::error::Error;
2-
use std::fmt;
3-
use std::io::{self, Write};
4-
use std::sync::{Arc, Mutex, MutexGuard, TryLockError};
1+
use std::{
2+
io::Write,
3+
sync::{Arc, Mutex, MutexGuard},
4+
};
55

66
use flate2::{Compression, write::GzEncoder};
7-
use neon::prelude::*;
8-
use neon::types::{JsUint8Array, buffer::TypedArray};
9-
use std::fmt::Debug;
7+
use neon::types::extract::Error;
108

119
#[derive(Clone)]
1210
// Holds state for a gzip compression stream
@@ -20,163 +18,49 @@ struct CompressStream {
2018
encoder: Arc<Mutex<GzEncoder<Vec<u8>>>>,
2119
}
2220

21+
#[neon::export(class)]
2322
impl CompressStream {
24-
// Create a new instance of a `CompressStream`
25-
fn new(level: Compression) -> Self {
23+
fn new(level: Option<u32>) -> Self {
24+
let level = Compression::new(level.unwrap_or(9));
2625
let encoder = GzEncoder::new(Vec::new(), level);
2726

2827
Self {
2928
encoder: Arc::new(Mutex::new(encoder)),
3029
}
3130
}
3231

33-
// Attempt to obtain a mutable reference to the stream
34-
fn lock(&self) -> Result<MutexGuard<'_, GzEncoder<Vec<u8>>>, CompressError> {
35-
// Use `try_lock` instead of `lock` because multiple concurrent calls
36-
// to the encoder is undefined. The caller *must* be careful to serially
37-
// write to the stream. `Transform` provides this guarantee by applying
38-
// backpressure and buffering. The `Mutex` should always be unlocked.
39-
Ok(self.encoder.try_lock()?)
40-
}
41-
42-
// Write a chunk of data to the encoder
43-
fn write(self, data: Vec<u8>) -> Result<Self, CompressError> {
44-
self.lock()?.write_all(&data)?;
45-
Ok(self)
46-
}
47-
48-
// Finish compressing. Multiple calls to this function will error or panic.
49-
fn finish(self) -> Result<Self, CompressError> {
50-
self.lock()?.try_finish()?;
51-
Ok(self)
52-
}
53-
54-
// After each call to `write` or `finish`, data may be written to the internal
55-
// buffer. This function copies the written data out and resets the buffer
56-
// to empty.
57-
fn and_buffer(
58-
mut cx: TaskContext,
59-
// Return value from `cx.task(..)` closure
60-
result: Result<Self, CompressError>,
61-
) -> JsResult<JsUint8Array> {
62-
let stream = result.or_else(|err| cx.throw_error(err))?;
63-
let mut guard = stream.lock().or_else(|err| cx.throw_error(err))?;
64-
65-
let data = guard.get_mut();
66-
let output = JsUint8Array::from_slice(&mut cx, data)?;
67-
68-
data.truncate(0);
69-
70-
Ok(output)
71-
}
72-
}
73-
74-
// Types placed in a `JsBox`, an opaque pointer for passing Rust data from Rust to
75-
// JavaScript and back, must implement the `Finalize` trait.
76-
//
77-
// The `Finalize` trait optionally provides a hook for executing code when the value
78-
// is garbage collected.
79-
impl Finalize for CompressStream {}
80-
81-
#[derive(Debug)]
82-
// All errors will be converted to JavaScript exceptions with the `Display`
83-
// implementation as the `Error` message.
84-
struct CompressError(String);
85-
86-
impl From<io::Error> for CompressError {
87-
fn from(err: io::Error) -> Self {
88-
Self(err.to_string())
89-
}
90-
}
91-
92-
impl<T> From<TryLockError<T>> for CompressError {
93-
fn from(err: TryLockError<T>) -> Self {
94-
Self(err.to_string())
32+
#[neon(task)]
33+
fn compress(self, _encoding: (), chunk: Vec<u8>) -> Result<Vec<u8>, Error> {
34+
let mut guard = self.lock()?;
35+
guard.write_all(&chunk)?;
36+
Ok(next_chunk(&mut guard))
9537
}
96-
}
9738

98-
impl fmt::Display for CompressError {
99-
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
100-
f.write_str(&self.0)
39+
#[neon(task)]
40+
fn finish(self) -> Result<Vec<u8>, Error> {
41+
let mut guard = self.lock()?;
42+
guard.try_finish()?;
43+
Ok(next_chunk(&mut guard))
10144
}
10245
}
10346

104-
impl Error for CompressError {}
105-
106-
impl AsRef<str> for CompressError {
107-
fn as_ref(&self) -> &str {
108-
&self.0
47+
// Rust allows multiple `impl` blocks. Rust methods that should not be exposed to JavaScript
48+
// can be written here.
49+
impl CompressStream {
50+
// Attempt to obtain a mutable reference to the stream
51+
fn lock(&self) -> Result<MutexGuard<'_, GzEncoder<Vec<u8>>>, Error> {
52+
// Use `try_lock` instead of `lock` because multiple concurrent calls
53+
// to the encoder is undefined. The caller *must* be careful to serially
54+
// write to the stream. `Transform` provides this guarantee by applying
55+
// backpressure and buffering. The `Mutex` should always be unlocked.
56+
self.encoder
57+
.try_lock()
58+
.map_err(|_| Error::new("GzEncoder already locked"))
10959
}
11060
}
11161

112-
// Create a boxed `CompressStream` that can be passed to JavaScript and back
113-
fn compress_new(mut cx: FunctionContext) -> JsResult<JsBox<CompressStream>> {
114-
// Best compression because why not?
115-
let stream = CompressStream::new(Compression::best());
116-
117-
// `cx.boxed` creates an opaque pointer that can cross the FFI boundary
118-
Ok(cx.boxed(stream))
119-
}
120-
121-
// Compress a chunk of data on the Node worker thread pool, returning a promise
122-
// that may contain compressed data.
123-
fn compress_chunk(mut cx: FunctionContext) -> JsResult<JsPromise> {
124-
// This is some funky syntax, but it's taking the first argument to the function,
125-
// attempting to downcast it as a `JsBox<CompressStream>` and finally calling
126-
// the `Clone` implementation on `CompressStream`. The `&**` is due to a couple
127-
// of smart pointers. Smart pointers are types that implement the
128-
// [`Deref`](https://doc.rust-lang.org/std/ops/trait.Deref.html) trait.
129-
//
130-
// The outer type is `neon::handle::Handle`. `Handle` is a type used by Neon for
131-
// ensuring that references to JavaScript values cannot be held after they have
132-
// been garbage collected. The next type is `JsBox` which is a smart pointer for
133-
// holding a reference to Rust data in JavaScript. The `**` dereferences these two
134-
// types, giving a `CompressStream`. However, it's impossible to move out of a
135-
// `JsBox`, so a reference is immediately taken with `&`. Finally, we can call the
136-
// `clone` implementation on `CompressStream`.
137-
let stream = (**cx.argument::<JsBox<CompressStream>>(0)?).clone();
138-
139-
// The 2nd argument is `encoding`. However, gzip is encoding agnostic and we do not need it.
140-
// let encoding = cx.argument::<JsString>(1)?;
141-
142-
// Grab the 3rd argument as a `Uint8Array`. The data is immediately converted to a
143-
// `Vec<u8>` by borrowing as a `&[u8]` and cloning.
144-
let chunk = cx.argument::<JsTypedArray<u8>>(2)?.as_slice(&cx).to_vec();
145-
146-
let promise = cx
147-
// Create a task to execute on the Node worker pool
148-
.task(move || stream.write(chunk))
149-
// Convert the result of the task into an `ArrayBuffer` and resolve the promise
150-
// on the JavaScript main thread.
151-
.promise(CompressStream::and_buffer);
152-
153-
Ok(promise)
154-
}
155-
156-
// Complete compressing the data and get the remaining output
157-
fn compress_finish(mut cx: FunctionContext) -> JsResult<JsPromise> {
158-
// Get a shallow clone of `CompressStream`; same as in `compress_chunk`
159-
// This is an alternative to the `**` syntax used earlier. Instead, it uses auto-deref
160-
// and universal call syntax for the `clone` call to coerce to proper type.
161-
let stream = CompressStream::clone(&*cx.argument::<JsBox<CompressStream>>(0)?);
162-
163-
let promise = cx
164-
// Finish the stream on the Node worker pool
165-
.task(move || stream.finish())
166-
// Convert the remaining output into an `ArrayBuffer` and resolve the promise
167-
// on the JavaScript main thread.
168-
.promise(CompressStream::and_buffer);
169-
170-
Ok(promise)
171-
}
172-
173-
#[neon::main]
174-
// Called once when the module is loaded
175-
fn main(mut cx: ModuleContext) -> NeonResult<()> {
176-
// Export each of the Neon functions as part of the module
177-
cx.export_function("compressNew", compress_new)?;
178-
cx.export_function("compressChunk", compress_chunk)?;
179-
cx.export_function("compressFinish", compress_finish)?;
180-
181-
Ok(())
62+
fn next_chunk(encoder: &mut GzEncoder<Vec<u8>>) -> Vec<u8> {
63+
let chunk = encoder.get_mut().clone();
64+
encoder.get_mut().truncate(0);
65+
chunk
18266
}

0 commit comments

Comments
 (0)