@@ -10,7 +10,6 @@ import (
1010 "io/fs"
1111 "os"
1212 "os/exec"
13- "path/filepath"
1413 "runtime"
1514 "sync"
1615 "syscall"
@@ -348,12 +347,7 @@ func (m *Manager) createDefaultRunner(ctx context.Context) (*Runner, error) {
348347 tmpDir : tmpDir ,
349348 uploader : uploader ,
350349 }
351- // Only enable forced shutdown for procedure mode
352- var forceShutdown * config.ForceShutdownSignal
353- if m .cfg .UseProcedureMode {
354- forceShutdown = m .cfg .ForceShutdown
355- }
356- runner , err := NewRunner (runtimeContext , runtimeCancel , runnerCtx , cmd , cogYaml .Concurrency .Max , m .cfg .CleanupTimeout , forceShutdown , m .baseLogger )
350+ runner , err := NewRunner (runtimeContext , runtimeCancel , runnerCtx , cmd , cogYaml .Concurrency .Max , m .cfg , m .baseLogger )
357351 if err != nil {
358352 return nil , err
359353 }
@@ -419,6 +413,35 @@ func (m *Manager) allocatePrediction(runner *Runner, req PredictionRequest) { //
419413 delete (runner .pending , req .ID )
420414 runner .mu .Unlock ()
421415
416+ // In one-shot mode, stop runner after prediction completes to trigger cleanup
417+ if m .cfg .OneShot && finalResponse .Status .IsCompleted () {
418+ go func () {
419+ logger := m .logger .Sugar ()
420+ logger .Infow ("one-shot mode: stopping runner after prediction completion" , "prediction_id" , req .ID , "runner_id" , runner .runnerCtx .id )
421+
422+ // Try graceful stop with timeout
423+ stopDone := make (chan error , 1 )
424+ go func () {
425+ stopDone <- runner .Stop ()
426+ }()
427+
428+ timeout := m .cfg .CleanupTimeout
429+ if timeout == 0 {
430+ timeout = 10 * time .Second // Default timeout
431+ }
432+
433+ select {
434+ case err := <- stopDone :
435+ if err != nil {
436+ logger .Errorw ("failed to stop runner in one-shot mode" , "error" , err , "runner_id" , runner .runnerCtx .id )
437+ }
438+ case <- time .After (timeout ):
439+ logger .Warnw ("stop timeout exceeded in one-shot mode, falling back to force kill" , "timeout" , timeout , "runner_id" , runner .runnerCtx .id )
440+ runner .ForceKill ()
441+ }
442+ }()
443+ }
444+
422445 if cancel != nil {
423446 cancel ()
424447 }
@@ -626,20 +649,28 @@ func (m *Manager) createProcedureRunner(runnerName, procedureHash string) (*Runn
626649 env = append (env , "TMPDIR=" + tmpDir )
627650 cmd .Env = env
628651
629- // Apply setUID isolation for procedure runners if needed
652+ var allocatedUID * int
630653 if m .shouldUseSetUID () {
631654 uid , err := AllocateUID ()
632655 if err != nil {
633656 runtimeCancel ()
634657 return nil , fmt .Errorf ("failed to allocate UID: %w" , err )
635658 }
659+ allocatedUID = & uid
660+
661+ // Use os.Root for secure ownership changes
662+ workingRoot , err := os .OpenRoot (workingDir )
663+ if err != nil {
664+ runtimeCancel ()
665+ return nil , fmt .Errorf ("failed to open working directory root: %w" , err )
666+ }
667+ defer func () { _ = workingRoot .Close () }()
636668
637- // Change ownership of source directory (workingDir)
638- err = filepath .WalkDir (workingDir , func (path string , d fs.DirEntry , err error ) error {
669+ err = fs .WalkDir (workingRoot .FS (), "." , func (path string , d fs.DirEntry , err error ) error {
639670 if err != nil {
640671 return err
641672 }
642- if lchownErr := os .Lchown (path , uid , NoGroupGID ); lchownErr != nil {
673+ if lchownErr := workingRoot .Lchown (path , uid , NoGroupGID ); lchownErr != nil {
643674 log .Errorw ("failed to change ownership" , "path" , path , "uid" , uid , "error" , lchownErr )
644675 return lchownErr
645676 }
@@ -650,19 +681,24 @@ func (m *Manager) createProcedureRunner(runnerName, procedureHash string) (*Runn
650681 return nil , fmt .Errorf ("failed to change ownership of source directory: %w" , err )
651682 }
652683
653- // Make working dir writable by unprivileged Python process
654- if err := os .Lchown (workingDir , uid , NoGroupGID ); err != nil {
684+ if err := workingRoot .Lchown ("." , uid , NoGroupGID ); err != nil {
655685 log .Errorw ("failed to change ownership of working directory" , "path" , workingDir , "uid" , uid , "error" , err )
656686 runtimeCancel ()
657687 return nil , fmt .Errorf ("failed to change ownership of working directory: %w" , err )
658688 }
659- // Change ownership of temp directory
660- if err := os .Lchown (tmpDir , uid , NoGroupGID ); err != nil {
689+
690+ tmpRoot , err := os .OpenRoot (tmpDir )
691+ if err != nil {
692+ runtimeCancel ()
693+ return nil , fmt .Errorf ("failed to open temp directory root: %w" , err )
694+ }
695+ defer func () { _ = tmpRoot .Close () }()
696+
697+ if err := tmpRoot .Lchown ("." , uid , NoGroupGID ); err != nil {
661698 log .Errorw ("failed to change ownership of temp directory" , "path" , tmpDir , "uid" , uid , "error" , err )
662699 runtimeCancel ()
663700 return nil , fmt .Errorf ("failed to change ownership of temp directory: %w" , err )
664701 }
665- // Use syscall.Credential to run process as unprivileged user from start
666702 cmd .SysProcAttr .Credential = & syscall.Credential {
667703 Uid : uint32 (uid ), //nolint:gosec // this is guarded in isolation .allocate, cannot exceed const MaxUID
668704 Gid : uint32 (NoGroupGID ),
@@ -675,19 +711,17 @@ func (m *Manager) createProcedureRunner(runnerName, procedureHash string) (*Runn
675711 if m .cfg .UploadURL != "" {
676712 uploader = newUploader (m .cfg .UploadURL )
677713 }
714+
678715 runnerCtx := RunnerContext {
679- id : runnerName ,
680- workingdir : workingDir ,
681- tmpDir : tmpDir ,
682- uploader : uploader ,
716+ id : runnerName ,
717+ workingdir : workingDir ,
718+ tmpDir : tmpDir ,
719+ uploader : uploader ,
720+ uid : allocatedUID ,
721+ cleanupDirectories : m .cfg .CleanupDirectories ,
683722 }
684723
685- // Only enable forced shutdown for procedure mode
686- var forceShutdown * config.ForceShutdownSignal
687- if m .cfg .UseProcedureMode {
688- forceShutdown = m .cfg .ForceShutdown
689- }
690- runner , err := NewRunner (runtimeContext , runtimeCancel , runnerCtx , cmd , 1 , m .cfg .CleanupTimeout , forceShutdown , m .baseLogger )
724+ runner , err := NewRunner (runtimeContext , runtimeCancel , runnerCtx , cmd , 1 , m .cfg , m .baseLogger )
691725 if err != nil {
692726 return nil , fmt .Errorf ("failed to create runner: %w" , err )
693727 }
0 commit comments