Skip to content

Commit d736521

Browse files
committed
test: add worker concurrency tests to prevent job duplication
1 parent 6d3be20 commit d736521

File tree

3 files changed

+401
-0
lines changed

3 files changed

+401
-0
lines changed
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
import { test as JapaTest } from '@japa/runner'
2+
import { setTimeout } from 'node:timers/promises'
3+
import { Worker } from '../../src/worker.js'
4+
import { Locator } from '../../src/locator.js'
5+
import { Job } from '../../src/job.js'
6+
import type { Adapter } from '../../src/contracts/adapter.js'
7+
import type { QueueManagerConfig } from '../../src/types/main.js'
8+
9+
interface WorkerConcurrencyTestSuiteOptions {
10+
test: typeof JapaTest
11+
createAdapter: () => Adapter | Promise<Adapter>
12+
}
13+
14+
export function registerWorkerConcurrencyTestSuite(options: WorkerConcurrencyTestSuiteOptions) {
15+
const { test } = options
16+
17+
test('single job should be processed exactly once with concurrency > 1', async ({
18+
assert,
19+
cleanup,
20+
}) => {
21+
const jobExecutions: Map<string, number> = new Map()
22+
23+
class TrackingJob extends Job<{ jobId: string }> {
24+
static jobName = 'TrackingJob'
25+
26+
async execute() {
27+
const count = jobExecutions.get(this.payload.jobId) || 0
28+
jobExecutions.set(this.payload.jobId, count + 1)
29+
await setTimeout(50)
30+
}
31+
}
32+
33+
const adapter = await options.createAdapter()
34+
Locator.register('TrackingJob', TrackingJob)
35+
36+
cleanup(() => Locator.clear())
37+
38+
const config: QueueManagerConfig = {
39+
default: 'test',
40+
adapters: { test: () => adapter },
41+
worker: { concurrency: 5 },
42+
}
43+
44+
const worker = new Worker(config)
45+
46+
cleanup(async () => {
47+
await worker.stop()
48+
})
49+
50+
// Push a single job
51+
await adapter.pushOn('default', {
52+
id: 'single-job-1',
53+
name: 'TrackingJob',
54+
payload: { jobId: 'job-1' },
55+
attempts: 0,
56+
})
57+
58+
// Process until idle
59+
let cycles = 0
60+
const maxCycles = 20
61+
while (cycles < maxCycles) {
62+
const cycle = await worker.processCycle(['default'])
63+
cycles++
64+
if (cycle?.type === 'idle') break
65+
}
66+
67+
assert.equal(
68+
jobExecutions.get('job-1'),
69+
1,
70+
'Job should be executed exactly once with concurrency 5'
71+
)
72+
})
73+
74+
test('multiple jobs should each be processed exactly once with high concurrency', async ({
75+
assert,
76+
cleanup,
77+
}) => {
78+
const jobExecutions: Map<string, number> = new Map()
79+
80+
class TrackingJob extends Job<{ jobId: string }> {
81+
static jobName = 'TrackingJob'
82+
83+
async execute() {
84+
const count = jobExecutions.get(this.payload.jobId) || 0
85+
jobExecutions.set(this.payload.jobId, count + 1)
86+
await setTimeout(30)
87+
}
88+
}
89+
90+
const adapter = await options.createAdapter()
91+
Locator.register('TrackingJob', TrackingJob)
92+
93+
cleanup(() => Locator.clear())
94+
95+
const config: QueueManagerConfig = {
96+
default: 'test',
97+
adapters: { test: () => adapter },
98+
worker: { concurrency: 5 },
99+
}
100+
101+
const worker = new Worker(config)
102+
103+
cleanup(async () => {
104+
await worker.stop()
105+
})
106+
107+
// Push 3 jobs (less than concurrency)
108+
for (let i = 1; i <= 3; i++) {
109+
await adapter.pushOn('default', {
110+
id: `job-${i}`,
111+
name: 'TrackingJob',
112+
payload: { jobId: `job-${i}` },
113+
attempts: 0,
114+
})
115+
}
116+
117+
// Process until idle
118+
let cycles = 0
119+
const maxCycles = 30
120+
while (cycles < maxCycles) {
121+
const cycle = await worker.processCycle(['default'])
122+
cycles++
123+
if (cycle?.type === 'idle') break
124+
}
125+
126+
assert.equal(jobExecutions.size, 3, 'All 3 jobs should have been executed')
127+
for (const [jobId, count] of jobExecutions) {
128+
assert.equal(count, 1, `${jobId} should be executed exactly once`)
129+
}
130+
})
131+
132+
test('jobs should not be duplicated under concurrent popFrom stress', async ({
133+
assert,
134+
cleanup,
135+
}) => {
136+
const jobExecutions: Map<string, number> = new Map()
137+
const executionOrder: string[] = []
138+
139+
class TrackingJob extends Job<{ jobId: string }> {
140+
static jobName = 'TrackingJob'
141+
142+
async execute() {
143+
executionOrder.push(this.payload.jobId)
144+
const count = jobExecutions.get(this.payload.jobId) || 0
145+
jobExecutions.set(this.payload.jobId, count + 1)
146+
// Very short execution to maximize concurrency pressure
147+
await setTimeout(5)
148+
}
149+
}
150+
151+
const adapter = await options.createAdapter()
152+
Locator.register('TrackingJob', TrackingJob)
153+
154+
cleanup(() => Locator.clear())
155+
156+
const config: QueueManagerConfig = {
157+
default: 'test',
158+
adapters: { test: () => adapter },
159+
worker: { concurrency: 10 },
160+
}
161+
162+
const worker = new Worker(config)
163+
164+
cleanup(async () => {
165+
await worker.stop()
166+
})
167+
168+
// Push many jobs quickly
169+
const jobCount = 20
170+
for (let i = 1; i <= jobCount; i++) {
171+
await adapter.pushOn('default', {
172+
id: `stress-job-${i}`,
173+
name: 'TrackingJob',
174+
payload: { jobId: `stress-job-${i}` },
175+
attempts: 0,
176+
})
177+
}
178+
179+
// Process until idle
180+
let cycles = 0
181+
const maxCycles = 100
182+
while (cycles < maxCycles) {
183+
const cycle = await worker.processCycle(['default'])
184+
cycles++
185+
if (cycle?.type === 'idle') break
186+
}
187+
188+
// Verify no duplicates
189+
assert.equal(jobExecutions.size, jobCount, `All ${jobCount} jobs should have been executed`)
190+
for (const [jobId, count] of jobExecutions) {
191+
assert.equal(count, 1, `${jobId} should be executed exactly once`)
192+
}
193+
194+
// Verify execution order has no duplicates
195+
const uniqueExecutions = new Set(executionOrder)
196+
assert.equal(
197+
executionOrder.length,
198+
uniqueExecutions.size,
199+
'No job should appear twice in execution order'
200+
)
201+
})
202+
}

