Skip to content

Commit bf24abb

Browse files
authored
Merge pull request #49 from messede-degod/feat/ct-index-save-restore
Ability to persist CT indexes and resume from the saved index file
2 parents 8d71967 + d646312 commit bf24abb

8 files changed

Lines changed: 283 additions & 17 deletions

File tree

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
# Output of the go coverage tool, specifically when used with LiteIDE
1313
*.out
1414

15+
# Temp files
16+
*.tmp
17+
*.temp
18+
1519
# Dependency directories (remove the comment below to include it)
1620
# vendor/
1721

@@ -26,3 +30,4 @@ dist/
2630
# Ignore actual config files
2731
config.yaml
2832
config.yml
33+
ct_index.json

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99
### Added
10+
- Ability to store index of most recent cert per log to resume processing from that point after a restart (#49)
11+
- New CLI switch for creating an index file from a CT log (#49)
1012
### Changed
1113
### Removed
1214
### Fixed

cmd/certstream-server-go/main.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
func main() {
1414
configFile := flag.String("config", "config.yml", "path to the config file")
1515
versionFlag := flag.Bool("version", false, "Print the version and exit")
16+
createIndexFile := flag.Bool("create-index-file", false, "Create the ct_index.json based on current STHs")
1617
flag.Parse()
1718

1819
if *versionFlag {
@@ -21,6 +22,22 @@ func main() {
2122
}
2223

2324
log.SetFlags(log.LstdFlags | log.Lshortfile)
25+
26+
// If the user only wants to create the index file, we don't need to start the server
27+
if *createIndexFile {
28+
conf, readConfErr := config.ReadConfig(*configFile)
29+
if readConfErr != nil {
30+
log.Fatalf("Error while reading config: %v", readConfErr)
31+
}
32+
cs := certstream.NewRawCertstream(conf)
33+
34+
createErr := cs.CreateIndexFile()
35+
if createErr != nil {
36+
log.Fatalf("Error while creating index file: %v", createErr)
37+
}
38+
return
39+
}
40+
2441
log.Printf("Starting certstream-server-go v%s\n", config.Version)
2542

2643
cs, err := certstream.NewCertstreamFromConfigFile(*configFile)

config.sample.yaml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ general:
2929
- url: https://dodo.ct.comodo.com
3030
operator: "Comodo"
3131
description: "Comodo Dodo"
32+
3233
# To optimize the performance of the server, you can overwrite the size of different buffers
3334
# For low CPU, low memory machines, you should reduce the buffer sizes to save memory in case the CPU is maxed.
3435
buffer_sizes:
@@ -38,6 +39,19 @@ general:
3839
ctlog: 1000
3940
# Combined buffer for the broadcast manager
4041
broadcastmanager: 10000
42+
4143
# Google regularly updates the log list. If this option is set to true, the server will remove all logs no longer listed in the Google log list.
4244
# This option defaults to true. See https://github.com/d-Rickyy-b/certstream-server-go/issues/51
4345
drop_old_logs: true
46+
47+
# Options for resuming certificate downloads after restart
48+
recovery:
49+
# If enabled, the server will resume downloading certificates from the last processed and stored index for each log.
50+
# If there is no ct_index_file or for a specific log there is no index entry, the server will start from index 0.
51+
# Be aware that this leads to a massive number of certificates being downloaded.
52+
# Depending on your server's performance and network connection, this could be up to 10.000 certificates per second.
53+
# Make sure your infrastructure can handle this!
54+
enabled: true
55+
# Path to the file where indices are stored. Be aware that a temp file in the same path with the same name and ".tmp" as suffix will be created.
56+
# If there are no write permissions to the path, the server will not be able to store the indices.
57+
ct_index_file: "./ct_index.json"

internal/certificatetransparency/ct-watcher.go

Lines changed: 72 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"io"
88
"log"
99
"net/http"
10+
"path/filepath"
1011
"strings"
1112
"sync"
1213
"sync/atomic"
@@ -54,6 +55,18 @@ func (w *Watcher) Start() {
5455
w.certChan = make(chan models.Entry, 5000)
5556
}
5657

58+
if config.AppConfig.General.Recovery.Enabled {
59+
ctIndexFilePath, err := filepath.Abs(config.AppConfig.General.Recovery.CTIndexFile)
60+
if err != nil {
61+
log.Printf("Error getting absolute path for CT index file: '%s', %s\n", config.AppConfig.General.Recovery.CTIndexFile, err)
62+
return
63+
}
64+
// Load Saved CT Indexes
65+
metrics.LoadCTIndex(ctIndexFilePath)
66+
// Save CTIndexes at regular intervals
67+
go metrics.SaveCertIndexesAtInterval(time.Second*30, ctIndexFilePath) // save indexes every X seconds
68+
}
69+
5770
// initialize the watcher with currently available logs
5871
w.updateLogs()
5972

@@ -126,13 +139,16 @@ func (w *Watcher) addNewlyAvailableLogs(logList loglist3.LogList) {
126139
w.wg.Add(1)
127140
newCTs++
128141

142+
lastCTIndex := metrics.GetCTIndex(transparencyLog.URL)
129143
ctWorker := worker{
130144
name: transparencyLog.Description,
131145
operatorName: operator.Name,
132146
ctURL: transparencyLog.URL,
133147
entryChan: w.certChan,
148+
ctIndex: lastCTIndex,
134149
}
135150
w.workers = append(w.workers, &ctWorker)
151+
metrics.Init(operator.Name, transparencyLog.URL)
136152

137153
// Start a goroutine for each worker
138154
go func() {
@@ -199,12 +215,55 @@ func (w *Watcher) Stop() {
199215
w.cancelFunc()
200216
}
201217

218+
// CreateIndexFile creates a ct_index.json file based on the current STHs of all availble logs.
219+
func (w *Watcher) CreateIndexFile(filePath string) error {
220+
logs, err := getAllLogs()
221+
if err != nil {
222+
return err
223+
}
224+
225+
w.context, w.cancelFunc = context.WithCancel(context.Background())
226+
log.Println("Fetching current STH for all logs...")
227+
for _, operator := range logs.Operators {
228+
// Iterate over each log of the operator
229+
for _, transparencyLog := range operator.Logs {
230+
// Check if the log is already being watched
231+
metrics.Init(operator.Name, transparencyLog.URL)
232+
log.Println("Fetching STH for", transparencyLog.URL)
233+
234+
hc := http.Client{Timeout: 5 * time.Second}
235+
jsonClient, e := client.New(transparencyLog.URL, &hc, jsonclient.Options{UserAgent: userAgent})
236+
if e != nil {
237+
log.Printf("Error creating JSON client: %s\n", e)
238+
continue
239+
}
240+
241+
sth, getSTHerr := jsonClient.GetSTH(w.context)
242+
if getSTHerr != nil {
243+
// TODO this can happen due to a 429 error. We should retry the request
244+
log.Printf("Could not get STH for '%s': %s\n", transparencyLog.URL, getSTHerr)
245+
continue
246+
}
247+
248+
metrics.index[transparencyLog.URL] = int64(sth.TreeSize)
249+
}
250+
}
251+
w.cancelFunc()
252+
253+
tempFilePath := fmt.Sprintf("%s.tmp", filePath)
254+
metrics.SaveCertIndexes(tempFilePath, filePath)
255+
log.Println("Index file saved to", filePath)
256+
257+
return nil
258+
}
259+
202260
// A worker processes a single CT log.
203261
type worker struct {
204262
name string
205263
operatorName string
206264
ctURL string
207265
entryChan chan models.Entry
266+
ctIndex int64
208267
mu sync.Mutex
209268
running bool
210269
cancel context.CancelFunc
@@ -284,18 +343,24 @@ func (w *worker) runWorker(ctx context.Context) error {
284343
return errCreatingClient
285344
}
286345

287-
sth, getSTHerr := jsonClient.GetSTH(ctx)
288-
if getSTHerr != nil {
346+
// If recovery is enabled and the CT index is set, we start at the saved index. Otherwise we start at the latest STH.
347+
validSavedCTIndexExists := config.AppConfig.General.Recovery.Enabled && w.ctIndex >= 0
348+
if !validSavedCTIndexExists {
349+
sth, getSTHerr := jsonClient.GetSTH(ctx)
350+
if getSTHerr != nil {
289351
// TODO this can happen due to a 429 error. We should retry the request
290-
log.Printf("Could not get STH for '%s': %s\n", w.ctURL, getSTHerr)
291-
return errFetchingSTHFailed
352+
log.Printf("Could not get STH for '%s': %s\n", w.ctURL, getSTHerr)
353+
return errFetchingSTHFailed
354+
}
355+
// Start at the latest STH to skip all the past certificates
356+
w.ctIndex = int64(sth.TreeSize)
292357
}
293358

294359
certScanner := scanner.NewScanner(jsonClient, scanner.ScannerOptions{
295360
FetcherOptions: scanner.FetcherOptions{
296361
BatchSize: 100,
297362
ParallelFetch: 1,
298-
StartIndex: int64(sth.TreeSize), // Start at the latest STH to skip all the past certificates
363+
StartIndex: w.ctIndex,
299364
Continuous: true,
300365
},
301366
Matcher: scanner.MatchAll{},
@@ -364,8 +429,9 @@ func certHandler(entryChan chan models.Entry) {
364429
// Update metrics
365430
url := entry.Data.Source.NormalizedURL
366431
operator := entry.Data.Source.Operator
432+
index := entry.Data.CertIndex
367433

368-
metrics.Inc(operator, url)
434+
metrics.Inc(operator, url, index)
369435
}
370436
}
371437

@@ -421,14 +487,6 @@ func getAllLogs() (loglist3.LogList, error) {
421487
}
422488
}
423489

424-
// Add new ct logs to metrics
425-
for _, operator := range allLogs.Operators {
426-
for _, ctlog := range operator.Logs {
427-
url := normalizeCtlogURL(ctlog.URL)
428-
metrics.Init(operator.Name, url)
429-
}
430-
}
431-
432490
return *allLogs, nil
433491
}
434492

0 commit comments

Comments
 (0)