Data streams v2#1985

Open: 1egoman wants to merge 36 commits into main from data-streams-v2

@1egoman 1egoman commented Jun 23, 2026

Warning

This change relies on new SFU updates to function. Check out main (really, livekit/livekit@86a79f8 or later), build and run the SFU locally, and point any test app at this local SFU for testing.

Overview

Data streams v2 updates data streams to add three things:

  1. Single packet data streams for short (< 15kb), finite length payloads
  2. DEFLATE compression, when both the sender and all receivers support compression/decompression
  3. A maximum 15kb size for data stream attributes, to close this off as a DoS vector
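
As a rough illustration of how the three rules above could combine on the sending side, here is a minimal sketch. It is not the code from this PR: `planSend`, `attributesByteLength`, and both constants are hypothetical names, "15kb" is assumed to mean 15,000 bytes, and the size check is done on the uncompressed payload for simplicity.

```typescript
// Hypothetical limits; "15kb" is assumed here to mean 15,000 bytes.
const MAX_SINGLE_PACKET_BYTES = 15_000;
const MAX_ATTRIBUTES_BYTES = 15_000;

type SendPlan = {
  mode: 'single-packet' | 'multi-packet';
  compress: boolean;
};

/** Total UTF-8 byte size of all attribute keys and values. */
function attributesByteLength(attributes: Record<string, string>): number {
  const encoder = new TextEncoder();
  let total = 0;
  for (const key of Object.keys(attributes)) {
    total += encoder.encode(key).length + encoder.encode(attributes[key]).length;
  }
  return total;
}

/** Decide how a finite-length payload would be sent (illustrative only). */
function planSend(
  payloadBytes: number,
  attributes: Record<string, string>,
  senderSupportsCompression: boolean,
  receiversSupportCompression: boolean[],
): SendPlan {
  // Rule 3: reject oversized attributes up front.
  if (attributesByteLength(attributes) > MAX_ATTRIBUTES_BYTES) {
    throw new Error('data stream attributes exceed the maximum size');
  }
  // Rule 2: compress only if the sender and every receiver support it.
  // Note: with an empty receiver list, `every` returns true.
  const compress =
    senderSupportsCompression && receiversSupportCompression.every((ok) => ok);
  // Rule 1: short payloads go out as a single packet.
  const mode: SendPlan['mode'] =
    payloadBytes < MAX_SINGLE_PACKET_BYTES ? 'single-packet' : 'multi-packet';
  return { mode, compress };
}
```

A single receiver without decompression support is enough to turn compression off for the whole send in this sketch, which matches the "all receivers" wording above.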

This pull request contains the initial web implementation and the initial data streams spec, which will be used to propagate the feature across the other client SDKs.

Performance

Together, 1 and 2 at least double ⚠️ data stream throughput across the spectrum of network connections and payload sizes.

Before:
image

After:
image

1egoman added 30 commits June 4, 2026 12:58
Using sendText means that future optimizations to send a single data
stream packet can be enabled.
…t / split into 15k MTU

Doing this makes data arrive a bit faster since it's not waiting for
compressed bytes to accumulate before sending out a "fully filled" packet
Tag each data stream chunk with a "compression index" which if > 0
points to a given DecompressionStream which should be used for
preprocessing received bytes.
…s whole data stream

fflate allows flushing the compressed bytes midway through so that they
can be emitted "together" as a chunk in the output stream, which a
DecompressionStream('deflate-raw') can properly consume.
…nStream for single packet sends

So now fflate is only used in the multi packet data stream case, and
could be more easily broken out into its own concrete dependency block.
…ally make much of a difference for agent transcriptions

And I don't think it would be too hard to re-introduce in the future as
an opt in setting, either with fflate, or if compressionstream gets an
explicit "flush" option
@changeset-bot

changeset-bot Bot commented Jun 23, 2026

⚠️ No Changeset found

Latest commit: d77161f

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Comment thread examples/data-stream-benchmark/api.ts Dismissed

NOTE: The diff is quite large, largely because I have both this example app AND the spec checked in. I'll remove these before an eventual merge, which will make the diff a lot more reasonable.

Feel free to ignore this examples/data-stream-benchmark directory when reviewing.

Image

Comment on lines +12 to +20
/** The data-streams-v2 wire signals carried directly on the header: the compression flag and the
 * inline single-packet payload. Both used to live in reserved header attributes; they are now
 * first-class protobuf fields on `DataStream.Header`. */
export interface StreamHeaderV2Fields {
  /** Compression applied to the inline/chunked payload. Defaults to `NONE` when omitted. */
  compression?: DataStream_CompressionType;
  /** The full payload smuggled into the header for single-packet (inline) sends. */
  inlineContent?: Uint8Array;
}

Note to self: get rid of this and inline these two fields directly into the relevant function calls. Or name this something more generic if I decide to keep it.

mimeType: 'text/plain',
timestamp: Date.now(),
topic: options?.topic ?? '',
size: totalTextLength, // NOTE: size is always the pre-compression byte length

question: I'm not 100% sure this is the right call, making the data stream header size always the uncompressed byte length. It means the value can be larger than the number of compressed bytes actually sent, which is a little weird.

I decided to do it this way because it is impossible to know the compressed byte length before compressing all the data. If this were the compressed length, you'd have to receive all file chunks, compress each one, and buffer everything in memory before you could determine the full length and start the stream, which obviously isn't going to work.
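
One consequence of this choice is that a receiver can only compare `size` against bytes counted after decompression. A minimal sketch of that bookkeeping, assuming a hypothetical `ProgressTracker` class that is not part of this PR:

```typescript
/** Hypothetical receiver-side helper; names are illustrative, not the PR's API. */
class ProgressTracker {
  private received = 0;
  private readonly expectedSize: number | undefined;

  /** `expectedSize` is the header `size`, i.e. the pre-compression byte length. */
  constructor(expectedSize: number | undefined) {
    this.expectedSize = expectedSize;
  }

  /**
   * Call with each chunk AFTER it has been decompressed.
   * Returns progress in [0, 1], or undefined when the total size is unknown.
   */
  onDecompressedChunk(chunk: Uint8Array): number | undefined {
    this.received += chunk.byteLength;
    if (this.expectedSize === undefined) return undefined;
    if (this.expectedSize === 0) return 1;
    return Math.min(1, this.received / this.expectedSize);
  }
}
```

Counting compressed bytes against the same `size` would under-report progress and never reach 1 for compressible payloads.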

Comment on lines +161 to +162
// set text part of progress to 1
handleProgress(1, 0);

Note to self: add some tests for the onProgress stuff. I'm not sure I've fully exercised it to make sure I didn't break anything here. That said, I did find some existing bugs where onProgress wasn't being called in some paths, so I'm not convinced it was working properly before either.

Comment on lines +295 to +299
// Incremental text streams are never compressed (CompressionStream does not support flushing
// mid-stream); one-shot compression lives in sendText.
//
// Note that a future streamText could send a context-takeover style deflate-raw stream with
// intermediate explicit `Z_SYNC_FLUSH`s - receivers will already handle this properly today.

Note to reviewers - this is an important callout. The Outgoing- half of this change doesn't support compression for streamed data streams (only sendText / sendFile), but the infrastructure for it exists on the Incoming- half, so this could be added in the future in a fully backwards compatible way without needing a new clientProtocol bump.

Comment on lines +333 to 335
// FIXME: make this a global event to ensure "max listener" warning won't get logged for lots of
// in flight data streams.
engine.once(EngineEvent.Closing, onEngineClose);

Note to self: fix this pre-existing bug as part of this larger change. I think right now there will be a "max listener" warning if you try to send too many data streams concurrently.
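
One possible shape for that fix is a single shared listener that fans out to every in-flight stream, so the engine only ever sees one registration. This is only a sketch: `CloseFanout` and the `ClosableEngine` interface are hypothetical, and the `'closing'` string stands in for `EngineEvent.Closing`.

```typescript
/** Minimal emitter shape assumed for this sketch; the real engine type is richer. */
interface ClosableEngine {
  once(event: 'closing', listener: () => void): void;
}

/** Registers ONE listener on the engine and fans out to per-stream callbacks. */
class CloseFanout {
  private readonly callbacks = new Set<() => void>();

  constructor(engine: ClosableEngine) {
    engine.once('closing', () => {
      // Copy first so callbacks can unregister themselves while we iterate.
      const pending = Array.from(this.callbacks);
      this.callbacks.clear();
      for (const callback of pending) callback();
    });
  }

  /** Add a per-stream callback; the returned function removes it again. */
  add(callback: () => void): () => void {
    this.callbacks.add(callback);
    return () => {
      this.callbacks.delete(callback);
    };
  }
}
```

Each stream would call `add` when it starts and invoke the returned remover when it finishes, so the number of engine listeners stays at one regardless of how many streams are in flight.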

Comment on lines +364 to +368
// Phase 1: Try to send as a single packet data stream
//
// This is explicitly not being done for files, because it's challenging to determine ahead of
// time how well the file contents will compress (and whether the total output will be under the
// MTU). Revisit this in the future though.

Important callout for reviewers - sendFile has some special behavior (it doesn't do single packet data streams) because it's difficult / impossible to determine if the whole file will fit into a single packet without buffering in memory aggressively.
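
For contrast, the two-phase approach is straightforward when the whole payload is already in memory, as with text. A minimal sketch under that assumption; `planTextSend`, `splitIntoChunks`, and the 15,000-byte `MTU_BYTES` value are hypothetical and not taken from this PR:

```typescript
// Assumption: the MTU is taken to be 15,000 bytes for illustration.
const MTU_BYTES = 15_000;

type Outgoing =
  | { kind: 'inline'; content: Uint8Array }
  | { kind: 'chunked'; chunks: Uint8Array[] };

/** Split a buffer into views of at most `maxChunkBytes` each (no copying). */
function splitIntoChunks(bytes: Uint8Array, maxChunkBytes: number = MTU_BYTES): Uint8Array[] {
  if (maxChunkBytes <= 0) throw new RangeError('maxChunkBytes must be positive');
  const chunks: Uint8Array[] = [];
  for (let offset = 0; offset < bytes.byteLength; offset += maxChunkBytes) {
    // subarray clamps the end index, so the last chunk may be shorter.
    chunks.push(bytes.subarray(offset, offset + maxChunkBytes));
  }
  return chunks;
}

/** Phase 1: try a single packet. Phase 2: fall back to chunking. */
function planTextSend(encoded: Uint8Array): Outgoing {
  if (encoded.byteLength < MTU_BYTES) {
    return { kind: 'inline', content: encoded };
  }
  return { kind: 'chunked', chunks: splitIntoChunks(encoded) };
}
```

A file read incrementally from disk never has its full `encoded` buffer available up front, which is why the phase 1 check is hard to do there without buffering.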

Comment on lines +90 to +96
export async function deflateRawDecompress(data: Uint8Array): Promise<Uint8Array> {
  const ds = new DecompressionStream('deflate-raw');
  const writer = ds.writable.getWriter();
  writer.write(data as NonSharedUint8Array);
  writer.close();
  return collect(ds.readable);
}

Note to reviewers: I opted to use deflate-raw instead of deflate or gzip because deflate-raw is the most compact representation, and the extra metadata that deflate includes (a header and checksum) is superfluous over an SCTP-delivered reliable data channel, which already takes care of integrity and ordering.
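
To make the size difference concrete, the sketch below compresses the same payload into all three containers and compares the output lengths. It uses Node's `zlib` module purely for illustration, not the browser `CompressionStream` this PR relies on, and `measureContainerOverhead` and `roundTripRaw` are hypothetical helper names.

```typescript
import { deflateRawSync, deflateSync, gzipSync, inflateRawSync } from 'node:zlib';

/** Compress one payload three ways and report the resulting byte lengths. */
function measureContainerOverhead(payload: Uint8Array): {
  raw: number;
  zlib: number;
  gzip: number;
} {
  return {
    raw: deflateRawSync(payload).length, // bare DEFLATE stream (RFC 1951)
    zlib: deflateSync(payload).length, // zlib header + Adler-32 checksum (RFC 1950)
    gzip: gzipSync(payload).length, // gzip header + CRC-32 and length trailer (RFC 1952)
  };
}

/** Sanity check: raw DEFLATE output decodes back to the original bytes. */
function roundTripRaw(payload: Uint8Array): Uint8Array {
  return inflateRawSync(deflateRawSync(payload));
}

const example = Buffer.from('hello data streams '.repeat(20), 'utf8');
console.log(measureContainerOverhead(example));
```

The `'deflate'` format name in `CompressionStream` corresponds to the zlib container here, so the fixed per-message overhead matters most for the short single-packet payloads this PR targets.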

Comment thread DATA_STREAMS_SPEC.md

Note to reviewers: here is the (mostly complete) spec for this feature.

@1egoman 1egoman requested a review from ladvoc June 23, 2026 21:19