Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions packages/cubejs-backend-shared/src/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@
import genericPool, { Pool as GenericPool, Factory, Options } from 'generic-pool';

export class PoolTimeoutError extends Error {
public readonly poolName: string;

public constructor(poolName: string) {
super(`ResourceRequest timed out (pool: ${poolName})`);
this.name = 'PoolTimeoutError';
this.poolName = poolName;
public constructor(
protected readonly poolName: string
) {
super(`ResourceRequest timed out (${poolName})`);
}
}

Expand Down
11 changes: 8 additions & 3 deletions packages/cubejs-base-driver/src/BaseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,14 @@ const DbTypeValueMatcher: Record<string, ((v: any) => boolean)> = {
text: () => true
};

/**
* Base driver class.
*/
export function createPoolName(driverName: string, dataSource: string, preAggregations: boolean = false): string {
if (preAggregations) {
return `${driverName}#${dataSource}@preAggregations`;
}

return `${driverName}#${dataSource}`;
}

export abstract class BaseDriver implements DriverInterface {
private readonly testConnectionTimeoutValue: number = 10000;

Expand Down
27 changes: 17 additions & 10 deletions packages/cubejs-postgres-driver/src/PostgresDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import {
BaseDriver,
DownloadQueryResultsOptions, DownloadTableMemoryData, DriverInterface,
GenericDataBaseType, IndexesSQL, TableStructure, StreamOptions,
StreamTableDataWithTypes, QueryOptions, DownloadQueryResultsResult, DriverCapabilities, TableColumn,
StreamTableDataWithTypes, QueryOptions, DownloadQueryResultsResult, DriverCapabilities, TableColumn, createPoolName,
} from '@cubejs-backend/base-driver';
import { QueryStream } from './QueryStream';
import { PgClient, PgClientConfig } from './PgClient';
import { ConnectionError, PostgresError } from './errors';

const GenericTypeToPostgres: Record<GenericDataBaseType, string> = {
string: 'text',
Expand Down Expand Up @@ -138,8 +139,9 @@ export class PostgresDriver<Config extends PostgresDriverConfiguration = Postgre
...config
};

this.pool = new Pool<PgClient>('postgres', {
create: async () => this.createConnection(poolConfig),
const poolName = createPoolName('postgres', dataSource, preAggregations);
this.pool = new Pool<PgClient>(poolName, {
create: async () => this.createConnection(poolConfig, poolName),
validate: async (client) => {
if (client.isEnding() || client.isEnded()) {
return false;
Expand Down Expand Up @@ -180,10 +182,15 @@ export class PostgresDriver<Config extends PostgresDriverConfiguration = Postgre
this.enabled = true;
}

protected async createConnection(poolConfig: PgClientConfig): Promise<PgClient> {
protected async createConnection(poolConfig: PgClientConfig, poolName: string): Promise<PgClient> {
const client = new PgClient(poolConfig);
client.on('error', (err) => this.databasePoolError(err));
await client.connect();

try {
await client.connect();
} catch (e: unknown) {
throw new ConnectionError(e as Error, poolName);
}

return client;
}
Expand Down Expand Up @@ -275,7 +282,7 @@ export class PostgresDriver<Config extends PostgresDriverConfiguration = Postgre
await conn.query('SELECT $1::int AS number', ['1']);
} catch (e) {
if ((e as Error).toString().indexOf('no pg_hba.conf entry for host') !== -1) {
throw new Error(`Please use CUBEJS_DB_SSL=true to connect: ${(e as Error).toString()}`);
throw new PostgresError(`Please use CUBEJS_DB_SSL=true to connect: ${(e as Error).toString()}`, { cause: e as Error });
}

throw e;
Expand Down Expand Up @@ -328,7 +335,7 @@ export class PostgresDriver<Config extends PostgresDriverConfiguration = Postgre
return fields.map((f) => {
const postgresType = this.getPostgresTypeForField(f.dataTypeID);
if (!postgresType) {
throw new Error(
throw new PostgresError(
`Unable to detect type for field "${f.name}" with dataTypeID: ${f.dataTypeID}`
);
}
Expand Down Expand Up @@ -384,7 +391,7 @@ export class PostgresDriver<Config extends PostgresDriverConfiguration = Postgre
// See https://github.com/brianc/node-postgres/blob/92cb640fd316972e323ced6256b2acd89b1b58e0/packages/pg-protocol/src/buffer-writer.ts#L32-L37
const length = (values?.length ?? 0);
if (length >= 65536) {
throw new Error(`PostgreSQL protocol does not support more than 65535 parameters, but ${length} passed`);
throw new PostgresError(`PostgreSQL protocol does not support more than 65535 parameters, but ${length} passed`);
}
}

Expand Down Expand Up @@ -417,7 +424,7 @@ export class PostgresDriver<Config extends PostgresDriverConfiguration = Postgre

public async createTable(quotedTableName: string, columns: TableColumn[]): Promise<void> {
if (quotedTableName.length > 63) {
throw new Error('PostgreSQL can not work with table names longer than 63 symbols. ' +
throw new PostgresError('PostgreSQL can not work with table names longer than 63 symbols. ' +
`Consider using the 'sqlAlias' attribute in your cube definition for ${quotedTableName}.`);
}
return super.createTable(quotedTableName, columns);
Expand Down Expand Up @@ -460,7 +467,7 @@ export class PostgresDriver<Config extends PostgresDriverConfiguration = Postgre
indexesSql: IndexesSQL
) {
if (!tableData.rows) {
throw new Error(`${this.constructor} driver supports only rows upload`);
throw new PostgresError(`${this.constructor} driver supports only rows upload`);
}

await this.createTable(table, columns);
Expand Down
19 changes: 19 additions & 0 deletions packages/cubejs-postgres-driver/src/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
export class PostgresError extends Error {
public name = 'PostgresError';

public constructor(message: string, options?: ErrorOptions) {
super(message, options);
}
}

export class ConnectionError extends PostgresError {
public readonly name = 'ConnectionError';

public constructor(cause: Error, poolName: string) {
const message = cause instanceof AggregateError
? cause.errors.map((e: Error) => e.message).join(', ')
: cause.message;

super(`Unable to connect to the database (${poolName}): ${message}`, { cause });
}
}
1 change: 1 addition & 0 deletions packages/cubejs-postgres-driver/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ import { PostgresDriver } from './PostgresDriver';

export * from './PostgresDriver';
export * from './PgClient';
export * from './errors';

export default PostgresDriver;
4 changes: 2 additions & 2 deletions packages/cubejs-redshift-driver/src/RedshiftDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ export class RedshiftDriver extends PostgresDriver<RedshiftDriverConfiguration>
this.credentials = credentialsProvider;
}

protected override async createConnection(poolConfig: PgClientConfig): Promise<PgClient> {
protected async createConnection(poolConfig: PgClientConfig, poolName: string): Promise<PgClient> {
const { user, password } = await this.credentials.getCredentials();
return super.createConnection({ ...poolConfig, user, password });
return super.createConnection({ ...poolConfig, user, password }, poolName);
}

protected primaryKeysQuery() {
Expand Down
Loading