Skip to content

Commit b0a7808

Browse files
committed
feat: testing @wendelmax/tasklets
1 parent b853453 commit b0a7808

6 files changed

Lines changed: 110 additions & 118 deletions

File tree

package-lock.json

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@
2020
"@faker-js/faker": "^8.4.1"
2121
},
2222
"dependencies": {
23+
"@wendelmax/tasklets": "^2.2.0",
2324
"cli-progress": "^3.12.0",
2425
"draftlog": "^1.0.13",
2526
"mongodb": "^6.5.0",
2627
"pg": "^8.11.5",
2728
"sqlite3": "^5.1.7"
2829
}
29-
}
30+
}

src/background-task.js

Lines changed: 0 additions & 17 deletions
This file was deleted.

src/cluster.js

Lines changed: 0 additions & 64 deletions
This file was deleted.

src/index.js

Lines changed: 49 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,32 @@
1-
// Migration entry point: streams students out of MongoDB and inserts them
// into Postgres in parallel batches via a @wendelmax/tasklets worker pool.
import { getMongoConnection, getPostgresConnection } from './db.js'
import cliProgress from 'cli-progress'
import Tasklets from '@wendelmax/tasklets'
import path from 'node:path'
import { fileURLToPath } from 'node:url'

// Configure Tasklets worker pool.
// NOTE(review): option names/values follow the library's configure() API —
// verify against @wendelmax/tasklets docs for the installed ^2.2.0.
const tasklets = new Tasklets()
tasklets.configure({
  maxWorkers: 8,
  minWorkers: 8,
  idleTimeout: 30000,
  workload: 'io',
  adaptive: true,
})

// Worker module path (CJS module loaded via require() inside worker threads).
// fileURLToPath(import.meta.url) instead of import.meta.dirname: the latter
// is undefined on Node < 20.11, which would make path.join() throw; this
// form also yields a correct OS path on Windows.
const INSERT_WORKER = `MODULE:${path.join(path.dirname(fileURLToPath(import.meta.url)), 'insert-worker.cjs')}`

// Connect to databases (top-level await: this file is an ES module).
const mongoDB = await getMongoConnection()
const postgresDB = await getPostgresConnection()

// Batch size for each MongoDB page / Postgres bulk insert.
const ITEMS_PER_PAGE = 4000

// Clean Postgres before migration so the final count comparison is valid.
await postgresDB.students.deleteAll()
1327

28+
// Async generator to paginate through MongoDB
1429
async function* getAllPagedData(itemsPerPage, page = 0) {
15-
1630
const data = mongoDB.students.find().skip(page).limit(itemsPerPage)
1731
const items = await data.toArray()
1832
if (!items.length) return
@@ -23,39 +37,38 @@ async function* getAllPagedData(itemsPerPage, page = 0) {
2337
}
2438

2539
// Total number of documents to migrate — drives the progress bar and the
// final consistency check.
const total = await mongoDB.students.countDocuments()

// Progress bar
const progress = new cliProgress.SingleBar({
  format: 'progress [{bar}] {percentage}% | {value}/{total} | {duration}s',
  clearOnComplete: false,
}, cliProgress.Presets.shades_classic)

progress.start(total, 0)

// Dispatch every page to the worker pool; the pool itself bounds actual
// concurrency, so collecting all promises here is safe.
const promises = []

for await (const items of getAllPagedData(ITEMS_PER_PAGE)) {
  // Convert MongoDB docs to plain objects (strip ObjectId etc.) so they
  // can be structured-cloned into the worker thread.
  const plainItems = items.map(({ name, email, age, registeredAt }) => ({ name, email, age, registeredAt }))

  const promise = tasklets.run(INSERT_WORKER, plainItems).then(count => {
    // Each worker returns the number of rows it inserted.
    progress.increment(count)
  })
  promises.push(promise)
}

try {
  await Promise.all(promises)
  progress.stop()

  // Final consistency check: row counts should match after migration.
  const insertedOnPostgres = await postgresDB.students.count()
  console.log(`total on MongoDB ${total} and total on PostGres ${insertedOnPostgres}`)
  console.log(`are the same? ${total === insertedOnPostgres ? 'yes' : 'no'}`)
} finally {
  // Cleanup always runs — even if a batch rejected — so a failure doesn't
  // leave the process hung on open DB connections or live worker threads.
  progress.stop()
  await mongoDB.client.close()
  await postgresDB.client.end()
  await tasklets.terminate()
}
process.exit()

src/insert-worker.cjs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Worker module for Postgres batch inserts
// Loaded by tasklets worker threads via MODULE: prefix

const { Client } = require('pg')

/**
 * Insert a batch of student records into Postgres with one multi-row
 * INSERT statement.
 *
 * @param {Array<{name: string, email: string, age: number, registeredAt: *}>} items
 *   Plain objects produced by the parent process (ObjectIds already stripped).
 * @returns {Promise<number>} number of rows inserted (items.length)
 * @throws propagates pg connection/query errors to the tasklet promise
 */
async function insertBatch(items) {
  // Guard: an empty batch would otherwise build the invalid SQL
  // "INSERT INTO students (...) VALUES " and make the query throw.
  if (!Array.isArray(items) || items.length === 0) return 0

  // One short-lived connection per batch: worker threads cannot share the
  // parent's pool, and credentials here mirror the project's db config.
  const client = new Client({
    user: 'erickwendel',
    host: process.env.POSTGRES_HOST || 'localhost',
    database: 'school',
    password: 'mypassword',
    port: 5432,
  })

  await client.connect()

  try {
    // Build a single parameterized bulk INSERT for performance:
    // ($1, $2, $3, $4), ($5, $6, $7, $8), ...
    const values = []
    const placeholders = []
    let idx = 1

    for (const item of items) {
      placeholders.push(`($${idx++}, $${idx++}, $${idx++}, $${idx++})`)
      values.push(item.name, item.email, item.age, item.registeredAt)
    }

    const query = `INSERT INTO students (name, email, age, registered_at) VALUES ${placeholders.join(', ')}`
    await client.query(query, values)
  } finally {
    // Always release the connection, even if the query fails.
    await client.end()
  }

  return items.length
}

module.exports = insertBatch

0 commit comments

Comments
 (0)