2929from http .server import BaseHTTPRequestHandler , HTTPServer
3030from pathlib import Path
3131
32- try :
33- import yaml
34- except ImportError :
35- print (
36- "error: pyyaml is required. Install with: pip install pyyaml" , file = sys .stderr
37- )
38- sys .exit (1 )
39-
4032import ci_resource as res
4133import github_status as gh
4234import run
5244STATE_FAILURE = "failure"
5345STATE_ERROR = "error"
5446
47+ TAIL_LINES = 50
48+
5549# urllib helpers (module-level for easier mocking in tests)
5650urllib_request = urllib .request .Request
5751urllib_urlopen = urllib .request .urlopen
6559class JobRequest :
6660 """Describes a CI job to be executed."""
6761
68- def __init__ (self , job_name , branch , commit_sha , config , image_tag = None , results_dir = None ):
62+ def __init__ (
63+ self , job_name , branch , commit_sha , config , image_tag = None , results_dir = None
64+ ):
6965 self .job_id = str (uuid .uuid4 ())[:8 ]
7066 self .job_name = job_name
7167 self .branch = branch
@@ -92,18 +88,28 @@ def to_dict(self):
9288class JobResult :
9389 """Outcome of a completed job."""
9490
95- def __init__ (self , job_id , job_name , commit_sha , returncode , results_dir , duration ):
91+ def __init__ (
92+ self ,
93+ job_id ,
94+ job_name ,
95+ commit_sha ,
96+ returncode ,
97+ results_dir ,
98+ duration ,
99+ error_tail = None ,
100+ ):
96101 self .job_id = job_id
97102 self .job_name = job_name
98103 self .commit_sha = commit_sha
99104 self .returncode = returncode
100105 self .results_dir = results_dir
101106 self .duration = duration
107+ self .error_tail = error_tail or []
102108
103109 self .state = STATE_SUCCESS if returncode == 0 else STATE_FAILURE
104110
105111 def to_dict (self ):
106- return {
112+ d = {
107113 "job_id" : self .job_id ,
108114 "job_name" : self .job_name ,
109115 "commit_sha" : self .commit_sha ,
@@ -113,6 +119,11 @@ def to_dict(self):
113119 "duration_seconds" : round (self .duration , 1 ),
114120 }
115121
122+ if self .error_tail :
123+ d ["error_tail" ] = self .error_tail
124+
125+ return d
126+
116127
117128# ---------------------------------------------------------------------------
118129# Job selection and routing
@@ -130,14 +141,11 @@ def select_jobs(config, platform=None, job_name=None):
130141 return [job_name ]
131142
132143 if platform :
133- return [
134- name for name , job in jobs .items () if job .get ("platform" ) == platform
135- ]
144+ return [name for name , job in jobs .items () if job .get ("platform" ) == platform ]
136145
137146 return list (jobs .keys ())
138147
139148
140-
141149# ---------------------------------------------------------------------------
142150# Scheduler
143151# ---------------------------------------------------------------------------
@@ -211,18 +219,17 @@ def get_job(self, job_id):
211219 def get_status (self ):
212220 """Return scheduler status for the /status endpoint."""
213221 with self ._lock :
214- queued = [
215- self ._jobs [r .job_id ]["request" ].to_dict ()
216- for r in self ._queue
217- ]
222+ queued = [self ._jobs [r .job_id ]["request" ].to_dict () for r in self ._queue ]
218223 running = []
219224 completed = []
220225
221226 for entry in self ._jobs .values ():
222227 state = entry ["state" ]
223228
224229 if state == STATE_RUNNING :
225- running .append ({** entry ["request" ].to_dict (), "gpu_ids" : entry ["gpu_ids" ]})
230+ running .append (
231+ {** entry ["request" ].to_dict (), "gpu_ids" : entry ["gpu_ids" ]}
232+ )
226233 elif state in (STATE_SUCCESS , STATE_FAILURE ):
227234 completed .append (entry ["result" ].to_dict ())
228235
@@ -238,7 +245,8 @@ def wait_all(self):
238245 while True :
239246 with self ._lock :
240247 pending = any (
241- e ["state" ] in (STATE_QUEUED , STATE_RUNNING ) for e in self ._jobs .values ()
248+ e ["state" ] in (STATE_QUEUED , STATE_RUNNING )
249+ for e in self ._jobs .values ()
242250 )
243251
244252 if not pending :
@@ -248,11 +256,7 @@ def wait_all(self):
248256 self ._done_event .clear ()
249257
250258 with self ._lock :
251- return [
252- e ["result" ]
253- for e in self ._jobs .values ()
254- if e ["result" ] is not None
255- ]
259+ return [e ["result" ] for e in self ._jobs .values () if e ["result" ] is not None ]
256260
257261 def _try_schedule (self ):
258262 """Try to run queued jobs that have enough resources.
@@ -315,7 +319,9 @@ def _run_job(self, req, gpu_ids):
315319 job_cfg = self ._config ["jobs" ][req .job_name ]
316320 all_stages = job_cfg .get ("stages" , [])
317321 repo_url = self ._config .get ("repo" , {}).get ("url" , "" )
318- commit_short = req .commit_sha [:7 ] if len (req .commit_sha ) > 7 else req .commit_sha
322+ commit_short = (
323+ req .commit_sha [:7 ] if len (req .commit_sha ) > 7 else req .commit_sha
324+ )
319325 results_dir = run .build_results_dir (
320326 req .results_dir , req .platform , all_stages , commit_short
321327 )
@@ -338,10 +344,30 @@ def _run_job(self, req, gpu_ids):
338344 if self ._dry_run :
339345 print (f"[dry-run] { req .job_name } : { shlex .join (docker_args )} " )
340346 returncode = 0
347+ error_tail = []
341348 else :
342349 results_dir .mkdir (parents = True , exist_ok = True )
343- proc = subprocess .run (docker_args )
344- returncode = proc .returncode
350+ proc = subprocess .Popen (
351+ docker_args ,
352+ stdout = subprocess .PIPE ,
353+ stderr = subprocess .STDOUT ,
354+ )
355+ tail_buf = collections .deque (maxlen = TAIL_LINES )
356+
357+ for line in proc .stdout :
358+ sys .stdout .buffer .write (line )
359+ tail_buf .append (line )
360+
361+ proc .stdout .close ()
362+ returncode = proc .wait ()
363+
364+ if returncode != 0 :
365+ error_tail = [
366+ raw .decode ("utf-8" , errors = "replace" ).rstrip ("\n " )
367+ for raw in tail_buf
368+ ]
369+ else :
370+ error_tail = []
345371
346372 duration = time .monotonic () - start
347373
@@ -352,6 +378,7 @@ def _run_job(self, req, gpu_ids):
352378 returncode = returncode ,
353379 results_dir = results_dir ,
354380 duration = duration ,
381+ error_tail = error_tail ,
355382 )
356383
357384 # Post final status
@@ -365,7 +392,9 @@ def _run_job(self, req, gpu_ids):
365392 f"{ req .job_name } : { result .state } in { duration :.0f} s" ,
366393 )
367394 except Exception as e :
368- print (f"error: job { req .job_name } failed with exception: { e } " , file = sys .stderr )
395+ print (
396+ f"error: job { req .job_name } failed with exception: { e } " , file = sys .stderr
397+ )
369398
370399 if result is None :
371400 result = JobResult (
@@ -375,6 +404,7 @@ def _run_job(self, req, gpu_ids):
375404 returncode = - 1 ,
376405 results_dir = req .results_dir ,
377406 duration = 0 ,
407+ error_tail = [str (e )],
378408 )
379409
380410 if not self ._no_status :
@@ -392,7 +422,9 @@ def _run_job(self, req, gpu_ids):
392422
393423 with self ._lock :
394424 self ._jobs [req .job_id ]["result" ] = result
395- self ._jobs [req .job_id ]["state" ] = result .state if result else STATE_FAILURE
425+ self ._jobs [req .job_id ]["state" ] = (
426+ result .state if result else STATE_FAILURE
427+ )
396428
397429 self ._done_event .set ()
398430 self ._try_schedule ()
@@ -410,9 +442,9 @@ def verify_signature(secret, body, signature_header):
410442 if not signature_header :
411443 return False
412444
413- expected = "sha256=" + hmac . new (
414- secret .encode ("utf-8" ), body , hashlib .sha256
415- ). hexdigest ()
445+ expected = (
446+ "sha256=" + hmac . new ( secret .encode ("utf-8" ), body , hashlib .sha256 ). hexdigest ()
447+ )
416448 return hmac .compare_digest (expected , signature_header )
417449
418450
@@ -567,7 +599,9 @@ def _parse_pull_request(self, payload):
567599
568600 def _submit_jobs (self , branch , sha , job_name = None , image_tag = None ):
569601 config = self .server .config
570- job_names = select_jobs (config , platform = self .server .platform , job_name = job_name )
602+ job_names = select_jobs (
603+ config , platform = self .server .platform , job_name = job_name
604+ )
571605 job_ids = []
572606
573607 for name in job_names :
@@ -621,7 +655,9 @@ def __init__(
621655# ---------------------------------------------------------------------------
622656
623657
624- def dispatch_remote_job (agent_url , job_name , branch , commit_sha , image_tag = None , api_token = None ):
658+ def dispatch_remote_job (
659+ agent_url , job_name , branch , commit_sha , image_tag = None , api_token = None
660+ ):
625661 """Send a job to a remote agent via HTTP API. Returns job_id or None."""
626662 url = f"{ agent_url .rstrip ('/' )} /api/run"
627663 body = {
@@ -707,7 +743,10 @@ def cmd_run(args):
707743 agent_url = agents .get (platform , {}).get ("url" , "" )
708744
709745 if not agent_url :
710- print (f"error: no agent URL configured for platform { platform !r} (job { name } )" , file = sys .stderr )
746+ print (
747+ f"error: no agent URL configured for platform { platform !r} (job { name } )" ,
748+ file = sys .stderr ,
749+ )
711750 sys .exit (1 )
712751
713752 jobs_to_dispatch .append ((name , agent_url ))
@@ -730,7 +769,11 @@ def cmd_run(args):
730769 file = sys .stderr ,
731770 )
732771 job_id = dispatch_remote_job (
733- agent_url , name , branch , commit_sha , args .image_tag ,
772+ agent_url ,
773+ name ,
774+ branch ,
775+ commit_sha ,
776+ args .image_tag ,
734777 api_token = api_token or None ,
735778 )
736779
@@ -748,6 +791,9 @@ def cmd_run(args):
748791 for name , url , jid in dispatched
749792 }
750793
794+ # Collect name lengths for column alignment.
795+ name_width = max (len (n ) for n , _ , _ in dispatched )
796+
751797 for future in as_completed (futures ):
752798 name , _ , _ = futures [future ]
753799 result = future .result ()
@@ -757,30 +803,47 @@ def cmd_run(args):
757803 duration = result .get ("duration_seconds" , 0 )
758804 tag = "PASS" if state == STATE_SUCCESS else "FAIL"
759805 print (
760- f"<== { tag } { name } ({ duration :.0f} s)" ,
806+ f"<== { tag } { name :<{ name_width } } ({ duration :.0f} s)" ,
761807 file = sys .stderr ,
762808 )
809+
810+ error_tail = result .get ("error_tail" , [])
811+
812+ if error_tail :
813+ print (
814+ f"--- error output (last { len (error_tail )} lines) ---" ,
815+ file = sys .stderr ,
816+ )
817+
818+ for line in error_tail :
819+ print (f" { line } " , file = sys .stderr )
820+
821+ print ("---" , file = sys .stderr )
822+
763823 results .append (result )
764824 else :
765- print (f"<== TIMEOUT { name } " , file = sys .stderr )
825+ print (
826+ f"<== TIMEOUT { name :<{name_width }} " ,
827+ file = sys .stderr ,
828+ )
766829 results .append ({"job_name" : name , "state" : "timeout" })
767830
768- # Summary
769- print ("\n ========== Results ==========" )
770- all_ok = True
771-
772- for r in results :
773- state = r .get ("state" , "unknown" )
774- name = r .get ("job_name" , "?" )
775- status = "PASS" if state == STATE_SUCCESS else "FAIL"
831+ # Summary: only print when there are failures.
832+ failed = [r for r in results if r .get ("state" ) != STATE_SUCCESS ]
776833
777- if state != STATE_SUCCESS :
778- all_ok = False
834+ if failed :
835+ print ("\n ========== Failed ==========" , file = sys .stderr )
836+ name_width = max (len (r .get ("job_name" , "?" )) for r in failed )
779837
780- duration = r .get ("duration_seconds" , 0 )
781- print (f" { status } { name } ({ duration :.0f} s)" )
838+ for r in failed :
839+ name = r .get ("job_name" , "?" )
840+ state = r .get ("state" , "unknown" )
841+ duration = r .get ("duration_seconds" , 0 )
842+ print (
843+ f" FAIL { name :<{name_width }} { state } ({ duration :.0f} s)" ,
844+ file = sys .stderr ,
845+ )
782846
783- if not all_ok :
784847 sys .exit (1 )
785848
786849
@@ -849,11 +912,11 @@ def cmd_serve(args):
849912 f"Agent serving on { args .host } :{ args .port } (platform={ platform } )" ,
850913 file = sys .stderr ,
851914 )
852- print (f " POST /webhook — GitHub webhook" , file = sys .stderr )
853- print (f " POST /api/run — remote job trigger" , file = sys .stderr )
854- print (f " GET /health — health check" , file = sys .stderr )
855- print (f " GET /status — queue & resource status" , file = sys .stderr )
856- print (f " GET /api/job/{{id} } — job status" , file = sys .stderr )
915+ print (" POST /webhook — GitHub webhook" , file = sys .stderr )
916+ print (" POST /api/run — remote job trigger" , file = sys .stderr )
917+ print (" GET /health — health check" , file = sys .stderr )
918+ print (" GET /status — queue & resource status" , file = sys .stderr )
919+ print (" GET /api/job/{id } — job status" , file = sys .stderr )
857920
858921 try :
859922 server .serve_forever ()
@@ -875,7 +938,9 @@ def main():
875938 type = Path ,
876939 default = Path (__file__ ).resolve ().parent / "config.yaml" ,
877940 )
878- run_parser .add_argument ("--branch" , type = str , help = "Branch to test (default: config repo.branch)" )
941+ run_parser .add_argument (
942+ "--branch" , type = str , help = "Branch to test (default: config repo.branch)"
943+ )
879944 run_parser .add_argument ("--job" , type = str , help = "Specific job name" )
880945 run_parser .add_argument ("--platform" , type = str , help = "Filter jobs by platform" )
881946 run_parser .add_argument ("--image-tag" , type = str , help = "Override image tag" )
0 commit comments