Skip to content

Commit bc8e0df

Browse files
committed
feat: stop workers for outdated ctlogs
related to #41 and #42
1 parent 02e827e commit bc8e0df

1 file changed

Lines changed: 40 additions & 2 deletions

File tree

internal/certificatetransparency/ct-watcher.go

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,6 @@ func (w *Watcher) addNewlyAvailableLogs() {
115115
}
116116
}
117117

118-
// TODO maybe add a check for logs that are still watched but no longer on the logList and remove them? See also issue #41 and #42
119-
120118
// If the log is not being watched, create a new worker
121119
if !alreadyWatched {
122120
w.wg.Add(1)
@@ -141,6 +139,33 @@ func (w *Watcher) addNewlyAvailableLogs() {
141139

142140
log.Printf("New ct logs found: %d\n", newCTs)
143141
log.Printf("Currently monitored ct logs: %d\n", len(w.workers))
142+
143+
// Iterate over all workers and check if they are still in the logList
144+
// If they are not, the CT Logs are probably no longer relevant.
145+
// We should stop the worker if that didn't already happen.
146+
for _, ctWorker := range w.workers {
147+
workerURL := normalizeCtlogURL(ctWorker.ctURL)
148+
149+
onLogList := false
150+
for _, operator := range logList.Operators {
151+
// Iterate over each log of the operator
152+
for _, transparencyLog := range operator.Logs {
153+
// Check if the log is already being watched
154+
155+
logListURL := normalizeCtlogURL(transparencyLog.URL)
156+
if workerURL == logListURL {
157+
onLogList = true
158+
break
159+
}
160+
}
161+
}
162+
163+
// If the log is not in the loglist, stop the worker
164+
if !onLogList {
165+
log.Printf("Stopping worker. CT URL not found in LogList: '%s'\n", ctWorker.ctURL)
166+
ctWorker.stop()
167+
}
168+
}
144169
}
145170

146171
// Stop stops the watcher.
@@ -157,10 +182,13 @@ type worker struct {
157182
entryChan chan models.Entry
158183
mu sync.Mutex
159184
running bool
185+
cancel context.CancelFunc
160186
}
161187

162188
// startDownloadingCerts starts downloading certificates from the CT log. This method is blocking.
163189
func (w *worker) startDownloadingCerts(ctx context.Context) {
190+
ctx, w.cancel = context.WithCancel(ctx)
191+
164192
// Normalize CT URL. We remove trailing slashes and prepend "https://" if it's not already there.
165193
w.ctURL = strings.TrimRight(w.ctURL, "/")
166194
if !strings.HasPrefix(w.ctURL, "https://") && !strings.HasPrefix(w.ctURL, "http://") {
@@ -179,13 +207,15 @@ func (w *worker) startDownloadingCerts(ctx context.Context) {
179207
}
180208

181209
w.running = true
210+
defer func() { w.running = false }()
182211
w.mu.Unlock()
183212

184213
for {
185214
log.Printf("Starting worker for CT log: %s\n", w.ctURL)
186215
workerErr := w.runWorker(ctx)
187216
if workerErr != nil {
188217
if errors.Is(workerErr, errFetchingSTHFailed) {
218+
// TODO this could happen due to a 429 error. We should retry the request
189219
log.Printf("Worker for '%s' failed - could not fetch STH\n", w.ctURL)
190220
return
191221
} else if errors.Is(workerErr, errCreatingClient) {
@@ -213,6 +243,13 @@ func (w *worker) startDownloadingCerts(ctx context.Context) {
213243
}
214244
}
215245

246+
func (w *worker) stop() {
247+
w.mu.Lock()
248+
defer w.mu.Unlock()
249+
250+
w.cancel()
251+
}
252+
216253
// runWorker runs a single worker for a single CT log. This method is blocking.
217254
func (w *worker) runWorker(ctx context.Context) error {
218255
hc := http.Client{Timeout: 30 * time.Second}
@@ -224,6 +261,7 @@ func (w *worker) runWorker(ctx context.Context) error {
224261

225262
sth, getSTHerr := jsonClient.GetSTH(ctx)
226263
if getSTHerr != nil {
264+
// TODO this can happen due to a 429 error. We should retry the request
227265
log.Printf("Could not get STH for '%s': %s\n", w.ctURL, getSTHerr)
228266
return errFetchingSTHFailed
229267
}

0 commit comments

Comments
 (0)