Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]
### Added
- Remove old CT logs as soon as they are removed from the Google CT Loglist (#60)
- New configuration for buffer sizes (#58)
### Changed
### Fixed
Expand Down
3 changes: 3 additions & 0 deletions config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ general:
ctlog: 1000
# Combined buffer for the broadcast manager
broadcastmanager: 10000
# Google regularly updates the log list. If this option is set to true, the server stops watching CT logs that are no longer on that list.
# This option defaults to true. See https://github.com/d-Rickyy-b/certstream-server-go/issues/51
drop_old_logs: true
74 changes: 64 additions & 10 deletions internal/certificatetransparency/ct-watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (w *Watcher) Start() {
}

// initialize the watcher with currently available logs
w.addNewlyAvailableLogs()
w.updateLogs()

log.Println("Started CT watcher")
go certHandler(w.certChan)
Expand All @@ -69,33 +69,39 @@ func (w *Watcher) Start() {
// watchNewLogs refreshes the watched CT logs once immediately and then once
// every hour by calling updateLogs.
// This method is blocking. It can be stopped by cancelling the context.
func (w *Watcher) watchNewLogs() {
	// Bring the watcher up to date with the log list right away.
	// updateLogs fetches the log list itself, so it is the only call needed here.
	w.updateLogs()

	// Re-check the log list once every hour.
	ticker := time.NewTicker(1 * time.Hour)
	// Release the ticker on every return path.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			w.updateLogs()
		case <-w.context.Done():
			return
		}
	}
}

// The transparency log list is constantly updated with new Log servers.
// This function checks for new ct logs and adds them to the watcher.
func (w *Watcher) addNewlyAvailableLogs() {
log.Println("Checking for new ct logs...")

// updateLogs fetches the current CT log list and applies it to the watcher.
// Workers are added for logs that are not yet watched. Unless drop_old_logs is
// explicitly set to false, workers for logs that are no longer on the list are stopped.
// If the log list cannot be fetched, the error is logged and nothing is changed.
func (w *Watcher) updateLogs() {
	// Fetch the current CT log list
	logList, err := getAllLogs()
	if err != nil {
		log.Println(err)
		return
	}

	w.addNewlyAvailableLogs(logList)

	// DropOldLogs is a pointer so that "not set" can be told apart from false.
	// A missing value is treated as true (the documented default) instead of
	// dereferencing a nil pointer.
	dropOldLogs := config.AppConfig.General.DropOldLogs
	if dropOldLogs == nil || *dropOldLogs {
		w.dropRemovedLogs(logList)
	}
}

// addNewlyAvailableLogs checks the transparency log list for new Log servers and adds workers for those to the watcher.
func (w *Watcher) addNewlyAvailableLogs(logList loglist3.LogList) {
log.Println("Checking for new ct logs...")

newCTs := 0

// Check the ct log list for new, unwatched logs
Expand All @@ -115,8 +121,6 @@ func (w *Watcher) addNewlyAvailableLogs() {
}
}

// TODO maybe add a check for logs that are still watched but no longer on the logList and remove them? See also issue #41 and #42

// If the log is not being watched, create a new worker
if !alreadyWatched {
w.wg.Add(1)
Expand All @@ -143,6 +147,43 @@ func (w *Watcher) addNewlyAvailableLogs() {
log.Printf("Currently monitored ct logs: %d\n", len(w.workers))
}

// dropRemovedLogs checks if any of the currently monitored logs are no longer in the log list.
// If they are not, the CT Logs are probably no longer relevant and the corresponding workers will be stopped.
//
// NOTE(review): this function only stops the workers; it does not remove them from w.workers.
// The "Currently monitored ct logs" count logged at the end is therefore not reduced by this
// call itself - confirm that stopped workers are discarded elsewhere.
func (w *Watcher) dropRemovedLogs(logList loglist3.LogList) {
	// Collect the normalized URL of every log on the list once, so that each
	// worker needs a single lookup instead of a scan over all operators.
	listedURLs := make(map[string]struct{})
	for _, operator := range logList.Operators {
		for _, transparencyLog := range operator.Logs {
			listedURLs[normalizeCtlogURL(transparencyLog.URL)] = struct{}{}
		}
	}

	removedCTs := 0

	// Stop every worker whose CT log is no longer on the log list.
	for _, ctWorker := range w.workers {
		if _, onLogList := listedURLs[normalizeCtlogURL(ctWorker.ctURL)]; onLogList {
			continue
		}

		log.Printf("Stopping worker. CT URL not found in LogList: '%s'\n", ctWorker.ctURL)
		removedCTs++
		ctWorker.stop()
	}

	log.Printf("Removed ct logs: %d\n", removedCTs)
	log.Printf("Currently monitored ct logs: %d\n", len(w.workers))
}

// Stop stops the watcher.
func (w *Watcher) Stop() {
log.Printf("Stopping watcher\n")
Expand All @@ -157,10 +198,13 @@ type worker struct {
entryChan chan models.Entry
mu sync.Mutex
running bool
cancel context.CancelFunc
}

// startDownloadingCerts starts downloading certificates from the CT log. This method is blocking.
func (w *worker) startDownloadingCerts(ctx context.Context) {
ctx, w.cancel = context.WithCancel(ctx)

// Normalize CT URL. We remove trailing slashes and prepend "https://" if it's not already there.
w.ctURL = strings.TrimRight(w.ctURL, "/")
if !strings.HasPrefix(w.ctURL, "https://") && !strings.HasPrefix(w.ctURL, "http://") {
Expand All @@ -179,13 +223,15 @@ func (w *worker) startDownloadingCerts(ctx context.Context) {
}

w.running = true
defer func() { w.running = false }()
w.mu.Unlock()

for {
log.Printf("Starting worker for CT log: %s\n", w.ctURL)
workerErr := w.runWorker(ctx)
if workerErr != nil {
if errors.Is(workerErr, errFetchingSTHFailed) {
// TODO this could happen due to a 429 error. We should retry the request
log.Printf("Worker for '%s' failed - could not fetch STH\n", w.ctURL)
return
} else if errors.Is(workerErr, errCreatingClient) {
Expand Down Expand Up @@ -213,6 +259,13 @@ func (w *worker) startDownloadingCerts(ctx context.Context) {
}
}

// stop cancels the context the worker downloads certificates with.
// It is safe to call on a worker that has not started yet and to call more than once.
func (w *worker) stop() {
	w.mu.Lock()
	defer w.mu.Unlock()

	// cancel is only assigned once startDownloadingCerts has begun running.
	// Calling a nil function value would panic, so there is nothing to do before that.
	if w.cancel == nil {
		return
	}

	w.cancel()
}

// runWorker runs a single worker for a single CT log. This method is blocking.
func (w *worker) runWorker(ctx context.Context) error {
hc := http.Client{Timeout: 30 * time.Second}
Expand All @@ -224,6 +277,7 @@ func (w *worker) runWorker(ctx context.Context) error {

sth, getSTHerr := jsonClient.GetSTH(ctx)
if getSTHerr != nil {
// TODO this can happen due to a 429 error. We should retry the request
log.Printf("Could not get STH for '%s': %s\n", w.ctURL, getSTHerr)
return errFetchingSTHFailed
}
Expand Down
9 changes: 8 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Config struct {
General struct {
AdditionalLogs []LogConfig `yaml:"additional_logs"`
BufferSizes BufferSizes `yaml:"buffer_sizes"`
DropOldLogs *bool `yaml:"drop_old_logs"`
}
}

Expand Down Expand Up @@ -228,7 +229,13 @@ func validateConfig(config *Config) bool {

if config.General.BufferSizes.BroadcastManager <= 0 {
config.General.BufferSizes.BroadcastManager = 10000
}

// If the cleanup flag is not set, default to true
if config.General.DropOldLogs == nil {
log.Println("drop_old_logs is not set, defaulting to true")
defaultCleanup := true
config.General.DropOldLogs = &defaultCleanup
}

return true
}