Skip to content

Commit 3314008

Browse files
authored
Merge pull request #746 from SprocketBot/codex/postgres-queue-state
2 parents 6ac4e3b + bfa995e commit 3314008

111 files changed

Lines changed: 3050 additions & 3633 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

clients/discord-bot/src/main.ts

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import {HttpAdapterHost, NestFactory} from "@nestjs/core";
2-
import {Transport} from "@nestjs/microservices";
3-
import {AllExceptionsFilter, config} from "@sprocketbot/common";
2+
import {AllExceptionsFilter, config, PostgresServer} from "@sprocketbot/common";
43
import fetch from "node-fetch";
54

65
import {AppModule} from "./app.module";
@@ -10,16 +9,8 @@ global.fetch = fetch as any;
109

1110
async function bootstrap(): Promise<void> {
1211
const app = await NestFactory.createMicroservice(AppModule, {
13-
transport: Transport.RMQ,
1412
logger: config.logger.levels,
15-
options: {
16-
urls: [config.transport.url],
17-
queue: config.transport.bot_queue,
18-
queueOptions: {
19-
durable: true,
20-
},
21-
heartbeat: 120,
22-
},
13+
strategy: new PostgresServer({queue: config.transport.bot_queue}),
2314
});
2415

2516
const httpAdapter = app.get(HttpAdapterHost);

clients/image-generation-frontend/package.json

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
"@commitlint/cli": "^12.1.4",
1515
"@commitlint/config-conventional": "^12.1.4",
1616
"@sveltejs/kit": "^1.0.0-next.156",
17-
"@types/amqplib": "^0.8.2",
1817
"@typescript-eslint/eslint-plugin": "^5.10.1",
1918
"@typescript-eslint/parser": "^5.10.1",
2019
"autoprefixer": "^10.3.3",
@@ -35,7 +34,6 @@
3534
"@sveltejs/adapter-node": "^1.0.0-next.43",
3635
"@types/knex": "^0.16.1",
3736
"@types/minio": "^7.0.10",
38-
"amqplib": "^0.8.0",
3937
"knex": "^0.95.10",
4038
"minio": "^7.0.19",
4139
"nats": "^2.2.0",

clients/image-generation-frontend/src/routes/api/exec/[id].ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
// import type {EndpointOutput, Request} from "@sveltejs/kit";
22
import { ReportTemplateDAO } from '$utils/server/database/ReportTemplate.dao';
3-
import { rmqRequest } from '$utils/rabbitmq';
3+
import { postgresRpcRequest } from '$utils/postgres-rpc';
44

55
export async function POST({ request, params }) {
66
const data = await request.json();
77
// const data = JSON.parse(body.toString());
88
const results = await ReportTemplateDAO.runReport(params.id, data.filterValues);
99

1010
try {
11-
const res: any = await rmqRequest('media-gen.img.create', {
11+
const res: any = await postgresRpcRequest('media-gen.img.create', {
1212
inputFile: data.inputFile,
1313
outputFile: data.outputFile,
1414
template: results,

clients/image-generation-frontend/src/routes/api/run/[imageType]/[name].ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { rmqRequest } from '$src/utils/rabbitmq';
1+
import { postgresRpcRequest } from '$src/utils/postgres-rpc';
22
import { ReportTemplateDAO } from '$src/utils/server/database/ReportTemplate.dao';
33
import type { EndpointOutput, Request } from '@sveltejs/kit';
44

@@ -40,7 +40,7 @@ export async function POST({ body, params }: Request): Promise<EndpointOutput> {
4040
const results = await ReportTemplateDAO.runReport(params.id, data.filterValues);
4141

4242
try {
43-
const res: any = await rmqRequest('media-gen.img.create', {
43+
const res: any = await postgresRpcRequest('media-gen.img.create', {
4444
inputFile: data.inputFile,
4545
outputFile: data.outputFile,
4646
filterValues: results,
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import pg from 'pg';
2+
import { v4 } from 'uuid';
3+
import config from '../config';
4+
5+
const { Pool } = pg;
6+
7+
const pool = new Pool({
8+
...config.knex,
9+
ssl: false,
10+
});
11+
12+
export async function postgresRpcRequest(
13+
pattern: string,
14+
data: Record<string, unknown> = {},
15+
): Promise<unknown> {
16+
const id = v4();
17+
await pool.query('CREATE SCHEMA IF NOT EXISTS sprocket');
18+
await pool.query(
19+
`
20+
INSERT INTO sprocket.platform_rpc_queue (id, queue, pattern, payload)
21+
VALUES ($1, $2, $3, $4)
22+
`,
23+
[id, config.transport.queue, pattern, data],
24+
);
25+
26+
const deadline = Date.now() + 30000;
27+
while (Date.now() < deadline) {
28+
const result = await pool.query(
29+
'SELECT status, response, error FROM sprocket.platform_rpc_queue WHERE id = $1',
30+
[id],
31+
);
32+
const row = result.rows[0];
33+
if (row?.status === 'completed') {
34+
await pool.query('DELETE FROM sprocket.platform_rpc_queue WHERE id = $1', [id]);
35+
return row.response;
36+
}
37+
if (row?.status === 'failed') {
38+
throw new Error(row.error?.message ?? 'Request failed');
39+
}
40+
await new Promise(resolve => setTimeout(resolve, 250));
41+
}
42+
43+
throw new Error('Request timed out.');
44+
}

clients/image-generation-frontend/src/utils/rabbitmq.ts

Lines changed: 0 additions & 57 deletions
This file was deleted.

common/package.json

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
"@commitlint/config-conventional": "^15.0.0",
2828
"@types/config": "^0.0.40",
2929
"@types/minio": "^7.0.12",
30+
"@types/pg": "^8.20.0",
3031
"@types/uuid": "^8.3.4",
3132
"husky": "^7.0.4",
3233
"rimraf": "^3.0.2",
@@ -39,19 +40,16 @@
3940
"@nestjs/common": "^8.2.3",
4041
"@nestjs/microservices": "^8.2.3",
4142
"@sprocketbot/gql-client": "^0.0.3",
42-
"@types/amqplib": "^0.8.2",
4343
"@types/ioredis": "^4.28.5",
4444
"@types/jest": "^29.0.3",
4545
"@types/node": "^17.0.13",
46-
"amqp-connection-manager": "^3.7.0",
47-
"amqplib": "^0.8.0",
48-
"celery-node": "^0.5.8",
4946
"config": "^3.3.6",
5047
"dotenv": "^16.0.3",
5148
"eslint-plugin-simple-import-sort": "^7.0.0",
5249
"ioredis": "^4.28.2",
5350
"minio": "^7.0.26",
5451
"nanoid": "^3.3.4",
52+
"pg": "^8.7.1",
5553
"redis": "^4.0.4",
5654
"rxjs": "^7.4.0",
5755
"uuid": "^8.3.2",

common/src/celery/celery.module.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import {Module} from "@nestjs/common";
22

3+
import {PostgresModule} from "../postgres";
34
import {CeleryService} from "./celery.service";
45

56
@Module({
7+
imports: [PostgresModule],
68
providers: [CeleryService],
79
exports: [CeleryService],
810
})

0 commit comments

Comments
 (0)