Skip to content

Commit c7f0622

Browse files
committed
fix(task): use subquery to avoid arguments limit of sql
- use chunked operations, subquery and CTE for batch operations
1 parent ebe0156 commit c7f0622

4 files changed

Lines changed: 566 additions & 454 deletions

File tree

netmito/src/schema.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ pub struct WorkerShutdown {
560560
pub op: Option<WorkerShutdownOp>,
561561
}
562562

563-
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
563+
#[derive(Debug, Serialize, Deserialize, Clone, Default, Copy)]
564564
pub enum WorkerShutdownOp {
565565
#[default]
566566
#[serde(alias = "graceful")]

netmito/src/service/s3.rs

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -179,12 +179,19 @@ async fn generate_artifact_downloads(
179179
uuids: Vec<Uuid>,
180180
content_type: ArtifactContentType,
181181
) -> Result<Vec<ArtifactDownloadItem>, crate::error::Error> {
182-
// Query artifacts for these tasks with the specified content type
183-
let artifacts = Artifact::Entity::find()
184-
.filter(Artifact::Column::TaskId.is_in(uuids))
185-
.filter(Artifact::Column::ContentType.eq(content_type))
186-
.all(&pool.db)
187-
.await?;
182+
// Chunk UUIDs to avoid PostgreSQL parameter limit
183+
let mut all_artifacts = Vec::new();
184+
185+
for chunk in uuids.chunks(2048) {
186+
let artifacts = Artifact::Entity::find()
187+
.filter(Artifact::Column::TaskId.is_in(chunk.to_vec()))
188+
.filter(Artifact::Column::ContentType.eq(content_type))
189+
.all(&pool.db)
190+
.await?;
191+
all_artifacts.extend(artifacts);
192+
}
193+
194+
let artifacts = all_artifacts;
188195

189196
// Create a set of task UUIDs that have artifacts
190197
let mut downloads = Vec::with_capacity(artifacts.len());
@@ -710,12 +717,20 @@ async fn generate_attachment_downloads(
710717
"Group {} not found",
711718
group_name
712719
))))?;
713-
// Query attachments for the specified keys
714-
let attachments = Attachment::Entity::find()
715-
.filter(Attachment::Column::Key.is_in(keys.clone()))
716-
.filter(Attachment::Column::GroupId.eq(group.id))
717-
.all(&pool.db)
718-
.await?;
720+
721+
// Chunk keys to avoid PostgreSQL parameter limit
722+
let mut all_attachments = Vec::new();
723+
724+
for chunk in keys.chunks(2048) {
725+
let attachments = Attachment::Entity::find()
726+
.filter(Attachment::Column::Key.is_in(chunk.to_vec()))
727+
.filter(Attachment::Column::GroupId.eq(group.id))
728+
.all(&pool.db)
729+
.await?;
730+
all_attachments.extend(attachments);
731+
}
732+
733+
let attachments = all_attachments;
719734

720735
// Verify all attachments belong to the specified group and generate URLs
721736
let mut downloads = Vec::new();
@@ -1244,12 +1259,20 @@ async fn delete_artifacts_by_uuids_internal(
12441259
uuids: Vec<Uuid>,
12451260
content_type: ArtifactContentType,
12461261
) -> Result<(u64, Vec<Uuid>), crate::error::Error> {
1247-
// Query artifacts for these tasks with the specified content type
1248-
let artifacts = Artifact::Entity::find()
1249-
.filter(Artifact::Column::TaskId.is_in(uuids.clone()))
1250-
.filter(Artifact::Column::ContentType.eq(content_type))
1251-
.all(&pool.db)
1252-
.await?;
1262+
// Chunk UUIDs to avoid PostgreSQL parameter limit
1263+
1264+
let mut all_artifacts = Vec::new();
1265+
1266+
for chunk in uuids.chunks(2048) {
1267+
let artifacts = Artifact::Entity::find()
1268+
.filter(Artifact::Column::TaskId.is_in(chunk.to_vec()))
1269+
.filter(Artifact::Column::ContentType.eq(content_type))
1270+
.all(&pool.db)
1271+
.await?;
1272+
all_artifacts.extend(artifacts);
1273+
}
1274+
1275+
let artifacts = all_artifacts;
12531276

12541277
let mut deleted_count = 0u64;
12551278
let mut deleted_uuids = std::collections::HashSet::new();

0 commit comments

Comments
 (0)