Skip to content

Commit 586d039

Browse files
committed
feat(knex)!: add QueueSchemaService
1 parent ef81a3a commit 586d039

File tree

7 files changed

+230
-123
lines changed

7 files changed

+230
-123
lines changed

.changelog/queue-schema-service.md

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# QueueSchemaService for Knex Adapter
2+
3+
## Breaking Change
4+
5+
The Knex adapter no longer automatically creates database tables on first use. You must now create the tables explicitly using the new `QueueSchemaService` or your own migration system.
6+
7+
## New Feature: QueueSchemaService
8+
9+
A new `QueueSchemaService` class is now exported from the main package, providing methods to create and drop the queue tables in a controlled manner.
10+
11+
### Methods
12+
13+
- `createJobsTable(tableName?, extend?)` - Creates the jobs table with the default schema
14+
- `createSchedulesTable(tableName?, extend?)` - Creates the schedules table with the default schema
15+
- `dropJobsTable(tableName?)` - Drops the jobs table if it exists
16+
- `dropSchedulesTable(tableName?)` - Drops the schedules table if it exists
17+
18+
### Usage
19+
20+
```typescript
21+
import { QueueSchemaService } from '@boringnode/queue'
22+
import Knex from 'knex'
23+
24+
const connection = Knex({ client: 'pg', connection: '...' })
25+
const schemaService = new QueueSchemaService(connection)
26+
27+
// Create tables with default names
28+
await schemaService.createJobsTable()
29+
await schemaService.createSchedulesTable()
30+
31+
// Or with custom table names
32+
await schemaService.createJobsTable('my_jobs')
33+
await schemaService.createSchedulesTable('my_schedules')
34+
35+
// Extend with custom columns
36+
await schemaService.createJobsTable('queue_jobs', (table) => {
37+
table.string('tenant_id', 255).nullable()
38+
})
39+
```
40+
41+
### AdonisJS Migration Example
42+
43+
```typescript
44+
import { BaseSchema } from '@adonisjs/lucid/schema'
45+
import { QueueSchemaService } from '@boringnode/queue'
46+
47+
export default class extends BaseSchema {
48+
async up() {
49+
const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
50+
51+
await schemaService.createJobsTable()
52+
await schemaService.createSchedulesTable()
53+
}
54+
55+
async down() {
56+
const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
57+
58+
await schemaService.dropSchedulesTable()
59+
await schemaService.dropJobsTable()
60+
}
61+
}
62+
```
63+
64+
## Migration Guide
65+
66+
If you were relying on automatic table creation, you need to:
67+
68+
1. Create a migration that uses `QueueSchemaService` to create the tables
69+
2. Run the migration before starting your application
70+
71+
This change gives you full control over when and how the tables are created, and allows you to extend the schema with custom columns.

