Skip to content

Commit 7f6d94b

Browse files
OEvgenyCopilotCopilot
authored
feat: markdown streaming (#5799)
* feat: markdown streaming * Changelog * Add event-stream-adapter element * Fix candidate for missing definitions * Add test * Flaky * Add actual streaming test * Update packages/bundle/src/markdown/private/createDecorate.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update __tests__/html2/markdown/vnext/markdownStreaming.html Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: split committed/active blocks at micromark-event level instead of DOM lastElementChild Agent-Logs-Url: https://github.com/microsoft/BotFramework-WebChat/sessions/0cc28430-8345-487c-a77b-dd62abdb9973 Co-authored-by: OEvgeny <2841858+OEvgeny@users.noreply.github.com> * Revert "fix: split committed/active blocks at micromark-event level instead of DOM lastElementChild" This reverts commit 55d1cc9. * Fix schemas * Fix comments * fix print stats * Rework and small test * Improve * Bring back trim() for the old path * Fix more edges with spacing and hopefully stabilize snaps --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: OEvgeny <2841858+OEvgeny@users.noreply.github.com>
1 parent b64f7a3 commit 7f6d94b

34 files changed

+40407
-109
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ Breaking changes in this release:
161161
- Added pull-based capabilities system for dynamically discovering adapter capabilities at runtime, in PR [#5679](https://github.com/microsoft/BotFramework-WebChat/pull/5679), by [@pranavjoshi001](https://github.com/pranavjoshi001)
162162
- Added Speech-to-Speech (S2S) support for real-time voice conversations, in PR [#5654](https://github.com/microsoft/BotFramework-WebChat/pull/5654), by [@pranavjoshi](https://github.com/pranavjoshi001)
163163
- Added core mute/unmute functionality for speech-to-speech via `useRecorder` hook (silent chunks keep server connection alive), in PR [#5688](https://github.com/microsoft/BotFramework-WebChat/pull/5688), by [@pranavjoshi](https://github.com/pranavjoshi001)
164+
- 🧪 Added incremental streaming Markdown renderer for livestreaming, in PR [#5799](https://github.com/microsoft/BotFramework-WebChat/pull/5799), by [@OEvgeny](https://github.com/OEvgeny)
164165

165166
### Changed
166167

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
<!DOCTYPE html>
2+
<html lang="en">
3+
<head>
4+
<meta charset="UTF-8">
5+
<title>Event Stream Adapter Custom Element</title>
6+
</head>
7+
<body>
8+
<script type="module">
9+
import { customElement } from '/assets/custom-element/custom-element.js';
10+
import { getActivityLivestreamingMetadata } from 'botframework-webchat-core';
11+
import { EventSourceParserStream } from 'eventsource-parser/stream';
12+
import createStreamCoalescer from '/assets/esm/adapter/createStreamCoalescer.js';
13+
import { forIterator } from '/assets/esm/adapter/demuxChainOfThought.js';
14+
15+
function createObservable(subscribe) {
16+
return Object.freeze({ subscribe });
17+
}
18+
19+
customElement('event-stream-adapter', currentDocument =>
20+
class EventStreamAdapterElement extends HTMLElement {
21+
static get observedAttributes() { return ['compat', 'href']; }
22+
23+
#abortController = null;
24+
#activities = [];
25+
#activityBuffer = [];
26+
#activityObservers = [];
27+
#activityWaiters = new Set();
28+
#directLine;
29+
30+
constructor() {
31+
super();
32+
33+
const self = this;
34+
35+
this.#directLine = Object.freeze({
36+
activity$: createObservable(observerOrNext => {
37+
const observer = typeof observerOrNext === 'function'
38+
? { next: observerOrNext }
39+
: observerOrNext;
40+
41+
for (const activity of self.#activityBuffer) {
42+
observer.next(activity);
43+
}
44+
45+
self.#activityObservers.push(observer);
46+
47+
return Object.freeze({
48+
unsubscribe() {
49+
const i = self.#activityObservers.indexOf(observer);
50+
i !== -1 && self.#activityObservers.splice(i, 1);
51+
}
52+
});
53+
}),
54+
55+
connectionStatus$: createObservable(observerOrNext => {
56+
const observer = typeof observerOrNext === 'function'
57+
? { next: observerOrNext }
58+
: observerOrNext;
59+
60+
observer.next(0);
61+
observer.next(1);
62+
observer.next(2);
63+
64+
return Object.freeze({ unsubscribe() {} });
65+
}),
66+
67+
end() {},
68+
69+
postActivity() {
70+
return createObservable(observerOrNext => {
71+
const observer = typeof observerOrNext === 'function'
72+
? { next: observerOrNext }
73+
: observerOrNext;
74+
75+
observer.next(crypto.randomUUID());
76+
observer.complete?.();
77+
78+
return Object.freeze({ unsubscribe() {} });
79+
});
80+
}
81+
});
82+
}
83+
84+
get compat() {
85+
return this.getAttribute('compat') || 'webchat';
86+
}
87+
88+
get directLine() {
89+
return this.#directLine;
90+
}
91+
92+
connectedCallback() {
93+
this.#load();
94+
}
95+
96+
disconnectedCallback() {
97+
this.#abortController?.abort();
98+
this.#abortController = null;
99+
}
100+
101+
#emitToAdapter(activity) {
102+
this.#activityBuffer.push(activity);
103+
104+
for (const observer of this.#activityObservers) {
105+
observer.next(activity);
106+
}
107+
}
108+
109+
#emitEvent(activity) {
110+
this.#activities.push(activity);
111+
112+
for (const resolve of this.#activityWaiters) {
113+
resolve();
114+
}
115+
116+
this.#activityWaiters.clear();
117+
118+
const meta = getActivityLivestreamingMetadata(activity);
119+
const element = this;
120+
const detail = { activity };
121+
122+
meta && Object.defineProperty(detail, 'activities', {
123+
enumerable: true,
124+
get: () => element.#iterateStreamActivities(meta.sessionId)
125+
});
126+
127+
this.dispatchEvent(new CustomEvent('activity', {
128+
bubbles: true,
129+
detail: Object.freeze(detail)
130+
}));
131+
}
132+
133+
async *#iterateStreamActivities(sessionId) {
134+
let cursor = 0;
135+
136+
for (;;) {
137+
while (cursor < this.#activities.length) {
138+
const activity = this.#activities[cursor++];
139+
const activityMeta = getActivityLivestreamingMetadata(activity);
140+
141+
if (!activityMeta || activityMeta.sessionId !== sessionId) {
142+
continue;
143+
}
144+
145+
yield activity;
146+
147+
if (activityMeta.type === 'final activity') {
148+
return;
149+
}
150+
}
151+
152+
await new Promise(resolve => this.#activityWaiters.add(resolve));
153+
}
154+
}
155+
156+
async #load() {
157+
const href = this.getAttribute('href');
158+
159+
if (!href) {
160+
return;
161+
}
162+
163+
this.#abortController?.abort();
164+
this.#abortController = new AbortController();
165+
166+
const { signal } = this.#abortController;
167+
const compat = this.compat;
168+
const useAdapter = compat === 'webchat' || compat === 'both';
169+
const useEvents = compat === 'events' || compat === 'both';
170+
171+
try {
172+
for await (const activity of forIterator({}, this.#fetchStreamed(href, signal))) {
173+
if (signal.aborted) {
174+
break;
175+
}
176+
177+
useAdapter && this.#emitToAdapter(activity);
178+
useEvents && this.#emitEvent(activity.raw);
179+
}
180+
} catch (error) {
181+
signal.aborted || console.error('event-stream-adapter:', error);
182+
}
183+
}
184+
185+
async *#fetchStreamed(href, signal) {
186+
const typingMap = new Map();
187+
const res = await fetch(new URL(href, location.href), { signal });
188+
189+
yield* res.body
190+
.pipeThrough(new TextDecoderStream())
191+
.pipeThrough(new EventSourceParserStream())
192+
.pipeThrough(
193+
new TransformStream({
194+
transform({ data, event }, controller) {
195+
if (event === 'end') {
196+
controller.terminate();
197+
return;
198+
}
199+
200+
if (event !== 'activity') {
201+
return;
202+
}
203+
204+
const activity = JSON.parse(data);
205+
206+
activity.raw = { ...activity };
207+
208+
if (
209+
activity.type === 'typing' &&
210+
activity.text &&
211+
activity.channelData?.streamType === 'streaming'
212+
) {
213+
const streamId = activity.channelData?.streamId || activity.id;
214+
let accumulated = typingMap.get(streamId) || '';
215+
216+
if (activity.channelData?.chunkType === 'delta') {
217+
accumulated += activity.text;
218+
activity.text = accumulated;
219+
} else {
220+
accumulated = activity.text;
221+
}
222+
223+
typingMap.set(streamId, accumulated);
224+
}
225+
226+
controller.enqueue(activity);
227+
}
228+
})
229+
)
230+
.pipeThrough(createStreamCoalescer());
231+
}
232+
}
233+
);
234+
</script>
235+
</body>
236+
</html>
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/* eslint-disable security/detect-object-injection */
2+
/*!
3+
* Copyright (C) Microsoft Corporation. All rights reserved.
4+
*/
5+
6+
const NextLivestreamSequence = Symbol();
7+
const PreviousActivitySymbol = Symbol();
8+
const SessionIdSymbol = Symbol();
9+
10+
export default class LivestreamSession {
11+
constructor(sessionId) {
12+
this[NextLivestreamSequence] = 1;
13+
this[PreviousActivitySymbol] = undefined;
14+
this[SessionIdSymbol] = sessionId;
15+
}
16+
17+
/**
18+
* Last string, useful for decompressing delta-compressed chunks.
19+
*/
20+
get previousActivity() {
21+
return this[PreviousActivitySymbol];
22+
}
23+
24+
set previousActivity(value) {
25+
this[PreviousActivitySymbol] = value;
26+
}
27+
28+
/**
29+
* Activity ID of the session (and the first activity.)
30+
*
31+
* @type {string}
32+
*/
33+
get sessionId() {
34+
return this[SessionIdSymbol];
35+
}
36+
37+
get isConcluded() {
38+
return this[NextLivestreamSequence] === Infinity;
39+
}
40+
41+
/** @return {number} */
42+
getNextLivestreamSequence(
43+
/** @type {boolean | undefined} */
44+
isFinal = false
45+
) {
46+
if (isFinal) {
47+
this.previousActivity = undefined;
48+
49+
return (this[NextLivestreamSequence] = Infinity);
50+
}
51+
52+
return this[NextLivestreamSequence]++;
53+
}
54+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/* eslint-disable security/detect-object-injection */
2+
3+
/*!
4+
* Copyright (C) Microsoft Corporation. All rights reserved.
5+
*/
6+
7+
import LivestreamSession from './LivestreamSession.js';
8+
9+
const ActiveLivestreamSymbol = Symbol();
10+
11+
export default class LivestreamSessionManager {
12+
constructor() {
13+
this[ActiveLivestreamSymbol] = new Map();
14+
}
15+
16+
*concludeAll() {
17+
for (const [sessionId, session] of this[ActiveLivestreamSymbol]) {
18+
if (!session.isConcluded) {
19+
const { previousActivity } = session;
20+
const entitiesWithoutStreamInfo = (previousActivity?.entities ?? []).filter(
21+
({ type }) => type !== 'streaminfo'
22+
);
23+
24+
yield Object.freeze({
25+
...previousActivity,
26+
channelData: Object.freeze({
27+
...previousActivity?.channelData,
28+
chunkType: undefined,
29+
streamId: sessionId,
30+
streamSequence: undefined,
31+
streamType: 'final'
32+
}),
33+
entities: Object.freeze([...entitiesWithoutStreamInfo]),
34+
id: `${sessionId}/final`,
35+
text: previousActivity?.text,
36+
type: 'message'
37+
});
38+
}
39+
}
40+
}
41+
42+
has(livestreamSessionId) {
43+
return this[ActiveLivestreamSymbol].has(livestreamSessionId);
44+
}
45+
46+
*sequence(livestreamSessionId, activity, isFinal = false) {
47+
let livestreamSession = this[ActiveLivestreamSymbol].get(livestreamSessionId);
48+
49+
if (!livestreamSession) {
50+
livestreamSession = new LivestreamSession(livestreamSessionId);
51+
52+
this[ActiveLivestreamSymbol].set(livestreamSessionId, livestreamSession);
53+
}
54+
55+
if (livestreamSession.isConcluded) {
56+
return;
57+
}
58+
59+
const streamSequence = livestreamSession.getNextLivestreamSequence(isFinal);
60+
const entitiesWithoutStreamInfo = (activity.entities ?? []).filter(({ type }) => type !== 'streaminfo');
61+
62+
// We assume the chat adapter will do delta decompression.
63+
livestreamSession.previousActivity = activity;
64+
65+
yield Object.freeze({
66+
...activity,
67+
channelData: Object.freeze({
68+
...activity.channelData,
69+
chunkType: undefined,
70+
streamId: streamSequence === 1 ? undefined : livestreamSessionId,
71+
streamSequence: streamSequence === Infinity ? undefined : streamSequence,
72+
streamType: streamSequence === Infinity ? 'final' : 'streaming'
73+
}),
74+
entities: Object.freeze([...entitiesWithoutStreamInfo]),
75+
id: streamSequence === 1 ? livestreamSessionId : activity.id,
76+
text: livestreamSession.previousActivity.text,
77+
type: streamSequence === Infinity ? 'message' : 'typing'
78+
});
79+
}
80+
}

0 commit comments

Comments
 (0)