Skip to content

Commit aca1abc

Browse files
committed
fix: stream row by row
Signed-off-by: Mouad BANI <mouad-mb@outlook.com>
1 parent 7fc7a4e commit aca1abc

2 files changed

Lines changed: 9 additions & 13 deletions

File tree

services/apps/snowflake_connectors/src/consumer/transformerConsumer.ts

Lines changed: 1 addition & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -84,14 +84,11 @@ export class TransformerConsumer {
8484
const platform = job.platform as PlatformType
8585
const source = getDataSource(platform, job.sourceName)
8686

87-
let rows: Record<string, unknown>[] | null = await this.s3Service.readParquetRows(job.s3Path)
88-
8987
let transformedCount = 0
9088
let transformSkippedCount = 0
9189
let resolveSkippedCount = 0
9290

93-
for (let i = 0; i < rows.length; i++) {
94-
const row = rows[i]
91+
for await (const row of this.s3Service.streamParquetRows(job.s3Path)) {
9592
const result = source.transformer.safeTransformRow(row)
9693
if (!result) {
9794
transformSkippedCount++
@@ -112,8 +109,6 @@ export class TransformerConsumer {
112109
transformedCount++
113110
}
114111

115-
rows = null
116-
117112
const skippedCount = transformSkippedCount + resolveSkippedCount
118113
const processingMetrics = {
119114
transformedCount,

services/apps/snowflake_connectors/src/core/s3Service.ts

Lines changed: 8 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -41,18 +41,19 @@ export class S3Service {
4141
return Buffer.from(byteArray)
4242
}
4343

44-
async readParquetRows(s3Uri: string): Promise<Record<string, unknown>[]> {
44+
async *streamParquetRows(s3Uri: string): AsyncGenerator<Record<string, unknown>> {
4545
let buffer: Buffer | null = await this.downloadFile(s3Uri)
4646
const reader = await ParquetReader.openBuffer(buffer)
4747
buffer = null
4848
const cursor = reader.getCursor()
49-
const rows: Record<string, unknown>[] = []
50-
let row: Record<string, unknown> | null = null
51-
while ((row = (await cursor.next()) as Record<string, unknown> | null) !== null) {
52-
rows.push(row)
49+
try {
50+
let row: Record<string, unknown> | null = null
51+
while ((row = (await cursor.next()) as Record<string, unknown> | null) !== null) {
52+
yield row
53+
}
54+
} finally {
55+
await reader.close()
5356
}
54-
await reader.close()
55-
return rows
5657
}
5758

5859
async deleteFile(s3Uri: string): Promise<void> {

0 commit comments

Comments
 (0)