Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 37 additions & 9 deletions packages/cubejs-client-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -786,19 +786,47 @@ class CubeApi {

const [schema, ...data] = response.error.split('\n');

let parsedSchema: any;
try {
const parsedSchema = JSON.parse(schema);
return {
schema: parsedSchema.schema,
data: data
.filter((d: string) => d.trim().length)
.map((d: string) => JSON.parse(d).data)
.reduce((a: any, b: any) => a.concat(b), []),
...(parsedSchema.lastRefreshTime ? { lastRefreshTime: parsedSchema.lastRefreshTime } : {}),
};
parsedSchema = JSON.parse(schema);
} catch (err) {
// Schema line isn't valid JSON — the whole `error` payload is a real error.
throw new Error(response.error);
}

const rows: any[] = [];

for (const line of data) {
if (line.trim().length) {
let parsed: any;
try {
parsed = JSON.parse(line);
} catch (err) {
// A non-JSON line after a valid schema means a malformed payload — fall
// back to surfacing the raw response rather than dropping rows silently.
throw new Error(response.error);
}

// The stream can interleave an error chunk after the schema (e.g. a
// post-processing/cast error surfaced mid-result). Such a line has no
// `data`, so the previous `JSON.parse(d).data` concat pushed an `undefined`
// "phantom" row and silently swallowed the failure. Surface it instead —
// matching how `cubeSqlStream` classifies `error` chunks.
if (parsed.error) {
throw new Error(parsed.error);
}

if (parsed.data) {
rows.push(...parsed.data);
}
}
}

return {
schema: parsedSchema.schema,
data: rows,
...(parsedSchema.lastRefreshTime ? { lastRefreshTime: parsedSchema.lastRefreshTime } : {}),
};
},
options,
callback
Expand Down
33 changes: 33 additions & 0 deletions packages/cubejs-client-core/test/CubeApi.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,21 @@ describe('CubeApi cubeSql', () => {
JSON.stringify({ data: [['Active']] }),
].join('\n');

// The backend streams a schema chunk, then (on a post-processing failure) an error
// chunk. The error must surface as a rejection instead of being concatenated as an
// `undefined` phantom row.
const cubeSqlResponseBodyWithError = [
JSON.stringify({
schema: [
{ name: 'created_date', column_type: 'String' },
],
}),
JSON.stringify({
error: 'Post-Processing Error: Cast error: Error parsing \'2026-05-01\' as timestamp',
requestId: '2fbe44e4-df6f-420d-ae39-376c802323b4-span-1',
}),
].join('\n');

test('should parse lastRefreshTime from response', async () => {
vi.spyOn(HttpTransport.prototype, 'request').mockImplementation(() => ({
subscribe: (cb) => Promise.resolve(cb({
Expand Down Expand Up @@ -471,6 +486,24 @@ describe('CubeApi cubeSql', () => {
expect(res.schema).toEqual([{ name: 'status', column_type: 'String' }]);
expect(res.data).toEqual([['Active']]);
});

test('should surface an error chunk that follows the schema instead of swallowing it', async () => {
vi.spyOn(HttpTransport.prototype, 'request').mockImplementation(() => ({
subscribe: (cb) => Promise.resolve(cb({
status: 200,
text: () => Promise.resolve(JSON.stringify({ error: cubeSqlResponseBodyWithError })),
} as any,
async () => undefined as any))
}));

const cubeApi = new CubeApi('token', {
apiUrl: 'http://localhost:4000/cubejs-api/v1',
});

await expect(
cubeApi.cubeSql('SELECT created_date FROM deals')
).rejects.toThrow('Post-Processing Error: Cast error: Error parsing \'2026-05-01\' as timestamp');
});
});

describe('CubeApi with baseRequestId', () => {
Expand Down
Loading