Skip to content

Commit 5243e0b

Browse files
authored
Merge pull request #818 from Extra-Chill/fix/issue-815-artifact-cleanup-drain
Fix artifact cleanup child drainability
2 parents a49da36 + 5ef8b22 commit 5243e0b

5 files changed

Lines changed: 385 additions & 25 deletions

File tree

inc/Cleanup/DataMachineJobCleanupRunEvidenceStore.php

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
namespace DataMachineCode\Cleanup;
99

10+
use DataMachineCode\Support\SystemTaskDrainability;
11+
1012
defined('ABSPATH') || exit;
1113

1214
class DataMachineJobCleanupRunEvidenceStore implements CleanupRunEvidenceStoreInterface {
@@ -126,19 +128,20 @@ private function aggregate_cleanup_child_jobs( array $child_jobs ): array {
126128
'remaining_safely_removable_worktrees' => 0,
127129
),
128130
'children' => array(
129-
'batch_job_ids' => array(),
130-
'chunk_job_ids' => array(),
131-
'pending_job_ids' => array(),
132-
'processing_job_ids' => array(),
133-
'failed_job_ids' => array(),
134-
'processing' => 0,
135-
'completed' => 0,
136-
'failed' => 0,
137-
'skipped' => 0,
138-
'running' => 0,
139-
'total' => 0,
140-
'statuses' => array(),
141-
'job_ids' => array(),
131+
'batch_job_ids' => array(),
132+
'chunk_job_ids' => array(),
133+
'pending_job_ids' => array(),
134+
'pending_without_drainable_action_job_ids' => array(),
135+
'processing_job_ids' => array(),
136+
'failed_job_ids' => array(),
137+
'processing' => 0,
138+
'completed' => 0,
139+
'failed' => 0,
140+
'skipped' => 0,
141+
'running' => 0,
142+
'total' => 0,
143+
'statuses' => array(),
144+
'job_ids' => array(),
142145
),
143146
);
144147

