Skip to content

Commit ccf1d07

Browse files
committed
feat: run query as a micro task to prevent race conditions with emitter
1 parent 921c0bc commit ccf1d07

1 file changed

Lines changed: 59 additions & 57 deletions

File tree

packages/cbjs/src/queryexecutor.ts

Lines changed: 59 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -67,71 +67,73 @@ export class QueryExecutor<T extends CouchbaseClusterTypes = CouchbaseClusterTyp
6767
});
6868
});
6969

70-
exec((cppErr, resp) => {
71-
const err = errorFromCpp(cppErr);
72-
if (err) {
73-
emitter.emit('error', err);
74-
emitter.emit('end');
75-
return;
76-
}
70+
queueMicrotask(() => {
71+
exec((cppErr, resp) => {
72+
const err = errorFromCpp(cppErr);
73+
if (err) {
74+
emitter.emit('error', err);
75+
emitter.emit('end');
76+
return;
77+
}
7778

78-
invariant(resp);
79+
invariant(resp);
7980

80-
const rowParser = parser ?? JSON.parse;
81+
const rowParser = parser ?? JSON.parse;
8182

82-
resp.rows.forEach((row) => {
83-
emitter.emit('row', rowParser(row) as TRow);
84-
});
83+
resp.rows.forEach((row) => {
84+
emitter.emit('row', rowParser(row) as TRow);
85+
});
8586

86-
{
87-
const metaData = resp.meta;
88-
89-
let warnings: QueryWarning[];
90-
if (metaData.warnings) {
91-
warnings = metaData.warnings.map(
92-
(warningData) =>
93-
new QueryWarning({
94-
code: warningData.code,
95-
message: warningData.message,
96-
})
97-
);
98-
} else {
99-
warnings = [];
100-
}
87+
{
88+
const metaData = resp.meta;
89+
90+
let warnings: QueryWarning[];
91+
if (metaData.warnings) {
92+
warnings = metaData.warnings.map(
93+
(warningData) =>
94+
new QueryWarning({
95+
code: warningData.code,
96+
message: warningData.message,
97+
})
98+
);
99+
} else {
100+
warnings = [];
101+
}
101102

102-
let metrics: QueryMetrics | undefined;
103-
if (metaData.metrics) {
104-
const metricsData = metaData.metrics;
105-
106-
metrics = new QueryMetrics({
107-
elapsedTime: metricsData.elapsed_time,
108-
executionTime: metricsData.execution_time,
109-
sortCount: metricsData.sort_count || 0,
110-
resultCount: metricsData.result_count || 0,
111-
resultSize: metricsData.result_size || 0,
112-
mutationCount: metricsData.mutation_count || 0,
113-
errorCount: metricsData.error_count || 0,
114-
warningCount: metricsData.warning_count || 0,
115-
});
116-
} else {
117-
metrics = undefined;
118-
}
103+
let metrics: QueryMetrics | undefined;
104+
if (metaData.metrics) {
105+
const metricsData = metaData.metrics;
106+
107+
metrics = new QueryMetrics({
108+
elapsedTime: metricsData.elapsed_time,
109+
executionTime: metricsData.execution_time,
110+
sortCount: metricsData.sort_count || 0,
111+
resultCount: metricsData.result_count || 0,
112+
resultSize: metricsData.result_size || 0,
113+
mutationCount: metricsData.mutation_count || 0,
114+
errorCount: metricsData.error_count || 0,
115+
warningCount: metricsData.warning_count || 0,
116+
});
117+
} else {
118+
metrics = undefined;
119+
}
119120

120-
const meta = new QueryMetaData<WithMetrics>({
121-
requestId: metaData.request_id,
122-
clientContextId: metaData.client_context_id,
123-
status: metaData.status as QueryStatus,
124-
signature: metaData.signature ? JSON.parse(metaData.signature) : undefined,
125-
warnings,
126-
metrics: metrics as If<WithMetrics, QueryMetrics, undefined>,
127-
profile: metaData.profile ? JSON.parse(metaData.profile) : undefined,
128-
});
121+
const meta = new QueryMetaData<WithMetrics>({
122+
requestId: metaData.request_id,
123+
clientContextId: metaData.client_context_id,
124+
status: metaData.status as QueryStatus,
125+
signature: metaData.signature ? JSON.parse(metaData.signature) : undefined,
126+
warnings,
127+
metrics: metrics as If<WithMetrics, QueryMetrics, undefined>,
128+
profile: metaData.profile ? JSON.parse(metaData.profile) : undefined,
129+
});
129130

130-
emitter.emit('meta', meta);
131-
}
131+
emitter.emit('meta', meta);
132+
}
132133

133-
emitter.emit('end');
134-
return;
134+
emitter.emit('end');
135+
return;
136+
});
135137
});
136138

137139
return emitter;

0 commit comments

Comments
 (0)