Skip to content

Commit 386f758

Browse files
committed
bulkWriter
1 parent 88ecda1 commit 386f758

6 files changed

Lines changed: 112 additions & 103 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ tf_apply:
1313
terraform -chdir=infra/tf init && terraform -chdir=infra/tf apply -auto-approve
1414

1515
bigquery_export_deploy:
16-
cd infra/bigquery-export && npm install && npm run buildpack
16+
cd infra/bigquery-export && npm run build
1717

1818
#bigquery_export_spark_deploy:
1919
# cd infra/bigquery_export_spark && gcloud builds submit --region=global --tag us-docker.pkg.dev/httparchive/bigquery-spark-procedures/firestore_export:latest
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
node_modules
2+
npm-debug.log
3+
.git
4+
.gitignore
5+
.env
6+
.nyc_output
7+
coverage
8+
*.md
9+
.DS_Store

infra/bigquery-export/Dockerfile

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ FROM node:current-slim
33

44
WORKDIR /usr/src/app
55

6-
COPY . .
6+
# Copy package files first for better caching
7+
COPY package*.json ./
78

8-
# Clean up the node_modules directory
9-
RUN rm -rf node_modules
9+
# Install dependencies
10+
RUN npm ci --only=production --quiet --no-fund --no-audit
1011

11-
RUN npm ci --only=production
12+
# Copy source code
13+
COPY . .
1214

1315
CMD ["node", "index.js"]
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
steps:
2+
- name: 'gcr.io/cloud-builders/docker'
3+
args: [
4+
'build',
5+
'-t', 'us.gcr.io/httparchive/cloud-run/bigquery-export',
6+
'.'
7+
]
8+
images:
9+
- 'us.gcr.io/httparchive/cloud-run/bigquery-export'

infra/bigquery-export/firestore.js

Lines changed: 86 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -19,97 +19,45 @@ export class FirestoreBatch {
1919

2020
// Configuration constants
2121
this.config = {
22-
batchSize: {
23-
delete: 500,
24-
write: 400
25-
},
26-
maxConcurrentBatches: 200,
27-
retryCount: 5,
28-
timeout: 10 * 60 * 1000 // 10 minutes
22+
timeout: 10 * 60 * 1000, // 10 minutes
23+
progressReportInterval: 200000, // Report progress every N operations
24+
flushThreshold: 200000 // Flush BulkWriter every N operations
2925
}
3026

3127
this.reset()
3228
}
3329

3430
reset () {
35-
this.currentBatch = []
36-
this.batchPromises = []
31+
this.processedDocs = 0
32+
this.totalDocs = 0
33+
this.bulkWriter = null
3734
}
3835