@@ -162,6 +165,9 @@ private function aggregate_cleanup_child_jobs( array $child_jobs ): array {
162165
$summary['children']['job_ids'][] = $child_job_id;
163166
if ( 'pending' === $status ) {
164167
$summary['children']['pending_job_ids'][] = $child_job_id;
168+
if ( ! SystemTaskDrainability::job_has_execute_step_action($child_job_id) ) {
169+
$summary['children']['pending_without_drainable_action_job_ids'][] = $child_job_id;
170+
}
165171
} elseif ( 'processing' === $status && ! $idle_wrapper ) {
166172
$summary['children']['processing_job_ids'][] = $child_job_id;
167173
} elseif ( str_starts_with($status, 'failed') ) {
@@ -204,11 +210,12 @@ private function aggregate_cleanup_child_jobs( array $child_jobs ): array {
204210
$summary['cleanup_items']['freed_human'] = $this->format_bytes($summary['cleanup_items']['bytes_reclaimed']);
205211
$summary['children']['batch_job_ids'] = array_values(array_unique($summary['children']['batch_job_ids']));
206212
$summary['children']['chunk_job_ids'] = array_values(array_unique($summary['children']['chunk_job_ids']));
207-
$summary['children']['pending_job_ids'] = array_values(array_unique($summary['children']['pending_job_ids']));
208-
$summary['children']['processing_job_ids'] = array_values(array_unique($summary['children']['processing_job_ids']));
209-
$summary['children']['failed_job_ids'] = array_values(array_unique($summary['children']['failed_job_ids']));
210-
$summary['children']['job_ids'] = array_values(array_unique($summary['children']['job_ids']));
211-
$summary['children']['running'] = (int) $summary['children']['processing'];
213+
$summary['children']['pending_job_ids'] = array_values(array_unique($summary['children']['pending_job_ids']));
214+
$summary['children']['pending_without_drainable_action_job_ids'] = array_values(array_unique($summary['children']['pending_without_drainable_action_job_ids']));
215+
$summary['children']['processing_job_ids'] = array_values(array_unique($summary['children']['processing_job_ids']));
216+
$summary['children']['failed_job_ids'] = array_values(array_unique($summary['children']['failed_job_ids']));
217+
$summary['children']['job_ids'] = array_values(array_unique($summary['children']['job_ids']));
218+
$summary['children']['running'] = (int) $summary['children']['processing'];
212219

213220
return $summary;
214221
}
@@ -224,6 +231,7 @@ private function summarize_cleanup_children( array $children ): array {
224231
$batch_ids = (array) ( $children['batch_job_ids'] ?? array() );
225232
$chunk_ids = (array) ( $children['chunk_job_ids'] ?? array() );
226233
$pending = (array) ( $children['pending_job_ids'] ?? array() );
234+
$undrainable = (array) ( $children['pending_without_drainable_action_job_ids'] ?? array() );
227235
$processing = (array) ( $children['processing_job_ids'] ?? array() );
228236

229237
return array(
@@ -238,6 +246,7 @@ private function summarize_cleanup_children( array $children ): array {
238246
'chunk_total' => count($chunk_ids),
239247
'failed_job_ids' => (array) ( $children['failed_job_ids'] ?? array() ),
240248
'pending_job_ids' => array_slice($pending, 0, $limit),
249+
'pending_without_drainable_action_job_ids' => array_slice($undrainable, 0, $limit),
241250
'processing_job_ids' => array_slice($processing, 0, $limit),
242251
'pending_truncated' => count($pending) > $limit,
243252
'processing_truncated' => count($processing) > $limit,
@@ -277,13 +286,24 @@ private function cleanup_run_drain_summary( int $job_id, string $state, array $c
277286
if ( array() !== $active_child_ids ) {
278287
$commands['active_children'] = sprintf('studio wp datamachine drain --job-id=%s', implode(',', $active_child_ids));
279288
}
289+
$undrainable_child_ids = array_values(
290+
array_unique(
291+
array_filter(
292+
array_map('intval', (array) ( $children['pending_without_drainable_action_job_ids'] ?? array() ))
293+
)
294+
)
295+
);
296+
if ( array() !== $undrainable_child_ids ) {
297+
$commands['repair_undrainable_children'] = sprintf('studio wp datamachine-code workspace cleanup resume %s --format=json', $run_id);
298+
}
280299

281300
return array(
282301
'needed' => in_array($state, array( 'running', 'waiting_on_children' ), true),
283302
'commands' => $commands,
284-
'active_child_job_ids' => $active_child_ids,
285-
'bytes_reclaimed' => (int) ( $cleanup_items['bytes_reclaimed'] ?? 0 ),
286-
'freed_human' => (string) ( $cleanup_items['freed_human'] ?? $this->format_bytes(0) ),
303+
'active_child_job_ids' => $active_child_ids,
304+
'undrainable_child_job_ids' => $undrainable_child_ids,
305+
'bytes_reclaimed' => (int) ( $cleanup_items['bytes_reclaimed'] ?? 0 ),
306+
'freed_human' => (string) ( $cleanup_items['freed_human'] ?? $this->format_bytes(0) ),
287307
);
288308
}
289309

inc/Cli/Commands/WorkspaceCommand.php

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1310,13 +1310,18 @@ private function cleanup_run_control_job_ids( string $operation, int $job_id ):
13101310
return array( $job_id );
13111311
}
13121312

1313-
$children = (array) ( $output['evidence']['children'] ?? array() );
1314-
$processing_ids = array_map('intval', (array) ( $children['processing_job_ids'] ?? array() ));
1315-
$failed_ids = array_map('intval', (array) ( $children['failed_job_ids'] ?? array() ));
1316-
$pending_ids = array_map('intval', (array) ( $children['pending_job_ids'] ?? array() ));
1313+
$children = (array) ( $output['evidence']['children'] ?? array() );
1314+
$processing_ids = array_map('intval', (array) ( $children['processing_job_ids'] ?? array() ));
1315+
$failed_ids = array_map('intval', (array) ( $children['failed_job_ids'] ?? array() ));
1316+
$pending_ids = array_map('intval', (array) ( $children['pending_job_ids'] ?? array() ));
1317+
$undrainable_ids = array_map('intval', (array) ( $children['pending_without_drainable_action_job_ids'] ?? array() ));
13171318

13181319
if ( 'resume' === $operation ) {
1320+
$repair = \DataMachineCode\Support\SystemTaskDrainability::ensure_jobs_have_execute_step_actions($undrainable_ids);
13191321
$child_targets = array_values(array_unique(array_filter(array_merge($processing_ids, $failed_ids))));
1322+
if ( array() === $child_targets && (int) ( $repair['repaired'] ?? 0 ) > 0 ) {
1323+
return array();
1324+
}
13201325
return array() !== $child_targets ? $child_targets : array( $job_id );
13211326
}
13221327

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
<?php
2+
/**
3+
* Data Machine system-task drainability helpers.
4+
*
5+
* @package DataMachineCode\Support
6+
*/
7+
8+
namespace DataMachineCode\Support;
9+
10+
defined('ABSPATH') || exit;
11+
12+
class SystemTaskDrainability {
13+
14+
private const EXECUTE_STEP_HOOK = 'datamachine_execute_step';
15+
16+
private const ACTION_GROUP = 'data-machine';
17+
18+
/**
19+
* Ensure a pending workflow job has the Action Scheduler action that drain can run.
20+
*
21+
* @param int $job_id Data Machine job ID.
22+
* @return bool True when an action already exists or was scheduled.
23+
*/
24+
public static function ensure_job_has_execute_step_action( int $job_id ): bool {
25+
if ( $job_id <= 0 || self::job_has_execute_step_action($job_id) || ! function_exists('as_schedule_single_action') ) {
26+
return $job_id > 0 && self::job_has_execute_step_action($job_id);
27+
}
28+
29+
$flow_step_id = self::first_step_id($job_id);
30+
if ( '' === $flow_step_id ) {
31+
return false;
32+
}
33+
34+
$action_id = as_schedule_single_action(
35+
time(),
36+
self::EXECUTE_STEP_HOOK,
37+
array(
38+
'job_id' => $job_id,
39+
'flow_step_id' => $flow_step_id,
40+
),
41+
self::ACTION_GROUP
42+
);
43+
44+
if ( false === $action_id ) {
45+
return false;
46+
}
47+
48+
if ( function_exists('datamachine_merge_engine_data') ) {
49+
datamachine_merge_engine_data(
50+
$job_id,
51+
array(
52+
'drainability_repair' => array(
53+
'action_id' => (int) $action_id,
54+
'hook' => self::EXECUTE_STEP_HOOK,
55+
'flow_step_id' => $flow_step_id,
56+
'repaired_at' => gmdate('c'),
57+
),
58+
)
59+
);
60+
}
61+
62+
return true;
63+
}
64+
65+
/**
66+
* Ensure every supplied job has drainable work and return repair stats.
67+
*
68+
* @param array<int,int|string> $job_ids Data Machine job IDs.
69+
* @return array{checked:int,repaired:int,unrepairable:array<int,int>}
70+
*/
71+
public static function ensure_jobs_have_execute_step_actions( array $job_ids ): array {
72+
$checked = 0;
73+
$repaired = 0;
74+
$unrepairable = array();
75+
76+
foreach ( array_values(array_unique(array_map('intval', $job_ids))) as $job_id ) {
77+
if ( $job_id <= 0 || ! self::is_pending_job($job_id) ) {
78+
continue;
79+
}
80+
81+
++$checked;
82+
$had_action = self::job_has_execute_step_action($job_id);
83+
if ( self::ensure_job_has_execute_step_action($job_id) ) {
84+
if ( ! $had_action ) {
85+
++$repaired;
86+
}
87+
continue;
88+
}
89+
90+
$unrepairable[] = $job_id;
91+
}
92+
93+
return array(
94+
'checked' => $checked,
95+
'repaired' => $repaired,
96+
'unrepairable' => $unrepairable,
97+
);
98+
}
99+
100+
/**
101+
* Return pending jobs from a list that lack a drainable execute-step action.
102+
*
103+
* @param array<int,int|string> $job_ids Data Machine job IDs.
104+
* @return array<int,int>
105+
*/
106+
public static function pending_jobs_missing_execute_step_actions( array $job_ids ): array {
107+
$missing = array();
108+
foreach ( array_values(array_unique(array_map('intval', $job_ids))) as $job_id ) {
109+
if ( $job_id > 0 && self::is_pending_job($job_id) && ! self::job_has_execute_step_action($job_id) ) {
110+
$missing[] = $job_id;
111+
}
112+
}
113+
114+
return $missing;
115+
}
116+
117+
/**
118+
* Determine whether a job has a pending execute-step action scoped to it.
119+
*
120+
* @param int $job_id Data Machine job ID.
121+
* @return bool
122+
*/
123+
public static function job_has_execute_step_action( int $job_id ): bool {
124+
if ( $job_id <= 0 || ! self::actions_table_available() ) {
125+
return false;
126+
}
127+
128+
global $wpdb;
129+
$actions_table = $wpdb->prefix . 'actionscheduler_actions';
130+
$groups_table = $wpdb->prefix . 'actionscheduler_groups';
131+
132+
// phpcs:disable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching -- Operational status check for Action Scheduler work owned by Data Machine.
133+
$count = (int) $wpdb->get_var(
134+
$wpdb->prepare(
135+
"SELECT COUNT(*)
136+
FROM %i a
137+
INNER JOIN %i g ON g.group_id = a.group_id
138+
WHERE a.hook = %s
139+
AND a.status = 'pending'
140+
AND g.slug = %s
141+
AND (a.args LIKE %s OR a.args LIKE %s)",
142+
$actions_table,
143+
$groups_table,
144+
self::EXECUTE_STEP_HOOK,
145+
self::ACTION_GROUP,
146+
'%"job_id":' . $job_id . ',%',
147+
'%"job_id":' . $job_id . '}%'
148+
)
149+
);
150+
// phpcs:enable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching
151+
152+
return $count > 0;
153+
}
154+
155+
private static function is_pending_job( int $job_id ): bool {
156+
$job = self::get_job($job_id);
157+
return 'pending' === (string) ( $job['status'] ?? '' );
158+
}
159+
160+
/**
161+
* @return array<string,mixed>
162+
*/
163+
private static function get_job( int $job_id ): array {
164+
$ability = function_exists('wp_get_ability') ? wp_get_ability('datamachine/get-jobs') : null;
165+
if ( ! $ability ) {
166+
return array();
167+
}
168+
169+
$result = $ability->execute(array( 'job_id' => $job_id ));
170+
if ( ! ( $result['success'] ?? false ) || ! is_array($result['jobs'] ?? null) ) {
171+
return array();
172+
}
173+
174+
$job = $result['jobs'][0] ?? array();
175+
return is_array($job) ? $job : array();
176+
}
177+
178+
private static function first_step_id( int $job_id ): string {
179+
if ( ! function_exists('datamachine_get_engine_data') ) {
180+
return '';
181+
}
182+
183+
$engine_data = datamachine_get_engine_data($job_id);
184+
$flow_config = is_array($engine_data['flow_config'] ?? null) ? $engine_data['flow_config'] : array();
185+
if ( array() === $flow_config ) {
186+
return '';
187+
}
188+
189+
if ( class_exists('\DataMachine\Engine\ExecutionPlan') ) {
190+
try {
191+
return (string) \DataMachine\Engine\ExecutionPlan::from_flow_config($flow_config)->first_step_id();
192+
} catch ( \InvalidArgumentException ) {
193+
return '';
194+
}
195+
}
196+
197+
$keys = array_keys($flow_config);
198+
return (string) ( $keys[0] ?? '' );
199+
}
200+
201+
private static function actions_table_available(): bool {
202+
global $wpdb;
203+
return isset($wpdb) && is_object($wpdb) && isset($wpdb->prefix) && method_exists($wpdb, 'get_var') && method_exists($wpdb, 'prepare');
204+
}
205+
}

inc/Tasks/WorkspaceRetentionCleanupTask.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use DataMachine\Core\PluginSettings;
1111
use DataMachine\Engine\AI\System\Tasks\SystemTask;
1212
use DataMachine\Engine\Tasks\TaskScheduler;
13+
use DataMachineCode\Support\SystemTaskDrainability;
1314
use DataMachineCode\Workspace\Workspace;
1415

1516
defined('ABSPATH') || exit;
@@ -207,6 +208,10 @@ private function schedule_job_backed_cleanup( int $jobId, Workspace $workspace,
207208
return new \WP_Error('cleanup_chunk_schedule_failed', 'Failed to schedule cleanup chunk jobs.', array( 'status' => 500 ));
208209
}
209210

211+
$drainability = SystemTaskDrainability::ensure_jobs_have_execute_step_actions(
212+
is_array($batch['job_ids'] ?? null) ? $batch['job_ids'] : array()
213+
);
214+
210215
return array(
211216
'success' => true,
212217
'dry_run' => false,
@@ -237,6 +242,7 @@ private function schedule_job_backed_cleanup( int $jobId, Workspace $workspace,
237242
'planned_handles' => $this->cleanup_chunk_handles($chunk_rows),
238243
'batch_job_id' => (int) ( $batch['batch_job_id'] ?? 0 ),
239244
'direct_job_ids' => $batch['job_ids'] ?? array(),
245+
'drainability' => $drainability,
240246
),
241247
);
242248
}

0 commit comments

Comments
 (0)