diff --git a/CHANGELOG.md b/CHANGELOG.md index c7bafab..6215ac0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Remove old CT logs as soon as they are removed from the Google CT Loglist (#60) - New configuration for buffer sizes (#58) ### Changed ### Fixed diff --git a/config.sample.yaml b/config.sample.yaml index 30744a7..6d457c2 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -37,3 +37,6 @@ general: ctlog: 1000 # Combined buffer for the broadcast manager broadcastmanager: 10000 + # Google regularly updates the log list. If this option is set to true, the server will remove all old logs. + # This option defaults to true. See https://github.com/d-Rickyy-b/certstream-server-go/issues/51 + drop_old_logs: true diff --git a/internal/certificatetransparency/ct-watcher.go b/internal/certificatetransparency/ct-watcher.go index 9218465..b79863b 100644 --- a/internal/certificatetransparency/ct-watcher.go +++ b/internal/certificatetransparency/ct-watcher.go @@ -55,7 +55,7 @@ func (w *Watcher) Start() { } // initialize the watcher with currently available logs - w.addNewlyAvailableLogs() + w.updateLogs() log.Println("Started CT watcher") go certHandler(w.certChan) @@ -69,14 +69,14 @@ func (w *Watcher) Start() { // This method is blocking. It can be stopped by cancelling the context. func (w *Watcher) watchNewLogs() { // Add all available logs to the watcher - w.addNewlyAvailableLogs() + w.updateLogs() // Check for new logs once every hour ticker := time.NewTicker(1 * time.Hour) for { select { case <-ticker.C: - w.addNewlyAvailableLogs() + w.updateLogs() case <-w.context.Done(): ticker.Stop() return @@ -84,11 +84,7 @@ func (w *Watcher) watchNewLogs() { } } -// The transparency log list is constantly updated with new Log servers. -// This function checks for new ct logs and adds them to the watcher. -func (w *Watcher) addNewlyAvailableLogs() { - log.Println("Checking for new ct logs...") - +func (w *Watcher) updateLogs() { // Get a list of urls of all CT logs logList, err := getAllLogs() if err != nil { @@ -96,6 +92,16 @@ func (w *Watcher) addNewlyAvailableLogs() { return } + w.addNewlyAvailableLogs(logList) + if *config.AppConfig.General.DropOldLogs { + w.dropRemovedLogs(logList) + } +} + +// addNewlyAvailableLogs checks the transparency log list for new Log servers and adds workers for those to the watcher. +func (w *Watcher) addNewlyAvailableLogs(logList loglist3.LogList) { + log.Println("Checking for new ct logs...") + newCTs := 0 // Check the ct log list for new, unwatched logs @@ -115,8 +121,6 @@ func (w *Watcher) addNewlyAvailableLogs() { } } - // TODO maybe add a check for logs that are still watched but no longer on the logList and remove them? See also issue #41 and #42 - // If the log is not being watched, create a new worker if !alreadyWatched { w.wg.Add(1) @@ -143,6 +147,43 @@ func (w *Watcher) addNewlyAvailableLogs() { log.Printf("Currently monitored ct logs: %d\n", len(w.workers)) } +// dropRemovedLogs checks if any of the currently monitored logs are no longer in the log list. +// If they are not, the CT Logs are probably no longer relevant and the corresponding workers will be stopped. +func (w *Watcher) dropRemovedLogs(logList loglist3.LogList) { + removedCTs := 0 + + // Iterate over all workers and check if they are still in the logList + // If they are not, the CT Logs are probably no longer relevant. + // We should stop the worker if that didn't already happen. + for _, ctWorker := range w.workers { + workerURL := normalizeCtlogURL(ctWorker.ctURL) + + onLogList := false + for _, operator := range logList.Operators { + // Iterate over each log of the operator + for _, transparencyLog := range operator.Logs { + // Check if the log is already being watched + + logListURL := normalizeCtlogURL(transparencyLog.URL) + if workerURL == logListURL { + onLogList = true + break + } + } + } + + // If the log is not in the loglist, stop the worker + if !onLogList { + log.Printf("Stopping worker. CT URL not found in LogList: '%s'\n", ctWorker.ctURL) + removedCTs++ + ctWorker.stop() + } + } + + log.Printf("Removed ct logs: %d\n", removedCTs) + log.Printf("Currently monitored ct logs: %d\n", len(w.workers)) +} + // Stop stops the watcher. func (w *Watcher) Stop() { log.Printf("Stopping watcher\n") @@ -157,10 +198,13 @@ type worker struct { entryChan chan models.Entry mu sync.Mutex running bool + cancel context.CancelFunc } // startDownloadingCerts starts downloading certificates from the CT log. This method is blocking. func (w *worker) startDownloadingCerts(ctx context.Context) { + ctx, w.cancel = context.WithCancel(ctx) + // Normalize CT URL. We remove trailing slashes and prepend "https://" if it's not already there. w.ctURL = strings.TrimRight(w.ctURL, "/") if !strings.HasPrefix(w.ctURL, "https://") && !strings.HasPrefix(w.ctURL, "http://") { @@ -179,6 +223,7 @@ func (w *worker) startDownloadingCerts(ctx context.Context) { } w.running = true + defer func() { w.running = false }() w.mu.Unlock() for { @@ -186,6 +231,7 @@ func (w *worker) startDownloadingCerts(ctx context.Context) { workerErr := w.runWorker(ctx) if workerErr != nil { if errors.Is(workerErr, errFetchingSTHFailed) { + // TODO this could happen due to a 429 error. We should retry the request log.Printf("Worker for '%s' failed - could not fetch STH\n", w.ctURL) return } else if errors.Is(workerErr, errCreatingClient) { @@ -213,6 +259,13 @@ func (w *worker) startDownloadingCerts(ctx context.Context) { } } +func (w *worker) stop() { + w.mu.Lock() + defer w.mu.Unlock() + + w.cancel() +} + // runWorker runs a single worker for a single CT log. This method is blocking. func (w *worker) runWorker(ctx context.Context) error { hc := http.Client{Timeout: 30 * time.Second} @@ -224,6 +277,7 @@ func (w *worker) runWorker(ctx context.Context) error { sth, getSTHerr := jsonClient.GetSTH(ctx) if getSTHerr != nil { + // TODO this can happen due to a 429 error. We should retry the request log.Printf("Could not get STH for '%s': %s\n", w.ctURL, getSTHerr) return errFetchingSTHFailed } diff --git a/internal/config/config.go b/internal/config/config.go index 0010acc..6acba84 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -54,6 +54,7 @@ type Config struct { General struct { AdditionalLogs []LogConfig `yaml:"additional_logs"` BufferSizes BufferSizes `yaml:"buffer_sizes"` + DropOldLogs *bool `yaml:"drop_old_logs"` } } @@ -228,7 +229,13 @@ func validateConfig(config *Config) bool { if config.General.BufferSizes.BroadcastManager <= 0 { config.General.BufferSizes.BroadcastManager = 10000 - } + + // If the cleanup flag is not set, default to true + if config.General.DropOldLogs == nil { + log.Println("drop_old_logs is not set, defaulting to true") + defaultCleanup := true + config.General.DropOldLogs = &defaultCleanup + } return true }