Skip to content

Commit c6462e4

Browse files
authored
feat(cubestore-driver): Queue - use exclusive flag with new Cube Store (cube-js#10488)
1 parent 4f29432 commit c6462e4

7 files changed

Lines changed: 120 additions & 4 deletions

File tree

packages/cubejs-backend-shared/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export * from './helpers';
1515
export * from './machine-id';
1616
export * from './type-helpers';
1717
export * from './shared-types';
18+
export * from './semver';
1819
export * from './http-utils';
1920
export * from './cli';
2021
export * from './proxy';
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
interface VersionPart {
2+
num: number;
3+
pre: string;
4+
}
5+
6+
function parseVersionParts(v: string): VersionPart[] {
7+
return v.split('.').map((segment) => {
8+
const idx = segment.indexOf('-');
9+
if (idx === -1) {
10+
return { num: parseInt(segment, 10) || 0, pre: '' };
11+
}
12+
13+
return {
14+
num: parseInt(segment.substring(0, idx), 10) || 0,
15+
pre: segment.substring(idx + 1),
16+
};
17+
});
18+
}
19+
20+
export function isVersionGte(version: string | null, minVersion: string): boolean {
21+
if (!version) {
22+
return false;
23+
}
24+
25+
const parts = parseVersionParts(version);
26+
const minParts = parseVersionParts(minVersion);
27+
28+
for (let i = 0; i < Math.max(parts.length, minParts.length); i++) {
29+
const a = parts[i] || { num: 0, pre: '' };
30+
const b = minParts[i] || { num: 0, pre: '' };
31+
32+
if (a.num > b.num) {
33+
return true;
34+
}
35+
36+
if (a.num < b.num) {
37+
return false;
38+
}
39+
40+
// The same numeric part — pre-release is less than no pre-release
41+
if (a.pre && !b.pre) {
42+
return false;
43+
}
44+
45+
if (!a.pre && b.pre) {
46+
return true;
47+
}
48+
}
49+
50+
return true;
51+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { isVersionGte } from '../src';
2+
3+
describe('isVersionGte', () => {
4+
test.each([
5+
// null version
6+
[null, '1.0.0', false],
7+
// equal versions
8+
['1.6.22', '1.6.22', true],
9+
['0.0.0', '0.0.0', true],
10+
// greater (major, minor, patch)
11+
['2.0.0', '1.6.22', true],
12+
['1.7.0', '1.6.22', true],
13+
['1.6.23', '1.6.22', true],
14+
// lesser (major, minor, patch)
15+
['0.9.99', '1.0.0', false],
16+
['1.5.99', '1.6.0', false],
17+
['1.6.21', '1.6.22', false],
18+
// different segment lengths
19+
['1.6', '1.6.0', true],
20+
['1.6.0', '1.6', true],
21+
['1.5', '1.6.0', false],
22+
// pre-release is less than the same clean version
23+
['1.6.22-alpha', '1.6.22', false],
24+
['1.6.22-beta', '1.6.22', false],
25+
['1.6.22-rc.1', '1.6.22', false],
26+
// pre-release of a higher version still passes
27+
['1.6.23-alpha', '1.6.22', true],
28+
['2.0.0-beta', '1.6.22', true],
29+
// clean version is greater than pre-release
30+
['1.6.22', '1.6.22-alpha', true],
31+
// both pre-release, same numeric — equal
32+
['1.6.22-alpha', '1.6.22-beta', true],
33+
])('isVersionGte(%j, %j) === %j', (version, minVersion, expected) => {
34+
expect(isVersionGte(version, minVersion)).toBe(expected);
35+
});
36+
});

packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,19 @@ import {
1414
QueryOptions,
1515
ExternalDriverCompatibilities, TableStructure, TableColumnQueryResult,
1616
} from '@cubejs-backend/base-driver';
17-
import { AsyncDebounce, getEnv } from '@cubejs-backend/shared';
17+
import { AsyncDebounce, getEnv, isVersionGte } from '@cubejs-backend/shared';
1818
import { format as formatSql, escape } from 'sqlstring';
1919
import fetch from 'node-fetch';
2020

2121
import { ConnectionConfig } from './types';
2222
import { WebSocketConnection } from './WebSocketConnection';
2323

24+
type CubeStoreCapability = 'queueExclusive';
25+
26+
const CubeStoreCapabilityMinVersion: Record<CubeStoreCapability, string> = {
27+
queueExclusive: '1.6.22',
28+
};
29+
2430
const GenericTypeToCubeStore: Record<string, string> = {
2531
string: 'varchar(255)',
2632
text: 'varchar(255)',
@@ -76,6 +82,12 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
7682
this.connection = new WebSocketConnection(`${this.baseUrl}/ws`);
7783
}
7884

85+
public async hasCapability(capability: CubeStoreCapability): Promise<boolean> {
86+
const minVersion = CubeStoreCapabilityMinVersion[capability];
87+
88+
return isVersionGte(await this.connection.getCubeStoreVersion(), minVersion);
89+
}
90+
7991
public async testConnection() {
8092
await this.query('SELECT 1', []);
8193
}

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
8282
values.push(this.prefixKey(this.redisHash(queryKey)));
8383
values.push(JSON.stringify(data));
8484

85-
const rows = await this.driver.query(`QUEUE ADD PRIORITY ?${options.orphanedTimeout ? ' ORPHANED ?' : ''} ? ?`, values);
85+
const exclusive = queryKey.persistent && await this.driver.hasCapability('queueExclusive');
86+
const rows = await this.driver.query(`QUEUE ADD${exclusive ? ' EXCLUSIVE' : ''} PRIORITY ?${options.orphanedTimeout ? ' ORPHANED ?' : ''} ? ?`, values);
8687
if (rows && rows.length) {
8788
return [
8889
rows[0].added === 'true' ? 1 : 0,

packages/cubejs-cubestore-driver/src/WebSocketConnection.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ export class WebSocketConnection {
4242

4343
private readonly connectionId: string;
4444

45+
private cubeStoreVersion: string | null = null;
46+
4547
public constructor(url: string) {
4648
this.url = url;
4749
this.messageCounter = 1;
@@ -57,6 +59,10 @@ export class WebSocketConnection {
5759
headers['x-process-id'] = getProcessUid();
5860

5961
const webSocket = new WebSocket(this.url, { headers }) as CubeStoreWebSocket;
62+
webSocket.on('upgrade', (response: any) => {
63+
this.cubeStoreVersion = response.headers['x-cubestore-version'] || null;
64+
});
65+
6066
webSocket.readyPromise = new Promise<CubeStoreWebSocket>((resolve, reject) => {
6167
webSocket.lastHeartBeat = new Date();
6268
const pingInterval = setInterval(() => {
@@ -284,6 +290,14 @@ export class WebSocketConnection {
284290
return this.sendMessage(messageId, builder.asUint8Array());
285291
}
286292

293+
public async getCubeStoreVersion(): Promise<string> {
294+
if (this.webSocket) {
295+
await this.webSocket.readyPromise;
296+
}
297+
298+
return this.cubeStoreVersion ?? '0.0.0';
299+
}
300+
287301
public close() {
288302
if (this.webSocket) {
289303
this.webSocket.close();

rust/cubestore/cubestore/src/http/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ impl HttpServer {
147147
.and_then(move |tx: mpsc::Sender<(mpsc::Sender<Arc<HttpMessage>>, SqlQueryContext, HttpMessage)>, sql_query_context: SqlQueryContext, ws: Ws| async move {
148148
let tx_to_move = tx.clone();
149149
let sql_query_context = sql_query_context.clone();
150-
Result::<_, Rejection>::Ok(ws.max_frame_size(max_frame_size).max_message_size(max_message_size).on_upgrade(async move |mut web_socket| {
150+
let reply = ws.max_frame_size(max_frame_size).max_message_size(max_message_size).on_upgrade(async move |mut web_socket| {
151151
let process_id = sql_query_context.process_id.as_deref().unwrap_or("None");
152152
trace!("WebSocket connection established (process_id: {})", process_id);
153153
let (response_tx, mut response_rx) = mpsc::channel::<Arc<HttpMessage>>(10000);
@@ -207,7 +207,8 @@ impl HttpServer {
207207
}
208208
};
209209
};
210-
}))
210+
});
211+
Result::<_, Rejection>::Ok(warp::reply::with_header(reply, "X-CubeStore-Version", env!("CARGO_PKG_VERSION")))
211212
});
212213

213214
let auth_filter_to_move = auth_filter.clone();

0 commit comments

Comments
 (0)