diff --git a/.gitignore b/.gitignore index 9823e0e..daf149d 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,10 @@ # Output of the go coverage tool, specifically when used with LiteIDE *.out +# Temp files +*.tmp +*.temp + # Dependency directories (remove the comment below to include it) # vendor/ @@ -26,3 +30,4 @@ dist/ # Ignore actual config files config.yaml config.yml +ct_index.json diff --git a/CHANGELOG.md b/CHANGELOG.md index a6ea8d9..f421cec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Ability to store index of most recent cert per log to resume processing from that point after a restart (#49) +- New CLI switch for creating an index file from a CT log (#49) ### Changed ### Removed ### Fixed diff --git a/cmd/certstream-server-go/main.go b/cmd/certstream-server-go/main.go index f415daa..217fdc6 100644 --- a/cmd/certstream-server-go/main.go +++ b/cmd/certstream-server-go/main.go @@ -13,6 +13,7 @@ import ( func main() { configFile := flag.String("config", "config.yml", "path to the config file") versionFlag := flag.Bool("version", false, "Print the version and exit") + createIndexFile := flag.Bool("create-index-file", false, "Create the ct_index.json based on current STHs") flag.Parse() if *versionFlag { @@ -21,6 +22,22 @@ func main() { } log.SetFlags(log.LstdFlags | log.Lshortfile) + + // If the user only wants to create the index file, we don't need to start the server + if *createIndexFile { + conf, readConfErr := config.ReadConfig(*configFile) + if readConfErr != nil { + log.Fatalf("Error while reading config: %v", readConfErr) + } + cs := certstream.NewRawCertstream(conf) + + createErr := cs.CreateIndexFile() + if createErr != nil { + log.Fatalf("Error while creating index file: %v", createErr) + } + return + } + log.Printf("Starting certstream-server-go v%s\n", config.Version) cs, err := 
certstream.NewCertstreamFromConfigFile(*configFile) diff --git a/config.sample.yaml b/config.sample.yaml index 9773f1e..cb8a67e 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -29,6 +29,7 @@ general: - url: https://dodo.ct.comodo.com operator: "Comodo" description: "Comodo Dodo" + # To optimize the performance of the server, you can overwrite the size of different buffers # For low CPU, low memory machines, you should reduce the buffer sizes to save memory in case the CPU is maxed. buffer_sizes: @@ -38,6 +39,19 @@ general: ctlog: 1000 # Combined buffer for the broadcast manager broadcastmanager: 10000 + # Google regularly updates the log list. If this option is set to true, the server will remove all logs no longer listed in the Google log list. # This option defaults to true. See https://github.com/d-Rickyy-b/certstream-server-go/issues/51 drop_old_logs: true + + # Options for resuming certificate downloads after restart + recovery: + # If enabled, the server will resume downloading certificates from the last processed and stored index for each log. + # If there is no ct_index_file or for a specific log there is no index entry, the server will start from index 0. + # Be aware that this leads to a massive number of certificates being downloaded. + # Depending on your server's performance and network connection, this could be up to 10.000 certificates per second. + # Make sure your infrastructure can handle this! + enabled: true + # Path to the file where indices are stored. Be aware that a temp file in the same path with the same name and ".tmp" as suffix will be created. + # If there are no write permissions to the path, the server will not be able to store the indices. 
+ ct_index_file: "./ct_index.json" diff --git a/internal/certificatetransparency/ct-watcher.go b/internal/certificatetransparency/ct-watcher.go index b383ff6..6e0b5e3 100644 --- a/internal/certificatetransparency/ct-watcher.go +++ b/internal/certificatetransparency/ct-watcher.go @@ -7,6 +7,7 @@ import ( "io" "log" "net/http" + "path/filepath" "strings" "sync" "sync/atomic" @@ -54,6 +55,18 @@ func (w *Watcher) Start() { w.certChan = make(chan models.Entry, 5000) } + if config.AppConfig.General.Recovery.Enabled { + ctIndexFilePath, err := filepath.Abs(config.AppConfig.General.Recovery.CTIndexFile) + if err != nil { + log.Printf("Error getting absolute path for CT index file: '%s', %s\n", config.AppConfig.General.Recovery.CTIndexFile, err) + return + } + // Load Saved CT Indexes + metrics.LoadCTIndex(ctIndexFilePath) + // Save CTIndexes at regular intervals + go metrics.SaveCertIndexesAtInterval(time.Second*30, ctIndexFilePath) // save indexes every X seconds + } + // initialize the watcher with currently available logs w.updateLogs() @@ -126,13 +139,16 @@ func (w *Watcher) addNewlyAvailableLogs(logList loglist3.LogList) { w.wg.Add(1) newCTs++ + lastCTIndex := metrics.GetCTIndex(transparencyLog.URL) ctWorker := worker{ name: transparencyLog.Description, operatorName: operator.Name, ctURL: transparencyLog.URL, entryChan: w.certChan, + ctIndex: lastCTIndex, } w.workers = append(w.workers, &ctWorker) + metrics.Init(operator.Name, transparencyLog.URL) // Start a goroutine for each worker go func() { @@ -199,12 +215,55 @@ func (w *Watcher) Stop() { w.cancelFunc() } +// CreateIndexFile creates a ct_index.json file based on the current STHs of all available logs.
+func (w *Watcher) CreateIndexFile(filePath string) error { + logs, err := getAllLogs() + if err != nil { + return err + } + + w.context, w.cancelFunc = context.WithCancel(context.Background()) + log.Println("Fetching current STH for all logs...") + for _, operator := range logs.Operators { + // Iterate over each log of the operator + for _, transparencyLog := range operator.Logs { + // Check if the log is already being watched + metrics.Init(operator.Name, transparencyLog.URL) + log.Println("Fetching STH for", transparencyLog.URL) + + hc := http.Client{Timeout: 5 * time.Second} + jsonClient, e := client.New(transparencyLog.URL, &hc, jsonclient.Options{UserAgent: userAgent}) + if e != nil { + log.Printf("Error creating JSON client: %s\n", e) + continue + } + + sth, getSTHerr := jsonClient.GetSTH(w.context) + if getSTHerr != nil { + // TODO this can happen due to a 429 error. We should retry the request + log.Printf("Could not get STH for '%s': %s\n", transparencyLog.URL, getSTHerr) + continue + } + + metrics.index[transparencyLog.URL] = int64(sth.TreeSize) + } + } + w.cancelFunc() + + tempFilePath := fmt.Sprintf("%s.tmp", filePath) + metrics.SaveCertIndexes(tempFilePath, filePath) + log.Println("Index file saved to", filePath) + + return nil +} + // A worker processes a single CT log. type worker struct { name string operatorName string ctURL string entryChan chan models.Entry + ctIndex int64 mu sync.Mutex running bool cancel context.CancelFunc @@ -284,18 +343,24 @@ func (w *worker) runWorker(ctx context.Context) error { return errCreatingClient } - sth, getSTHerr := jsonClient.GetSTH(ctx) - if getSTHerr != nil { + // If recovery is enabled and the CT index is set, we start at the saved index. Otherwise we start at the latest STH. + validSavedCTIndexExists := config.AppConfig.General.Recovery.Enabled && w.ctIndex >= 0 + if !validSavedCTIndexExists { + sth, getSTHerr := jsonClient.GetSTH(ctx) + if getSTHerr != nil { // TODO this can happen due to a 429 error. 
We should retry the request - log.Printf("Could not get STH for '%s': %s\n", w.ctURL, getSTHerr) - return errFetchingSTHFailed + log.Printf("Could not get STH for '%s': %s\n", w.ctURL, getSTHerr) + return errFetchingSTHFailed + } + // Start at the latest STH to skip all the past certificates + w.ctIndex = int64(sth.TreeSize) } certScanner := scanner.NewScanner(jsonClient, scanner.ScannerOptions{ FetcherOptions: scanner.FetcherOptions{ BatchSize: 100, ParallelFetch: 1, - StartIndex: int64(sth.TreeSize), // Start at the latest STH to skip all the past certificates + StartIndex: w.ctIndex, Continuous: true, }, Matcher: scanner.MatchAll{}, @@ -364,8 +429,9 @@ func certHandler(entryChan chan models.Entry) { // Update metrics url := entry.Data.Source.NormalizedURL operator := entry.Data.Source.Operator + index := entry.Data.CertIndex - metrics.Inc(operator, url) + metrics.Inc(operator, url, index) } } @@ -421,14 +487,6 @@ func getAllLogs() (loglist3.LogList, error) { } } - // Add new ct logs to metrics - for _, operator := range allLogs.Operators { - for _, ctlog := range operator.Logs { - url := normalizeCtlogURL(ctlog.URL) - metrics.Init(operator.Name, url) - } - } - return *allLogs, nil } diff --git a/internal/certificatetransparency/logmetrics.go b/internal/certificatetransparency/logmetrics.go index b47e055..3df657e 100644 --- a/internal/certificatetransparency/logmetrics.go +++ b/internal/certificatetransparency/logmetrics.go @@ -1,6 +1,13 @@ package certificatetransparency -import "sync" +import ( + "encoding/json" + "fmt" + "log" + "os" + "sync" + "time" +) type ( // OperatorLogs is a map of operator names to a list of CT log urls, operated by said operator. @@ -9,12 +16,14 @@ type ( OperatorMetric map[string]int64 // CTMetrics is a map of operator names to a map of CT log urls to the number of certs processed by said log. 
CTMetrics map[string]OperatorMetric + // CTCertIndex is a map of CT log urls to the last processed certificate index on the said log + CTCertIndex map[string]int64 ) var ( processedCerts int64 processedPrecerts int64 - metrics = LogMetrics{metrics: make(CTMetrics)} + metrics = LogMetrics{metrics: make(CTMetrics), index: make(CTCertIndex)} ) // LogMetrics is a struct that holds a map of metrics for each CT log grouped by operator. @@ -22,6 +31,7 @@ var ( type LogMetrics struct { mutex sync.RWMutex metrics CTMetrics + index CTCertIndex } // GetCTMetrics returns a copy of the internal metrics map. @@ -76,6 +86,11 @@ func (m *LogMetrics) Init(operator, url string) { if _, ok := m.metrics[operator][url]; !ok { m.metrics[operator][url] = 0 } + + // if url index does not exist, create a new entry + if _, ok := m.index[url]; !ok { + m.index[url] = 0 + } } // Get the metric for a given operator and ct url. @@ -104,7 +119,7 @@ func (m *LogMetrics) Set(operator, url string, value int64) { } // Inc the metric for a given operator and ct url.
-func (m *LogMetrics) Inc(operator, url string) { +func (m *LogMetrics) Inc(operator, url string, index int64) { m.mutex.Lock() defer m.mutex.Unlock() @@ -113,6 +128,135 @@ } m.metrics[operator][url]++ + + m.index[url] = index +} + +func (m *LogMetrics) GetAllCTIndexes() CTCertIndex { + m.mutex.RLock() + defer m.mutex.RUnlock() + + // make a copy of the index and return it + // since map is a reference type + copyOfIndex := make(map[string]int64) + for k, v := range m.index { + copyOfIndex[k] = v + } + + return copyOfIndex +} + +func (m *LogMetrics) GetCTIndex(url string) int64 { + m.mutex.RLock() + defer m.mutex.RUnlock() + + index, ok := m.index[url] + if !ok { + return 0 + } + + return index +} + +// LoadCTIndex loads the last cert index processed for each CT url if it exists +func (m *LogMetrics) LoadCTIndex(ctIndexFilePath string) { + m.mutex.Lock() + defer m.mutex.Unlock() + + bytes, readErr := os.ReadFile(ctIndexFilePath) + if readErr != nil { + // Create the file if it doesn't exist + if os.IsNotExist(readErr) { + log.Printf("Specified CT index file does not exist: '%s'\n", ctIndexFilePath) + log.Println("Creating CT index file now!") + file, createErr := os.Create(ctIndexFilePath) + if createErr != nil { + log.Printf("Error creating CT index file: '%s'\n", ctIndexFilePath) + log.Panicln(createErr) + } + + var marshalErr error + bytes, marshalErr = json.Marshal(m.index) + if marshalErr != nil { + return + } + _, writeErr := file.Write(bytes) + if writeErr != nil { + log.Printf("Error writing to CT index file: '%s'\n", ctIndexFilePath) + log.Panicln(writeErr) + } + file.Close() + } else { + // If the file exists but we can't read it, log the error and panic + log.Panicln(readErr) + } + } + + jerr := json.Unmarshal(bytes, &m.index) + if jerr != nil { + log.Printf("Error unmarshalling CT index file: '%s'\n", ctIndexFilePath) + log.Panicln(jerr) + } + + log.Println("Successfully loaded saved CT indexes") +} + +// 
SaveCertIndexesAtInterval saves the index of CTLogs at given intervals. +// We first create a temp file and write the index data to it. Only then do we move the temp file to the actual +// permanent index file. This prevents the last good index file from being clobbered if the program was shutdown/killed +// in-between the write operation. +func (m *LogMetrics) SaveCertIndexesAtInterval(interval time.Duration, ctIndexFilePath string) { + tempFilePath := fmt.Sprintf("%s.tmp", ctIndexFilePath) + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for range ticker.C { + m.SaveCertIndexes(tempFilePath, ctIndexFilePath) + } +} + +// SaveCertIndexes saves the index of CTLogs to a file. +func (m *LogMetrics) SaveCertIndexes(tempFilePath, ctIndexFilePath string) { + // Get the index data + ctIndex := m.GetAllCTIndexes() + bytes, cerr := json.MarshalIndent(ctIndex, "", " ") + if cerr != nil { + log.Panic(cerr) + } + + // Save data to a temporary file first + file, openErr := os.OpenFile(tempFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if openErr != nil { + log.Println("Could not save CT index to temporary file: ", openErr) + return + } + + truncateErr := file.Truncate(0) + if truncateErr != nil { + log.Println("Error truncating CT index temp file: ", truncateErr) + return + } + // TODO: check for short writes + _, writeErr := file.Write(bytes) + if writeErr != nil { + log.Println("Error writing to CT index temp file: ", writeErr) + return + } + syncErr := file.Sync() + if syncErr != nil { + log.Println("Error syncing CT index temp file: ", syncErr) + return + } + + file.Close() + + // Atomically move the temp file to the permanent file + renameErr := os.Rename(tempFilePath, ctIndexFilePath) + if renameErr != nil { + log.Println("Error renaming CT index temp file: ", renameErr) + return + } } func GetProcessedCerts() int64 { diff --git a/internal/certstream/certstream.go b/internal/certstream/certstream.go index 4e6b694..dd958f3 100644 --- 
a/internal/certstream/certstream.go +++ b/internal/certstream/certstream.go @@ -23,6 +23,13 @@ type Certstream struct { config config.Config } +func NewRawCertstream(config config.Config) *Certstream { + cs := Certstream{} + cs.config = config + + return &cs +} + // NewCertstreamServer creates a new Certstream server from a config struct. func NewCertstreamServer(config config.Config) (*Certstream, error) { cs := Certstream{} @@ -108,6 +115,16 @@ func (cs *Certstream) Stop() { } } +// CreateIndexFile creates the index file for the certificate transparency logs. +// It gets only called when the CLI flag --create-index-file is set. +func (cs *Certstream) CreateIndexFile() error { + // If there is no watcher initialized, create a new one + if cs.watcher == nil { + cs.watcher = &certificatetransparency.Watcher{} + } + return cs.watcher.CreateIndexFile(cs.config.General.Recovery.CTIndexFile) +} + // signalHandler listens for signals in order to gracefully shut down the server. // Executes the callback function when a signal is received. func signalHandler(signals chan os.Signal, callback func()) { diff --git a/internal/config/config.go b/internal/config/config.go index b844a8a..29e1f5c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -55,6 +55,10 @@ type Config struct { AdditionalLogs []LogConfig `yaml:"additional_logs"` BufferSizes BufferSizes `yaml:"buffer_sizes"` DropOldLogs *bool `yaml:"drop_old_logs"` + Recovery struct { + Enabled bool `yaml:"enabled"` + CTIndexFile string `yaml:"ct_index_file"` + } `yaml:"recovery"` } } @@ -238,5 +242,10 @@ func validateConfig(config *Config) bool { config.General.DropOldLogs = &defaultCleanup } + if config.General.Recovery.Enabled && config.General.Recovery.CTIndexFile == "" { + log.Println("Recovery enabled but no index file specified. Defaulting to ./ct_index.json") + config.General.Recovery.CTIndexFile = "./ct_index.json" + } + return true }