@@ -45,6 +45,10 @@ type ParallelGuessGenerator struct {
4545 numParseTrees atomic.Int64
4646 probCoverage atomic.Int64 // scaled by 1e15 for precision
4747 startTime time.Time
48+
49+ // from previous session when resuming with -l (accumulated stats)
50+ prevRunningTime int64
51+ originalFirstStarted string // RFC3339, preserved when resuming
4852}
4953
5054// creates a generator that uses parallel workers
@@ -73,13 +77,35 @@ func NewParallelGuessGeneratorWithQueue(grammar pcfg.Grammar, base []pcfg.BaseSt
7377 }
7478}
7579
80+ // creates a generator with a pre-built queue and restores accumulated stats from a previous session
81+ func NewParallelGuessGeneratorWithQueueAndRestore (grammar pcfg.Grammar , base []pcfg.BaseStructure , queue * PcfgQueue , omenGrammar * omen.Grammar , debug bool , sav * SessionConfig ) * ParallelGuessGenerator {
82+ g := & ParallelGuessGenerator {
83+ Grammar : grammar ,
84+ Base : base ,
85+ Queue : queue ,
86+ Debug : debug ,
87+ OmenGrammar : omenGrammar ,
88+ outputChan : make (chan []byte , outputChanSize ),
89+ startTime : time .Now (),
90+ prevRunningTime : sav .RunningTime ,
91+ originalFirstStarted : sav .FirstStarted ,
92+ }
93+ g .totalGuesses .Store (sav .NumGuesses )
94+ g .numParseTrees .Store (sav .NumParseTrees )
95+ return g
96+ }
97+
7698// generates guesses using all CPU cores
7799func (g * ParallelGuessGenerator ) RunParallel (limit int64 ) (int64 , error ) {
78- return g .runParallelWithCtx (context .Background (), limit )
100+ return g .runParallelWithCtx (context .Background (), limit , nil )
79101}
80102
81- // runs with session save/load. On Ctrl+C, saves and exits gracefully
103+ // runs with session save/load. On Ctrl+C, saves and exits gracefully.
104+ // Save runs on every exit path: normal completion, signal (SIGINT/SIGTERM), or panic.
82105func (g * ParallelGuessGenerator ) RunParallelWithSession (limit int64 , savePath , ruleName , ruleUUID string , skipBrute , skipCase bool ) (int64 , error ) {
106+ // Ignore SIGPIPE so piping to pv, head, etc. doesn't kill us before save on Ctrl+C
107+ signal .Ignore (syscall .SIGPIPE )
108+
83109 ctx , cancel := context .WithCancel (context .Background ())
84110 sigCh := make (chan os.Signal , 1 )
85111 signal .Notify (sigCh , syscall .SIGINT , syscall .SIGTERM )
@@ -90,24 +116,28 @@ func (g *ParallelGuessGenerator) RunParallelWithSession(limit int64, savePath, r
90116 cancel ()
91117 }()
92118
93- total , err := g .runParallelWithCtx (ctx , limit )
119+ // Always save on exit: normal, signal, or panic. Works for first run and -l (load).
120+ defer func () {
121+ currentRunTime := int64 (time .Since (g .startTime ).Seconds ())
122+ cfg := & SessionConfig {
123+ NumGuesses : g .totalGuesses .Load (),
124+ NumParseTrees : g .numParseTrees .Load (),
125+ ProbCoverage : 0 ,
126+ RunningTime : g .prevRunningTime + currentRunTime ,
127+ MinProbability : g .Queue .MinProbability ,
128+ MaxProbability : g .Queue .MaxProbability ,
129+ }
130+ if saveErr := SaveSession (savePath , cfg , ruleName , ruleUUID , skipBrute , skipCase , g .originalFirstStarted ); saveErr != nil {
131+ fmt .Fprintf (os .Stderr , "Warning: could not save session: %v\n " , saveErr )
132+ } else {
133+ fmt .Fprintf (os .Stderr , "Session saved to %s\n " , savePath )
134+ }
135+ }()
94136
95- // save session on exit (normal or signal)
96- cfg := & SessionConfig {
97- NumGuesses : g .totalGuesses .Load (),
98- NumParseTrees : g .numParseTrees .Load (),
99- ProbCoverage : 0 ,
100- RunningTime : int64 (time .Since (g .startTime ).Seconds ()),
101- MinProbability : g .Queue .MinProbability ,
102- MaxProbability : g .Queue .MaxProbability ,
103- }
104- if saveErr := SaveSession (savePath , cfg , ruleName , ruleUUID , skipBrute , skipCase ); saveErr != nil {
105- fmt .Fprintf (os .Stderr , "Warning: could not save session: %v\n " , saveErr )
106- }
107- return total , err
137+ return g .runParallelWithCtx (ctx , limit , cancel )
108138}
109139
110- func (g * ParallelGuessGenerator ) runParallelWithCtx (ctx context.Context , limit int64 ) (int64 , error ) {
140+ func (g * ParallelGuessGenerator ) runParallelWithCtx (ctx context.Context , limit int64 , cancelOnPipe func () ) (int64 , error ) {
111141 numWorkers := runtime .NumCPU ()
112142 if numWorkers < 1 {
113143 numWorkers = 1
@@ -118,12 +148,16 @@ func (g *ParallelGuessGenerator) runParallelWithCtx(ctx context.Context, limit i
118148 var wg sync.WaitGroup
119149
120150 // writer goroutine: consume batches from outputChan
151+ // on broken pipe (e.g. pv exits), cancel so we save instead of spinning
121152 wg .Add (1 )
122153 go func () {
123154 defer wg .Done ()
124155 defer writer .Flush ()
125156 for buf := range g .outputChan {
126- writer .Write (buf )
157+ if _ , err := writer .Write (buf ); err != nil && cancelOnPipe != nil {
158+ // broken pipe, reader exited - cancel to trigger save
159+ cancelOnPipe ()
160+ }
127161 }
128162 }()
129163
0 commit comments