README.md

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,50 @@ const adapter = knex(connection)
209209
const adapter = knex(config, 'custom_jobs_table')
210210
```
211211

212-
The adapter automatically creates tables on first use.
212+
</details>
213+
214+
<details>
215+
<summary><strong>Database setup with QueueSchemaService</strong></summary>
216+
217+
The Knex adapter requires tables to be created before use. Use `QueueSchemaService` to create them:
218+
219+
```typescript
220+
import { QueueSchemaService } from '@boringnode/queue'
221+
import Knex from 'knex'
222+
223+
const connection = Knex({ client: 'pg', connection: '...' })
224+
const schemaService = new QueueSchemaService(connection)
225+
226+
// Create tables with default names
227+
await schemaService.createJobsTable()
228+
await schemaService.createSchedulesTable()
229+
230+
// Or extend with custom columns
231+
await schemaService.createJobsTable('queue_jobs', (table) => {
232+
table.string('tenant_id', 255).nullable()
233+
})
234+
```
235+
236+
**AdonisJS migration example:**
237+
238+
```typescript
239+
import { BaseSchema } from '@adonisjs/lucid/schema'
240+
import { QueueSchemaService } from '@boringnode/queue'
241+
242+
export default class extends BaseSchema {
243+
async up() {
244+
const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
245+
await schemaService.createJobsTable()
246+
await schemaService.createSchedulesTable()
247+
}
248+
249+
async down() {
250+
const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
251+
await schemaService.dropSchedulesTable()
252+
await schemaService.dropJobsTable()
253+
}
254+
}
255+
```
213256

214257
</details>
215258

index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export { Locator } from './src/locator.js'
55
export { Schedule } from './src/schedule.js'
66
export { ScheduleBuilder } from './src/schedule_builder.js'
77
export { JobBatchDispatcher } from './src/job_batch_dispatcher.js'
8+
export { QueueSchemaService } from './src/services/queue_schema.js'
89
export {
910
customBackoff,
1011
linearBackoff,

src/drivers/knex_adapter.ts

Lines changed: 0 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ export class KnexAdapter implements Adapter {
5151
readonly #schedulesTable: string
5252
readonly #ownsConnection: boolean
5353
#workerId: string = ''
54-
#initialized: boolean = false
5554

5655
constructor(config: KnexAdapterOptions) {
5756
this.#connection = config.connection
@@ -64,85 +63,6 @@ export class KnexAdapter implements Adapter {
6463
this.#workerId = workerId
6564
}
6665

67-
/**
68-
* Ensure all required tables exist.
69-
* Creates them if not exists, handles race conditions.
70-
*/
71-
async #ensureTables(): Promise<void> {
72-
if (this.#initialized) return
73-
74-
await Promise.all([this.#createJobsTable(), this.#createSchedulesTable()])
75-
76-
this.#initialized = true
77-
}
78-
79-
async #createJobsTable(): Promise<void> {
80-
try {
81-
await this.#connection.schema.createTable(this.#jobsTable, (table) => {
82-
table.string('id', 255).notNullable()
83-
table.string('queue', 255).notNullable()
84-
table.enu('status', ['pending', 'active', 'delayed', 'completed', 'failed']).notNullable()
85-
table.text('data').notNullable()
86-
table.bigint('score').unsigned().nullable()
87-
table.string('worker_id', 255).nullable()
88-
table.bigint('acquired_at').unsigned().nullable()
89-
table.bigint('execute_at').unsigned().nullable()
90-
table.bigint('finished_at').unsigned().nullable()
91-
table.text('error').nullable()
92-
table.primary(['id', 'queue'])
93-
table.index(['queue', 'status', 'score'])
94-
table.index(['queue', 'status', 'execute_at'])
95-
table.index(['queue', 'status', 'finished_at'])
96-
})
97-
} catch {
98-
/**
99-
* If table creation fails, verify the table actually exists.
100-
* This handles race conditions where multiple instances try to create
101-
* the table simultaneously.
102-
*/
103-
const hasTable = await this.#connection.schema.hasTable(this.#jobsTable)
104-
if (!hasTable) {
105-
throw new Error(`Failed to create table "${this.#jobsTable}"`)
106-
}
107-
}
108-
}
109-
110-
async #createSchedulesTable(): Promise<void> {
111-
try {
112-
await this.#connection.schema.createTable(this.#schedulesTable, (table) => {
113-
table.string('id', 255).primary()
114-
table.string('status', 50).notNullable().defaultTo('active')
115-
table.string('name', 255).notNullable()
116-
table.text('payload').notNullable()
117-
table.string('cron_expression', 255).nullable()
118-
table.bigint('every_ms').unsigned().nullable()
119-
table.string('timezone', 100).notNullable().defaultTo('UTC')
120-
table.timestamp('from_date').nullable()
121-
table.timestamp('to_date').nullable()
122-
table.integer('run_limit').unsigned().nullable()
123-
table.integer('run_count').unsigned().notNullable().defaultTo(0)
124-
table.timestamp('next_run_at').nullable()
125-
table.timestamp('last_run_at').nullable()
126-
table
127-
.timestamp('created_at')
128-
.notNullable()
129-
.defaultTo(this.#connection.fn.now())
130-
// Indexes
131-
table.index(['status', 'next_run_at'])
132-
})
133-
} catch {
134-
/**
135-
* If table creation fails, verify the table actually exists.
136-
* This handles race conditions where multiple instances try to create
137-
* the table simultaneously.
138-
*/
139-
const hasTable = await this.#connection.schema.hasTable(this.#schedulesTable)
140-
if (!hasTable) {
141-
throw new Error(`Failed to create table "${this.#schedulesTable}"`)
142-
}
143-
}
144-
}
145-
14666
async destroy(): Promise<void> {
14767
if (this.#ownsConnection) {
14868
await this.#connection.destroy()
@@ -154,8 +74,6 @@ export class KnexAdapter implements Adapter {
15474
}
15575

15676
async popFrom(queue: string): Promise<AcquiredJob | null> {
157-
await this.#ensureTables()
158-
15977
const now = Date.now()
16078

16179
// First, move ready delayed jobs to pending
@@ -255,8 +173,6 @@ export class KnexAdapter implements Adapter {
255173
}
256174

257175
async completeJob(jobId: string, queue: string, removeOnComplete?: JobRetention): Promise<void> {
258-
await this.#ensureTables()
259-
260176
const { keep, maxAge, maxCount } = resolveRetention(removeOnComplete)
261177

262178
if (!keep) {
@@ -294,8 +210,6 @@ export class KnexAdapter implements Adapter {
294210
error?: Error,
295211
removeOnFail?: JobRetention
296212
): Promise<void> {
297-
await this.#ensureTables()
298-
299213
const { keep, maxAge, maxCount } = resolveRetention(removeOnFail)
300214

301215
if (!keep) {
@@ -329,8 +243,6 @@ export class KnexAdapter implements Adapter {
329243
}
330244

331245
async getJob(jobId: string, queue: string): Promise<JobRecord | null> {
332-
await this.#ensureTables()
333-
334246
const row = await this.#connection(this.#jobsTable)
335247
.where('id', jobId)
336248
.where('queue', queue)
@@ -383,8 +295,6 @@ export class KnexAdapter implements Adapter {
383295
}
384296

385297
async retryJob(jobId: string, queue: string, retryAt?: Date): Promise<void> {
386-
await this.#ensureTables()
387-
388298
const now = Date.now()
389299

390300
// Get the active job
@@ -438,8 +348,6 @@ export class KnexAdapter implements Adapter {
438348
}
439349

440350
async pushOn(queue: string, jobData: JobData): Promise<void> {
441-
await this.#ensureTables()
442-
443351
const priority = jobData.priority ?? DEFAULT_PRIORITY
444352
const timestamp = Date.now()
445353
const score = calculateScore(priority, timestamp)
@@ -458,8 +366,6 @@ export class KnexAdapter implements Adapter {
458366
}
459367

460368
async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
461-
await this.#ensureTables()
462-
463369
const executeAt = Date.now() + delay
464370

465371
await this.#connection(this.#jobsTable).insert({
@@ -478,8 +384,6 @@ export class KnexAdapter implements Adapter {
478384
async pushManyOn(queue: string, jobs: JobData[]): Promise<void> {
479385
if (jobs.length === 0) return
480386

481-
await this.#ensureTables()
482-
483387
const now = Date.now()
484388
const rows = jobs.map((job) => ({
485389
id: job.id,
@@ -497,8 +401,6 @@ export class KnexAdapter implements Adapter {
497401
}
498402

499403
async sizeOf(queue: string): Promise<number> {
500-
await this.#ensureTables()
501-
502404
const result = await this.#connection(this.#jobsTable)
503405
.where('queue', queue)
504406
.where('status', 'pending')
@@ -513,8 +415,6 @@ export class KnexAdapter implements Adapter {
513415
stalledThreshold: number,
514416
maxStalledCount: number
515417
): Promise<number> {
516-
await this.#ensureTables()
517-
518418
const now = Date.now()
519419
const stalledCutoff = now - stalledThreshold
520420

@@ -570,8 +470,6 @@ export class KnexAdapter implements Adapter {
570470
}
571471

572472
async createSchedule(config: ScheduleConfig): Promise<string> {
573-
await this.#ensureTables()
574-
575473
const id = config.id ?? randomUUID()
576474

577475
const data = {
@@ -611,8 +509,6 @@ export class KnexAdapter implements Adapter {
611509
}
612510

613511
async getSchedule(id: string): Promise<ScheduleData | null> {
614-
await this.#ensureTables()
615-
616512
const row = await this.#connection(this.#schedulesTable)
617513
.where('id', id)
618514
.first()
@@ -622,8 +518,6 @@ export class KnexAdapter implements Adapter {
622518
}
623519

624520
async listSchedules(options?: ScheduleListOptions): Promise<ScheduleData[]> {
625-
await this.#ensureTables()
626-
627521
let query = this.#connection(this.#schedulesTable).whereNot('status', 'cancelled')
628522

629523
if (options?.status) {
@@ -638,8 +532,6 @@ export class KnexAdapter implements Adapter {
638532
id: string,
639533
updates: Partial<Pick<ScheduleData, 'status' | 'nextRunAt' | 'lastRunAt' | 'runCount'>>
640534
): Promise<void> {
641-
await this.#ensureTables()
642-
643535
const data: Record<string, any> = {}
644536

645537
if (updates.status !== undefined) data.status = updates.status
@@ -655,16 +547,12 @@ export class KnexAdapter implements Adapter {
655547
}
656548

657549
async deleteSchedule(id: string): Promise<void> {
658-
await this.#ensureTables()
659-
660550
await this.#connection(this.#schedulesTable)
661551
.where('id', id)
662552
.delete()
663553
}
664554

665555
async claimDueSchedule(): Promise<ScheduleData | null> {
666-
await this.#ensureTables()
667-
668556
const now = new Date()
669557

670558
return this.#connection.transaction(async (trx) => {

0 commit comments

Comments
 (0)