Skip to content

Commit 8e021f8

Browse files
committed
feat(task): record reporter uuid when commiting task
1 parent 1daa2ec commit 8e021f8

14 files changed

Lines changed: 145 additions & 10 deletions

File tree

netmito/src/api/workers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ pub async fn worker_report_task(
150150
State(pool): State<InfraPool>,
151151
Json(ReportTaskReq { id, op }): Json<ReportTaskReq>,
152152
) -> ApiResult<Json<ReportTaskResp>> {
153-
match service::worker::report_task(w.id, id, op, &pool).await {
153+
match service::worker::report_task(w.id, w.uuid, id, op, &pool).await {
154154
Ok(url) => Ok(Json(ReportTaskResp { url })),
155155
Err(Error::ApiError(e)) => Err(e),
156156
Err(e) => {

netmito/src/client/interactive.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ pub(crate) fn output_parsed_task_info(info: &ParsedTaskQueryInfo) {
108108
if let Some(downstream_task_uuid) = info.downstream_task_uuid {
109109
tracing::info!("Downstream Task UUID: {:?}", downstream_task_uuid);
110110
}
111+
if let Some(reporter_uuid) = info.reporter_uuid {
112+
tracing::info!("Reporter UUID: {}", reporter_uuid);
113+
}
111114
}
112115

113116
pub(crate) fn output_task_info(info: &TaskQueryInfo) {
@@ -140,6 +143,9 @@ pub(crate) fn output_task_info(info: &TaskQueryInfo) {
140143
if let Some(downstream_task_uuid) = info.downstream_task_uuid {
141144
tracing::info!("Downstream Task UUID: {:?}", downstream_task_uuid);
142145
}
146+
if let Some(reporter_uuid) = info.reporter_uuid {
147+
tracing::info!("Reporter UUID: {}", reporter_uuid);
148+
}
143149
}
144150

145151
pub(crate) fn output_worker_list_info<T: std::fmt::Display>(

netmito/src/config/client/artifacts.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ pub struct DownloadArtifactsByFilterArgs {
103103
/// The priority of the tasks, support operators like `=`(default), `!=`, `<`, `<=`, `>`, `>=`
104104
#[arg(short, long)]
105105
pub priority: Option<String>,
106+
/// Filter by reporter worker UUID (only returns artifacts from completed tasks reported by this worker)
107+
#[arg(long)]
108+
pub reporter_uuid: Option<Uuid>,
106109
/// Specify the directory to download artifacts
107110
#[arg(short, long = "output")]
108111
pub output_dir: Option<PathBuf>,
@@ -142,6 +145,7 @@ pub struct DownloadArtifactsByListArgs {
142145
impl From<DownloadArtifactsByFilterArgs> for ArtifactsDownloadByFilterReq {
143146
fn from(args: DownloadArtifactsByFilterArgs) -> Self {
144147
Self {
148+
reporter_uuid: args.reporter_uuid,
145149
creator_usernames: if args.creators.is_empty() {
146150
None
147151
} else {
@@ -205,6 +209,9 @@ pub struct DeleteArtifactsByFilterArgs {
205209
/// The priority of the tasks, support operators like `=`(default), `!=`, `<`, `<=`, `>`, `>=`
206210
#[arg(short, long)]
207211
pub priority: Option<String>,
212+
/// Filter by reporter worker UUID (only deletes artifacts from completed tasks reported by this worker)
213+
#[arg(long)]
214+
pub reporter_uuid: Option<Uuid>,
208215
}
209216

210217
#[derive(Serialize, Debug, Deserialize, Args, Clone)]
@@ -220,6 +227,7 @@ pub struct DeleteArtifactsByListArgs {
220227
impl From<DeleteArtifactsByFilterArgs> for ArtifactsDeleteByFilterReq {
221228
fn from(args: DeleteArtifactsByFilterArgs) -> Self {
222229
Self {
230+
reporter_uuid: args.reporter_uuid,
223231
creator_usernames: if args.creators.is_empty() {
224232
None
225233
} else {

netmito/src/config/client/tasks.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ pub struct QueryTasksArgs {
106106
/// The priority of the tasks, support operators like `=`(default), `!=`, `<`, `<=`, `>`, `>=`
107107
#[arg(short, long)]
108108
pub priority: Option<String>,
109+
/// Filter by reporter worker UUID (only returns completed tasks reported by this worker)
110+
#[arg(long)]
111+
pub reporter_uuid: Option<Uuid>,
109112
/// The limit of the tasks to query
110113
#[arg(long)]
111114
pub limit: Option<u64>,
@@ -200,6 +203,7 @@ pub struct ChangeTaskArgs {
200203
impl From<QueryTasksArgs> for TasksQueryReq {
201204
fn from(args: QueryTasksArgs) -> Self {
202205
Self {
206+
reporter_uuid: args.reporter_uuid,
203207
creator_usernames: if args.creators.is_empty() {
204208
None
205209
} else {

netmito/src/entity/active_tasks.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ impl Related<super::users::Entity> for Entity {
6262

6363
impl ActiveModelBehavior for ActiveModel {}
6464

65+
/// This conversion is used when archiving an active task.
66+
/// But do remember to set reporter_uuid separately after conversion
6567
impl From<Model> for super::archived_tasks::Model {
6668
fn from(task: Model) -> super::archived_tasks::Model {
6769
Self {
@@ -82,6 +84,7 @@ impl From<Model> for super::archived_tasks::Model {
8284
result: task.result,
8385
upstream_task_uuid: task.upstream_task_uuid,
8486
downstream_task_uuid: task.downstream_task_uuid,
87+
reporter_uuid: None,
8588
}
8689
}
8790
}

netmito/src/entity/archived_tasks.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub struct Model {
2626
pub result: Option<Json>,
2727
pub upstream_task_uuid: Option<Uuid>,
2828
pub downstream_task_uuid: Option<Uuid>,
29+
pub reporter_uuid: Option<Uuid>,
2930
}
3031

3132
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use sea_orm_migration::prelude::*;
2+
3+
#[derive(DeriveMigrationName)]
4+
pub struct Migration;
5+
6+
#[async_trait::async_trait]
7+
impl MigrationTrait for Migration {
8+
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
9+
manager
10+
.alter_table(
11+
Table::alter()
12+
.table(ArchivedTasks::Table)
13+
.add_column_if_not_exists(
14+
ColumnDef::new(ArchivedTasks::ReporterUuid).uuid().null(),
15+
)
16+
.to_owned(),
17+
)
18+
.await?;
19+
manager
20+
.create_index(
21+
sea_query::Index::create()
22+
.if_not_exists()
23+
.name("idx_archived_tasks-reporter_uuid")
24+
.table(ArchivedTasks::Table)
25+
.col(ArchivedTasks::ReporterUuid)
26+
.and_where(Expr::col(ArchivedTasks::ReporterUuid).is_not_null())
27+
.to_owned(),
28+
)
29+
.await
30+
}
31+
32+
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
33+
manager
34+
.drop_index(
35+
sea_query::Index::drop()
36+
.name("idx_archived_tasks-reporter_uuid")
37+
.to_owned(),
38+
)
39+
.await?;
40+
manager
41+
.alter_table(
42+
Table::alter()
43+
.table(ArchivedTasks::Table)
44+
.drop_column(ArchivedTasks::ReporterUuid)
45+
.to_owned(),
46+
)
47+
.await
48+
}
49+
}
50+
51+
#[derive(DeriveIden)]
52+
enum ArchivedTasks {
53+
Table,
54+
ReporterUuid,
55+
}

netmito/src/migration/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ mod m20250910_192001_add_performance_indices;
66
mod m20250910_205347_add_labels_to_workers;
77
mod m20250911_025409_add_gin_indices_on_tags;
88
mod m20250915_111689_add_tasks_trigger;
9+
mod m20251031_000001_add_reporter_uuid_to_archived_tasks;
910

1011
pub struct Migrator;
1112

@@ -19,6 +20,7 @@ impl MigratorTrait for Migrator {
1920
Box::new(m20250910_205347_add_labels_to_workers::Migration),
2021
Box::new(m20250911_025409_add_gin_indices_on_tags::Migration),
2122
Box::new(m20250915_111689_add_tasks_trigger::Migration),
23+
Box::new(m20251031_000001_add_reporter_uuid_to_archived_tasks::Migration),
2224
]
2325
}
2426
}

netmito/src/schema.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ pub struct TaskQueryInfo {
258258
pub result: Option<serde_json::Value>,
259259
pub upstream_task_uuid: Option<Uuid>,
260260
pub downstream_task_uuid: Option<Uuid>,
261+
pub reporter_uuid: Option<Uuid>,
261262
}
262263

263264
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -277,6 +278,7 @@ pub struct ParsedTaskQueryInfo {
277278
pub result: Option<TaskResultSpec>,
278279
pub upstream_task_uuid: Option<Uuid>,
279280
pub downstream_task_uuid: Option<Uuid>,
281+
pub reporter_uuid: Option<Uuid>,
280282
}
281283

282284
/// Each field in the query request is optional, and the server will return all tasks if no field is specified.
@@ -291,6 +293,8 @@ pub struct TasksQueryReq {
291293
pub states: Option<HashSet<TaskState>>,
292294
pub exit_status: Option<String>,
293295
pub priority: Option<String>,
296+
/// Set reporter_uuid will automatically exclude all non-completed tasks.
297+
pub reporter_uuid: Option<Uuid>,
294298
pub limit: Option<u64>,
295299
pub offset: Option<u64>,
296300
pub count: bool,
@@ -556,8 +560,9 @@ pub struct WorkerShutdown {
556560
pub op: Option<WorkerShutdownOp>,
557561
}
558562

559-
#[derive(Debug, Serialize, Deserialize, Clone)]
563+
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
560564
pub enum WorkerShutdownOp {
565+
#[default]
561566
#[serde(alias = "graceful")]
562567
Graceful,
563568
#[serde(alias = "force")]
@@ -575,6 +580,8 @@ pub struct ArtifactsDownloadByFilterReq {
575580
pub states: Option<HashSet<TaskState>>,
576581
pub exit_status: Option<String>,
577582
pub priority: Option<String>,
583+
/// Set reporter_uuid will automatically exclude all non-completed tasks.
584+
pub reporter_uuid: Option<Uuid>,
578585
pub content_type: ArtifactContentType,
579586
}
580587

@@ -638,6 +645,8 @@ pub struct ArtifactsDeleteByFilterReq {
638645
pub states: Option<HashSet<TaskState>>,
639646
pub exit_status: Option<String>,
640647
pub priority: Option<String>,
648+
/// Set reporter_uuid will automatically exclude all non-completed tasks.
649+
pub reporter_uuid: Option<Uuid>,
641650
pub content_type: ArtifactContentType,
642651
}
643652

@@ -702,12 +711,6 @@ pub struct TasksSubmitResp {
702711
pub results: Vec<Result<SubmitTaskResp, crate::error::ErrorMsg>>,
703712
}
704713

705-
impl Default for WorkerShutdownOp {
706-
fn default() -> Self {
707-
Self::Graceful
708-
}
709-
}
710-
711714
impl TaskSpec {
712715
pub fn new<T, I, P, Q, V, U>(
713716
args: I,

netmito/src/service/auth/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub struct AuthAdminUser {
4242
#[derive(Debug, Clone)]
4343
pub struct AuthWorker {
4444
pub id: i64,
45+
pub uuid: Uuid,
4546
}
4647

4748
pub(crate) fn get_and_prompt_username(
@@ -344,5 +345,8 @@ async fn worker_auth(db: &DatabaseConnection, bearer: &Bearer) -> Result<AuthWor
344345
.await
345346
.map_err(|_| AuthError::WrongCredentials)?
346347
.ok_or(AuthError::WrongCredentials)?;
347-
Ok(AuthWorker { id: worker.id })
348+
Ok(AuthWorker {
349+
id: worker.id,
350+
uuid,
351+
})
348352
}

0 commit comments

Comments
 (0)