Skip to content

Commit e669993

Browse files
authored
Merge pull request #60 from d-Rickyy-b/remove-old-ctlogs
feat: stop workers for outdated ctlogs
2 parents c71e2a6 + 5b09bd4 commit e669993

4 files changed

Lines changed: 76 additions & 11 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99
### Added
10+
- Stop watching CT logs as soon as they are removed from the Google CT log list (#60)
1011
- New configuration for buffer sizes (#58)
1112
### Changed
1213
### Fixed

config.sample.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,6 @@ general:
3737
ctlog: 1000
3838
# Combined buffer for the broadcast manager
3939
broadcastmanager: 10000
40+
# Google regularly updates the CT log list. If this option is set to true, the server stops watching logs that are no longer on that list.
41+
# This option defaults to true. See https://github.com/d-Rickyy-b/certstream-server-go/issues/51
42+
drop_old_logs: true

internal/certificatetransparency/ct-watcher.go

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (w *Watcher) Start() {
5555
}
5656

5757
// initialize the watcher with currently available logs
58-
w.addNewlyAvailableLogs()
58+
w.updateLogs()
5959

6060
log.Println("Started CT watcher")
6161
go certHandler(w.certChan)
@@ -69,33 +69,39 @@ func (w *Watcher) Start() {
6969
// This method is blocking. It can be stopped by cancelling the context.
7070
func (w *Watcher) watchNewLogs() {
7171
// Add all available logs to the watcher
72-
w.addNewlyAvailableLogs()
72+
w.updateLogs()
7373

7474
// Check for new logs once every hour
7575
ticker := time.NewTicker(1 * time.Hour)
7676
for {
7777
select {
7878
case <-ticker.C:
79-
w.addNewlyAvailableLogs()
79+
w.updateLogs()
8080
case <-w.context.Done():
8181
ticker.Stop()
8282
return
8383
}
8484
}
8585
}
8686

87-
// The transparency log list is constantly updated with new Log servers.
88-
// This function checks for new ct logs and adds them to the watcher.
89-
func (w *Watcher) addNewlyAvailableLogs() {
90-
log.Println("Checking for new ct logs...")
91-
87+
func (w *Watcher) updateLogs() {
9288
// Get a list of urls of all CT logs
9389
logList, err := getAllLogs()
9490
if err != nil {
9591
log.Println(err)
9692
return
9793
}
9894

95+
w.addNewlyAvailableLogs(logList)
96+
if *config.AppConfig.General.DropOldLogs {
97+
w.dropRemovedLogs(logList)
98+
}
99+
}
100+
101+
// addNewlyAvailableLogs checks the transparency log list for new Log servers and adds workers for those to the watcher.
102+
func (w *Watcher) addNewlyAvailableLogs(logList loglist3.LogList) {
103+
log.Println("Checking for new ct logs...")
104+
99105
newCTs := 0
100106

101107
// Check the ct log list for new, unwatched logs
@@ -115,8 +121,6 @@ func (w *Watcher) addNewlyAvailableLogs() {
115121
}
116122
}
117123

118-
// TODO maybe add a check for logs that are still watched but no longer on the logList and remove them? See also issue #41 and #42
119-
120124
// If the log is not being watched, create a new worker
121125
if !alreadyWatched {
122126
w.wg.Add(1)
@@ -143,6 +147,43 @@ func (w *Watcher) addNewlyAvailableLogs() {
143147
log.Printf("Currently monitored ct logs: %d\n", len(w.workers))
144148
}
145149

150+
// dropRemovedLogs checks if any of the currently monitored logs are no longer in the log list.
151+
// Logs that are missing from the log list are probably no longer relevant, so the corresponding workers will be stopped.
152+
func (w *Watcher) dropRemovedLogs(logList loglist3.LogList) {
153+
removedCTs := 0
154+
155+
// Iterate over all workers and check if they are still in the logList
156+
// If they are not, the CT Logs are probably no longer relevant.
157+
// We should stop the worker if that didn't already happen.
158+
for _, ctWorker := range w.workers {
159+
workerURL := normalizeCtlogURL(ctWorker.ctURL)
160+
161+
onLogList := false
162+
for _, operator := range logList.Operators {
163+
// Iterate over each log of the operator
164+
for _, transparencyLog := range operator.Logs {
165+
// Check if this log list entry matches the URL of the current worker
166+
167+
logListURL := normalizeCtlogURL(transparencyLog.URL)
168+
if workerURL == logListURL {
169+
onLogList = true
170+
break
171+
}
172+
}
173+
}
174+
175+
// If the log is not in the loglist, stop the worker
176+
if !onLogList {
177+
log.Printf("Stopping worker. CT URL not found in LogList: '%s'\n", ctWorker.ctURL)
178+
removedCTs++
179+
ctWorker.stop()
180+
}
181+
}
182+
183+
log.Printf("Removed ct logs: %d\n", removedCTs)
184+
log.Printf("Currently monitored ct logs: %d\n", len(w.workers))
185+
}
186+
146187
// Stop stops the watcher.
147188
func (w *Watcher) Stop() {
148189
log.Printf("Stopping watcher\n")
@@ -157,10 +198,13 @@ type worker struct {
157198
entryChan chan models.Entry
158199
mu sync.Mutex
159200
running bool
201+
cancel context.CancelFunc
160202
}
161203

162204
// startDownloadingCerts starts downloading certificates from the CT log. This method is blocking.
163205
func (w *worker) startDownloadingCerts(ctx context.Context) {
206+
ctx, w.cancel = context.WithCancel(ctx)
207+
164208
// Normalize CT URL. We remove trailing slashes and prepend "https://" if it's not already there.
165209
w.ctURL = strings.TrimRight(w.ctURL, "/")
166210
if !strings.HasPrefix(w.ctURL, "https://") && !strings.HasPrefix(w.ctURL, "http://") {
@@ -179,13 +223,15 @@ func (w *worker) startDownloadingCerts(ctx context.Context) {
179223
}
180224

181225
w.running = true
226+
defer func() { w.running = false }()
182227
w.mu.Unlock()
183228

184229
for {
185230
log.Printf("Starting worker for CT log: %s\n", w.ctURL)
186231
workerErr := w.runWorker(ctx)
187232
if workerErr != nil {
188233
if errors.Is(workerErr, errFetchingSTHFailed) {
234+
// TODO this could happen due to a 429 error. We should retry the request
189235
log.Printf("Worker for '%s' failed - could not fetch STH\n", w.ctURL)
190236
return
191237
} else if errors.Is(workerErr, errCreatingClient) {
@@ -213,6 +259,13 @@ func (w *worker) startDownloadingCerts(ctx context.Context) {
213259
}
214260
}
215261

262+
func (w *worker) stop() {
263+
w.mu.Lock()
264+
defer w.mu.Unlock()
265+
266+
w.cancel()
267+
}
268+
216269
// runWorker runs a single worker for a single CT log. This method is blocking.
217270
func (w *worker) runWorker(ctx context.Context) error {
218271
hc := http.Client{Timeout: 30 * time.Second}
@@ -224,6 +277,7 @@ func (w *worker) runWorker(ctx context.Context) error {
224277

225278
sth, getSTHerr := jsonClient.GetSTH(ctx)
226279
if getSTHerr != nil {
280+
// TODO this can happen due to a 429 error. We should retry the request
227281
log.Printf("Could not get STH for '%s': %s\n", w.ctURL, getSTHerr)
228282
return errFetchingSTHFailed
229283
}

internal/config/config.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type Config struct {
5454
General struct {
5555
AdditionalLogs []LogConfig `yaml:"additional_logs"`
5656
BufferSizes BufferSizes `yaml:"buffer_sizes"`
57+
DropOldLogs *bool `yaml:"drop_old_logs"`
5758
}
5859
}
5960

@@ -228,7 +229,13 @@ func validateConfig(config *Config) bool {
228229

229230
if config.General.BufferSizes.BroadcastManager <= 0 {
230231
config.General.BufferSizes.BroadcastManager = 10000
231-
}
232+
233+
// If the drop_old_logs option is not set, default to true
234+
if config.General.DropOldLogs == nil {
235+
log.Println("drop_old_logs is not set, defaulting to true")
236+
defaultCleanup := true
237+
config.General.DropOldLogs = &defaultCleanup
238+
}
232239

233240
return true
234241
}

0 commit comments

Comments
 (0)