Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Temp files
*.tmp
*.temp

# Dependency directories (remove the comment below to include it)
# vendor/

Expand All @@ -26,3 +30,4 @@ dist/
# Ignore actual config files
config.yaml
config.yml
ct_index.json
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]
### Added
- Ability to store index of most recent cert per log to resume processing from that point after a restart (#49)
- New CLI switch for creating an index file from a CT log (#49)
### Changed
### Removed
### Fixed
Expand Down
17 changes: 17 additions & 0 deletions cmd/certstream-server-go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
func main() {
configFile := flag.String("config", "config.yml", "path to the config file")
versionFlag := flag.Bool("version", false, "Print the version and exit")
createIndexFile := flag.Bool("create-index-file", false, "Create the ct_index.json based on current STHs")
flag.Parse()

if *versionFlag {
Expand All @@ -21,6 +22,22 @@ func main() {
}

log.SetFlags(log.LstdFlags | log.Lshortfile)

// If the user only wants to create the index file, we don't need to start the server
if *createIndexFile {
conf, readConfErr := config.ReadConfig(*configFile)
if readConfErr != nil {
log.Fatalf("Error while reading config: %v", readConfErr)
}
cs := certstream.NewRawCertstream(conf)

createErr := cs.CreateIndexFile()
if createErr != nil {
log.Fatalf("Error while creating index file: %v", createErr)
}
return
}

log.Printf("Starting certstream-server-go v%s\n", config.Version)

cs, err := certstream.NewCertstreamFromConfigFile(*configFile)
Expand Down
14 changes: 14 additions & 0 deletions config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ general:
- url: https://dodo.ct.comodo.com
operator: "Comodo"
description: "Comodo Dodo"

# To optimize the performance of the server, you can overwrite the size of different buffers
# For low CPU, low memory machines, you should reduce the buffer sizes to save memory in case the CPU is maxed.
buffer_sizes:
Expand All @@ -38,6 +39,19 @@ general:
ctlog: 1000
# Combined buffer for the broadcast manager
broadcastmanager: 10000

# Google regularly updates the log list. If this option is set to true, the server will remove all logs no longer listed in the Google log list.
# This option defaults to true. See https://github.com/d-Rickyy-b/certstream-server-go/issues/51
drop_old_logs: true

# Options for resuming certificate downloads after restart
recovery:
# If enabled, the server will resume downloading certificates from the last processed and stored index for each log.
# If there is no ct_index_file or for a specific log there is no index entry, the server will start from index 0.
# Be aware that this leads to a massive number of certificates being downloaded.
# Depending on your server's performance and network connection, this could be up to 10.000 certificates per second.
# Make sure your infrastructure can handle this!
enabled: true
# Path to the file where indices are stored. Be aware that a temp file in the same path with the same name and ".tmp" as suffix will be created.
# If there are no write permissions to the path, the server will not be able to store the indices.
ct_index_file: "./ct_index.json"
86 changes: 72 additions & 14 deletions internal/certificatetransparency/ct-watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"log"
"net/http"
"path/filepath"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -54,6 +55,18 @@ func (w *Watcher) Start() {
w.certChan = make(chan models.Entry, 5000)
}

if config.AppConfig.General.Recovery.Enabled {
ctIndexFilePath, err := filepath.Abs(config.AppConfig.General.Recovery.CTIndexFile)
if err != nil {
log.Printf("Error getting absolute path for CT index file: '%s', %s\n", config.AppConfig.General.Recovery.CTIndexFile, err)
return
}
// Load Saved CT Indexes
metrics.LoadCTIndex(ctIndexFilePath)
// Save CTIndexes at regular intervals
go metrics.SaveCertIndexesAtInterval(time.Second*30, ctIndexFilePath) // save indexes every X seconds
}

// initialize the watcher with currently available logs
w.updateLogs()

Expand Down Expand Up @@ -126,13 +139,16 @@ func (w *Watcher) addNewlyAvailableLogs(logList loglist3.LogList) {
w.wg.Add(1)
newCTs++

lastCTIndex := metrics.GetCTIndex(transparencyLog.URL)
Comment thread
messede-degod marked this conversation as resolved.
ctWorker := worker{
name: transparencyLog.Description,
operatorName: operator.Name,
ctURL: transparencyLog.URL,
entryChan: w.certChan,
ctIndex: lastCTIndex,
}
w.workers = append(w.workers, &ctWorker)
metrics.Init(operator.Name, transparencyLog.URL)

// Start a goroutine for each worker
go func() {
Expand Down Expand Up @@ -199,12 +215,55 @@ func (w *Watcher) Stop() {
w.cancelFunc()
}

// CreateIndexFile creates a ct_index.json file based on the current STHs of all availble logs.
func (w *Watcher) CreateIndexFile(filePath string) error {
logs, err := getAllLogs()
if err != nil {
return err
}

w.context, w.cancelFunc = context.WithCancel(context.Background())
log.Println("Fetching current STH for all logs...")
for _, operator := range logs.Operators {
// Iterate over each log of the operator
for _, transparencyLog := range operator.Logs {
// Check if the log is already being watched
metrics.Init(operator.Name, transparencyLog.URL)
log.Println("Fetching STH for", transparencyLog.URL)

hc := http.Client{Timeout: 5 * time.Second}
jsonClient, e := client.New(transparencyLog.URL, &hc, jsonclient.Options{UserAgent: userAgent})
if e != nil {
log.Printf("Error creating JSON client: %s\n", e)
continue
}

sth, getSTHerr := jsonClient.GetSTH(w.context)
if getSTHerr != nil {
// TODO this can happen due to a 429 error. We should retry the request
log.Printf("Could not get STH for '%s': %s\n", transparencyLog.URL, getSTHerr)
continue
}

metrics.index[transparencyLog.URL] = int64(sth.TreeSize)
}
}
w.cancelFunc()

tempFilePath := fmt.Sprintf("%s.tmp", filePath)
metrics.SaveCertIndexes(tempFilePath, filePath)
log.Println("Index file saved to", filePath)

return nil
}

// A worker processes a single CT log.
type worker struct {
name string
operatorName string
ctURL string
entryChan chan models.Entry
ctIndex int64
mu sync.Mutex
running bool
cancel context.CancelFunc
Expand Down Expand Up @@ -284,18 +343,24 @@ func (w *worker) runWorker(ctx context.Context) error {
return errCreatingClient
}

sth, getSTHerr := jsonClient.GetSTH(ctx)
if getSTHerr != nil {
// If recovery is enabled and the CT index is set, we start at the saved index. Otherwise we start at the latest STH.
validSavedCTIndexExists := config.AppConfig.General.Recovery.Enabled && w.ctIndex >= 0
if !validSavedCTIndexExists {
sth, getSTHerr := jsonClient.GetSTH(ctx)
if getSTHerr != nil {
// TODO this can happen due to a 429 error. We should retry the request
log.Printf("Could not get STH for '%s': %s\n", w.ctURL, getSTHerr)
return errFetchingSTHFailed
log.Printf("Could not get STH for '%s': %s\n", w.ctURL, getSTHerr)
return errFetchingSTHFailed
}
// Start at the latest STH to skip all the past certificates
w.ctIndex = int64(sth.TreeSize)
}

certScanner := scanner.NewScanner(jsonClient, scanner.ScannerOptions{
FetcherOptions: scanner.FetcherOptions{
BatchSize: 100,
ParallelFetch: 1,
StartIndex: int64(sth.TreeSize), // Start at the latest STH to skip all the past certificates
StartIndex: w.ctIndex,
Continuous: true,
},
Matcher: scanner.MatchAll{},
Expand Down Expand Up @@ -364,8 +429,9 @@ func certHandler(entryChan chan models.Entry) {
// Update metrics
url := entry.Data.Source.NormalizedURL
operator := entry.Data.Source.Operator
index := entry.Data.CertIndex

metrics.Inc(operator, url)
metrics.Inc(operator, url, index)
}
}

Expand Down Expand Up @@ -421,14 +487,6 @@ func getAllLogs() (loglist3.LogList, error) {
}
}

// Add new ct logs to metrics
for _, operator := range allLogs.Operators {
for _, ctlog := range operator.Logs {
url := normalizeCtlogURL(ctlog.URL)
metrics.Init(operator.Name, url)
}
}

return *allLogs, nil
}

Expand Down
Loading