@@ -115,8 +115,6 @@ func (w *Watcher) addNewlyAvailableLogs() {
115115 }
116116 }
117117
118- // TODO maybe add a check for logs that are still watched but no longer on the logList and remove them? See also issue #41 and #42
119-
120118 // If the log is not being watched, create a new worker
121119 if ! alreadyWatched {
122120 w .wg .Add (1 )
@@ -141,6 +139,33 @@ func (w *Watcher) addNewlyAvailableLogs() {
141139
142140 log .Printf ("New ct logs found: %d\n " , newCTs )
143141 log .Printf ("Currently monitored ct logs: %d\n " , len (w .workers ))
142+
143+ // Iterate over all workers and check if they are still in the logList
144+ // If they are not, the CT Logs are probably no longer relevant.
145+ // We should stop the worker if that didn't already happen.
146+ for _ , ctWorker := range w .workers {
147+ workerURL := normalizeCtlogURL (ctWorker .ctURL )
148+
149+ onLogList := false
150+ for _ , operator := range logList .Operators {
151+ // Iterate over each log of the operator
152+ for _ , transparencyLog := range operator .Logs {
153+ // Check if the log is already being watched
154+
155+ logListURL := normalizeCtlogURL (transparencyLog .URL )
156+ if workerURL == logListURL {
157+ onLogList = true
158+ break
159+ }
160+ }
161+ }
162+
163+ // If the log is not in the loglist, stop the worker
164+ if ! onLogList {
165+ log .Printf ("Stopping worker. CT URL not found in LogList: '%s'\n " , ctWorker .ctURL )
166+ ctWorker .stop ()
167+ }
168+ }
144169}
145170
146171// Stop stops the watcher.
@@ -157,10 +182,13 @@ type worker struct {
157182 entryChan chan models.Entry
158183 mu sync.Mutex
159184 running bool
185+ cancel context.CancelFunc
160186}
161187
162188// startDownloadingCerts starts downloading certificates from the CT log. This method is blocking.
163189func (w * worker ) startDownloadingCerts (ctx context.Context ) {
190+ ctx , w .cancel = context .WithCancel (ctx )
191+
164192 // Normalize CT URL. We remove trailing slashes and prepend "https://" if it's not already there.
165193 w .ctURL = strings .TrimRight (w .ctURL , "/" )
166194 if ! strings .HasPrefix (w .ctURL , "https://" ) && ! strings .HasPrefix (w .ctURL , "http://" ) {
@@ -179,13 +207,15 @@ func (w *worker) startDownloadingCerts(ctx context.Context) {
179207 }
180208
181209 w .running = true
210+ defer func () { w .running = false }()
182211 w .mu .Unlock ()
183212
184213 for {
185214 log .Printf ("Starting worker for CT log: %s\n " , w .ctURL )
186215 workerErr := w .runWorker (ctx )
187216 if workerErr != nil {
188217 if errors .Is (workerErr , errFetchingSTHFailed ) {
218+ // TODO this could happen due to a 429 error. We should retry the request
189219 log .Printf ("Worker for '%s' failed - could not fetch STH\n " , w .ctURL )
190220 return
191221 } else if errors .Is (workerErr , errCreatingClient ) {
@@ -213,6 +243,13 @@ func (w *worker) startDownloadingCerts(ctx context.Context) {
213243 }
214244}
215245
246+ func (w * worker ) stop () {
247+ w .mu .Lock ()
248+ defer w .mu .Unlock ()
249+
250+ w .cancel ()
251+ }
252+
216253// runWorker runs a single worker for a single CT log. This method is blocking.
217254func (w * worker ) runWorker (ctx context.Context ) error {
218255 hc := http.Client {Timeout : 30 * time .Second }
@@ -224,6 +261,7 @@ func (w *worker) runWorker(ctx context.Context) error {
224261
225262 sth , getSTHerr := jsonClient .GetSTH (ctx )
226263 if getSTHerr != nil {
264+ // TODO this can happen due to a 429 error. We should retry the request
227265 log .Printf ("Could not get STH for '%s': %s\n " , w .ctURL , getSTHerr )
228266 return errFetchingSTHFailed
229267 }
0 commit comments