-
Notifications
You must be signed in to change notification settings - Fork 429
Expand file tree
/
Copy pathrunMultiThread.test.ts
More file actions
126 lines (105 loc) · 4.03 KB
/
Copy pathrunMultiThread.test.ts
File metadata and controls
126 lines (105 loc) · 4.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import { runMultiThread } from "../src/run-multi-thread";
import * as admin from "firebase-admin";
import { CliConfig } from "../src/types";
import * as workerpool from "workerpool";
import { FIRESTORE_DEFAULT_DATABASE } from "../src/config";
// Initialize Firebase Admin (Ensure credentials are set)
if (admin.apps.length === 0) {
admin.initializeApp();
}
const firestore = admin.firestore();
// Use a unique collection name to avoid conflicts with existing data
const uniqueTestCollection = `test_multithread_${Date.now()}`;
// Mock the workerpool to simulate document processing
jest.mock("workerpool", () => ({
pool: jest.fn().mockReturnValue({
exec: jest.fn(),
stats: jest.fn().mockReturnValue({ activeTasks: 0, pendingTasks: 0 }),
terminate: jest.fn().mockResolvedValue(undefined),
}),
}));
describe("runMultiThread Partitioning with Firestore", () => {
let mockConfig: CliConfig;
let mockPool: any;
let mockExec: jest.Mock;
let actualDocumentCount: number = 0;
beforeAll(async () => {
console.log("Creating test documents...");
// Create test documents in a unique collection to avoid conflicts
for (let i = 0; i < 10; i++) {
await firestore
.collection(
`${uniqueTestCollection}_${i % 2}/subcoll/${uniqueTestCollection}`
)
.add({
index: i,
timestamp: admin.firestore.FieldValue.serverTimestamp(),
});
}
// Count actual documents in the collection group
const collectionGroupDocs = await firestore
.collectionGroup(uniqueTestCollection)
.get();
actualDocumentCount = collectionGroupDocs.size;
console.log(
`Created ${actualDocumentCount} test documents in collection group '${uniqueTestCollection}'`
);
});
beforeEach(() => {
jest.clearAllMocks();
mockPool = workerpool.pool();
mockExec = mockPool.exec;
// Track calls to ensure we only return documents once per test
let hasBeenCalled = false;
// Mock exec to return all documents on first call, 0 on subsequent calls
// This simulates processing all documents in the partitions that are created
mockExec.mockImplementation(() => {
if (!hasBeenCalled) {
hasBeenCalled = true;
return Promise.resolve(actualDocumentCount);
}
return Promise.resolve(0);
});
mockConfig = {
kind: "CONFIG",
projectId: "test-project",
bigQueryProjectId: "test-bq-project",
sourceCollectionPath: `${uniqueTestCollection}_0/subcoll/${uniqueTestCollection}`,
datasetId: "testDataset",
tableId: "testTable",
batchSize: 5, // Small batch size for controlled partitioning
queryCollectionGroup: true,
datasetLocation: "us",
multiThreaded: true,
useNewSnapshotQuerySyntax: false,
useEmulator: false,
rawChangeLogName: "testTable_raw_changelog",
cursorPositionFile: "/tmp/test_cursor_position",
firestoreInstanceId: FIRESTORE_DEFAULT_DATABASE,
};
});
it("should correctly distribute partitions and process all documents", async () => {
console.log("Running multi-thread import...");
const totalProcessed = await runMultiThread(mockConfig);
console.log(`Total documents processed: ${totalProcessed}`);
// Ensure workerpool.exec() was called (at least once for the partition)
expect(mockExec).toHaveBeenCalled();
// Ensure `runMultiThread` terminates properly
expect(mockPool.terminate).toHaveBeenCalled();
// Check if all test docs are processed
expect(totalProcessed).toBe(actualDocumentCount);
});
afterAll(async () => {
console.log("Cleaning up test data...");
// Clean up test collections
const collectionRefs = [
`${uniqueTestCollection}_0/subcoll/${uniqueTestCollection}`,
`${uniqueTestCollection}_1/subcoll/${uniqueTestCollection}`,
];
for (const collectionPath of collectionRefs) {
const collectionRef = firestore.collection(collectionPath);
const docs = await collectionRef.listDocuments();
await Promise.all(docs.map((doc) => doc.delete()));
}
});
});