tests/worker.spec.ts

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -922,4 +922,133 @@ test.group('Worker', () => {
922922

923923
assert.isFalse(foundJob, 'Job should not have been recovered - it exceeded maxStalledCount')
924924
})
925+
926+
test('should not process the same job multiple times with concurrency > 1', async ({
927+
assert,
928+
cleanup,
929+
}) => {
930+
const jobExecutions: Map<string, number> = new Map()
931+
932+
class TrackingJob extends Job<{ jobId: string }> {
933+
async execute() {
934+
const count = jobExecutions.get(this.payload.jobId) || 0
935+
jobExecutions.set(this.payload.jobId, count + 1)
936+
// Add a small delay to ensure concurrent execution window
937+
await setTimeout(50)
938+
}
939+
}
940+
941+
const sharedAdapter = memory()()
942+
943+
const localConfig = {
944+
default: 'memory',
945+
adapters: { memory: () => sharedAdapter },
946+
locations: ['./jobs/**/*'],
947+
worker: {
948+
concurrency: 5,
949+
},
950+
}
951+
952+
Locator.register('TrackingJob', TrackingJob)
953+
954+
const worker = new Worker(localConfig)
955+
956+
cleanup(async () => {
957+
Locator.clear()
958+
await worker.stop()
959+
})
960+
961+
// Push only ONE job but with concurrency of 5
962+
await sharedAdapter.push({
963+
id: 'single-job',
964+
name: 'TrackingJob',
965+
payload: { jobId: 'job-1' },
966+
attempts: 0,
967+
priority: 0,
968+
})
969+
970+
// Process until idle
971+
let cycles = 0
972+
const maxCycles = 20
973+
while (cycles < maxCycles) {
974+
const cycle = await worker.processCycle(['default'])
975+
cycles++
976+
977+
if (cycle?.type === 'idle') {
978+
break
979+
}
980+
}
981+
982+
// The job should have been executed exactly ONCE
983+
assert.equal(
984+
jobExecutions.get('job-1'),
985+
1,
986+
'Job should be executed exactly once, not multiple times due to concurrency'
987+
)
988+
})
989+
990+
test('should process each job exactly once with multiple jobs and high concurrency', async ({
991+
assert,
992+
cleanup,
993+
}) => {
994+
const jobExecutions: Map<string, number> = new Map()
995+
996+
class TrackingJob extends Job<{ jobId: string }> {
997+
async execute() {
998+
const count = jobExecutions.get(this.payload.jobId) || 0
999+
jobExecutions.set(this.payload.jobId, count + 1)
1000+
// Add delay to create overlap window
1001+
await setTimeout(30)
1002+
}
1003+
}
1004+
1005+
const sharedAdapter = memory()()
1006+
1007+
const localConfig = {
1008+
default: 'memory',
1009+
adapters: { memory: () => sharedAdapter },
1010+
locations: ['./jobs/**/*'],
1011+
worker: {
1012+
concurrency: 5,
1013+
},
1014+
}
1015+
1016+
Locator.register('TrackingJob', TrackingJob)
1017+
1018+
const worker = new Worker(localConfig)
1019+
1020+
cleanup(async () => {
1021+
Locator.clear()
1022+
await worker.stop()
1023+
})
1024+
1025+
// Push 3 jobs with concurrency of 5
1026+
for (let i = 1; i <= 3; i++) {
1027+
await sharedAdapter.push({
1028+
id: `job-${i}`,
1029+
name: 'TrackingJob',
1030+
payload: { jobId: `job-${i}` },
1031+
attempts: 0,
1032+
priority: 0,
1033+
})
1034+
}
1035+
1036+
// Process until idle
1037+
let cycles = 0
1038+
const maxCycles = 30
1039+
while (cycles < maxCycles) {
1040+
const cycle = await worker.processCycle(['default'])
1041+
cycles++
1042+
1043+
if (cycle?.type === 'idle') {
1044+
break
1045+
}
1046+
}
1047+
1048+
// Each job should have been executed exactly ONCE
1049+
assert.equal(jobExecutions.size, 3, 'All 3 jobs should have been executed')
1050+
for (const [jobId, count] of jobExecutions) {
1051+
assert.equal(count, 1, `${jobId} should be executed exactly once`)
1052+
}
1053+
})
9251054
})

0 commit comments

Comments
 (0)