39-
getCurrentBatchSize (operation) {
40-
return this.config.batchSize[operation === 'delete' ? 'delete' : 'write']
41-
}
36+
createBulkWriter (operation) {
37+
const bulkWriter = this.firestore.bulkWriter()
4238

43-
async commitWithRetry (batch, index) {
44-
let lastError
45-
46-
for (let attempt = 1; attempt <= this.config.retryCount; attempt++) {
47-
try {
48-
await batch.commit()
49-
return
50-
} catch (error) {
51-
lastError = error
52-
console.warn(`Batch ${index} attempt ${attempt} failed:`, error.message)
53-
54-
if (attempt < this.config.retryCount) {
55-
const delayMs = Math.pow(2, attempt) * 500
56-
console.log(`Retrying batch ${index} in ${delayMs}ms...`)
57-
await new Promise(resolve => setTimeout(resolve, delayMs))
58-
}
59-
}
60-
}
39+
// Configure error handling with progress info
40+
bulkWriter.onWriteError((error) => {
41+
const progressInfo = this.totalDocs > 0 ? ` (${this.processedDocs}/${this.totalDocs})` : ''
42+
console.warn(`${operation} operation failed${progressInfo}:`, error.message)
6143

62-
console.error(`Batch ${index} failed after ${this.config.retryCount} attempts:`, lastError)
63-
throw lastError
64-
}
65-
66-
createBatch (operation) {
67-
const batch = this.firestore.batch()
68-
69-
this.currentBatch.forEach((doc) => {
70-
if (operation === 'delete') {
71-
batch.delete(doc.ref)
72-
} else if (operation === 'set') {
73-
const docRef = this.firestore.collection(this.collectionName).doc()
74-
batch.set(docRef, doc)
75-
} else {
76-
throw new Error(`Invalid operation: ${operation}`)
77-
}
44+
// Retry on transient errors, fail on permanent ones
45+
const retryableErrors = ['deadline-exceeded', 'unavailable', 'resource-exhausted']
46+
return retryableErrors.includes(error.code)
7847
})
7948

80-
return batch
81-
}
82-
83-
queueBatch (operation) {
84-
const batch = this.createBatch(operation)
85-
this.batchPromises.push(batch)
86-
this.currentBatch = []
87-
}
88-
89-
async commitBatches () {
90-
if (this.batchPromises.length === 0) return
91-
92-
console.log(`Committing ${this.batchPromises.length} batches to ${this.collectionName}`)
93-
94-
await Promise.all(
95-
this.batchPromises.map((batch, index) =>
96-
this.commitWithRetry(batch, index)
97-
)
98-
)
49+
// Track progress on successful writes
50+
bulkWriter.onWriteResult(() => {
51+
this.processedDocs++
9952

100-
this.batchPromises = []
101-
}
102-
103-
async processInBatches (operation, shouldFlush = false) {
104-
const batchSize = this.getCurrentBatchSize(operation)
105-
106-
if (this.currentBatch.length >= batchSize || shouldFlush) {
107-
this.queueBatch(operation)
108-
}
53+
// Report progress periodically
54+
if (this.processedDocs % this.config.progressReportInterval === 0) {
55+
const progressInfo = this.totalDocs > 0 ? ` (${this.processedDocs}/${this.totalDocs})` : ` (${this.processedDocs} processed)`
56+
console.log(`Progress${progressInfo} - ${operation}ing documents in ${this.collectionName}`)
57+
}
58+
})
10959

110-
if (this.batchPromises.length >= this.config.maxConcurrentBatches || shouldFlush) {
111-
await this.commitBatches()
112-
}
60+
return bulkWriter
11361
}
11462

11563
buildQuery (collectionRef) {
@@ -132,52 +80,93 @@ export class FirestoreBatch {
13280
return queryBuilder()
13381
}
13482

83+
async getDocumentCount (query) {
84+
try {
85+
const countSnapshot = await query.count().get()
86+
return countSnapshot.data().count
87+
} catch (error) {
88+
console.warn('Could not get document count for progress tracking:', error.message)
89+
return 0
90+
}
91+
}
92+
13593
async batchDelete () {
13694
console.info('Starting batch deletion...')
13795
const startTime = Date.now()
13896
this.reset()
13997

140-
let totalDocsDeleted = 0
14198
const collectionRef = this.firestore.collection(this.collectionName)
14299
const collectionQuery = this.buildQuery(collectionRef)
143-
const batchSize = this.getCurrentBatchSize('delete')
144100

145-
while (true) {
146-
const snapshot = await collectionQuery.limit(batchSize * this.config.maxConcurrentBatches).get()
101+
// Get total count for progress tracking
102+
this.totalDocs = await this.getDocumentCount(collectionQuery)
103+
if (this.totalDocs > 0) {
104+
console.info(`Total documents to delete: ${this.totalDocs}`)
105+
}
106+
107+
// Create BulkWriter for delete operations
108+
this.bulkWriter = this.createBulkWriter('delet')
109+
110+
let deletedCount = 0
111+
const batchSize = this.config.flushThreshold // Process documents in chunks
112+
113+
while (deletedCount < this.totalDocs || this.totalDocs === 0) {
114+
const snapshot = await collectionQuery.limit(batchSize).get()
147115
if (snapshot.empty) break
148116

149-
for (const doc of snapshot.docs) {
150-
this.currentBatch.push(doc)
151-
await this.processInBatches('delete')
152-
totalDocsDeleted++
153-
}
117+
// Add all delete operations to BulkWriter
118+
snapshot.docs.forEach(doc => {
119+
this.bulkWriter.delete(doc.ref)
120+
deletedCount++
121+
})
122+
123+
// Periodically flush to manage memory
124+
// if (deletedCount % this.config.flushThreshold === 0) {
125+
console.log(`Flushing BulkWriter at ${deletedCount} operations...`)
126+
await this.bulkWriter.flush()
127+
// }
154128
}
155129

156-
// Final flush
157-
await this.processInBatches('delete', true)
130+
// Final flush and close
131+
console.log('Finalizing deletion operations...')
132+
await this.bulkWriter.close()
158133

159134
const duration = (Date.now() - startTime) / 1000
160-
console.info(`Deletion complete. Total docs deleted: ${totalDocsDeleted}. Time: ${duration} seconds`)
135+
console.info(`Deletion complete. Total docs deleted: ${this.processedDocs}. Time: ${duration} seconds`)
161136
}
162137

163138
async streamFromBigQuery (rowStream) {
164139
console.info('Starting BigQuery to Firestore transfer...')
165140
const startTime = Date.now()
166-
let totalRowsProcessed = 0
167-
168141
this.reset()
169142

143+
// Create BulkWriter for write operations
144+
this.bulkWriter = this.createBulkWriter('writ')
145+
146+
let rowCount = 0
147+
const collectionRef = this.firestore.collection(this.collectionName)
148+
170149
for await (const row of rowStream) {
171-
this.currentBatch.push(row)
172-
await this.processInBatches('set')
173-
totalRowsProcessed++
150+
// Add document to BulkWriter
151+
const docRef = collectionRef.doc()
152+
this.bulkWriter.set(docRef, row)
153+
154+
rowCount++
155+
this.totalDocs = rowCount // Update total as we go since we can't predict BigQuery result size
156+
157+
// Periodically flush to manage memory
158+
if (rowCount % this.config.flushThreshold === 0) {
159+
console.log(`Flushing BulkWriter at ${rowCount} operations...`)
160+
await this.bulkWriter.flush()
161+
}
174162
}
175163

176-
// Final flush
177-
await this.processInBatches('set', true)
164+
// Final flush and close
165+
console.log('Finalizing write operations...')
166+
await this.bulkWriter.close()
178167

179168
const duration = (Date.now() - startTime) / 1000
180-
console.info(`Transfer to ${this.collectionName} complete. Total rows processed: ${totalRowsProcessed}. Time: ${duration} seconds`)
169+
console.info(`Transfer to ${this.collectionName} complete. Total rows processed: ${this.processedDocs}. Time: ${duration} seconds`)
181170
}
182171

183172
async export (query, exportConfig) {

infra/bigquery-export/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"main": "index.js",
55
"scripts": {
66
"start": "node index.js",
7-
"buildpack": "rm -rf node_modules; gcloud builds submit --pack image=us.gcr.io/httparchive/cloud-run/bigquery-export"
7+
"build": "gcloud builds submit"
88
},
99
"type": "module",
1010
"dependencies": {

0 commit comments

Comments
 (0)