Skip to content

Commit 5173fb7

Browse files
committed
refactor(static-ct): move code to ct-tiled.go file
The code got a bit mixed up. Having everything static ct related in one place makes a lot more sense.
1 parent a2b4a9f commit 5173fb7

2 files changed

Lines changed: 225 additions & 110 deletions

File tree

internal/certificatetransparency/ct-tiled.go

Lines changed: 222 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ import (
55
"context"
66
"fmt"
77
"io"
8+
"log"
89
"net/http"
910
"strconv"
1011
"strings"
12+
"time"
1113

1214
ct "github.com/google/certificate-transparency-go"
15+
"github.com/google/trillian/client/backoff"
1316
"golang.org/x/crypto/cryptobyte"
1417
)
1518

@@ -32,8 +35,13 @@ type TileLeaf struct {
3235
IssuerKeyHash [32]byte
3336
}
3437

35-
// EncodeTilePath encodes a tile index into the proper path format.
36-
func EncodeTilePath(index uint64) string {
38+
var (
39+
EntryTypeCert uint16 = 0
40+
EntryTypePrecert uint16 = 1
41+
)
42+
43+
// encodeTilePath encodes a tile index into the proper path format.
44+
func encodeTilePath(index uint64) string {
3745
if index == 0 {
3846
return "000"
3947
}
@@ -114,7 +122,7 @@ func FetchCheckpoint(ctx context.Context, client *http.Client, baseURL string) (
114122
// If partialWidth > 0, fetches a partial tile with that width (1-255).
115123
func FetchTile(ctx context.Context, client *http.Client, baseURL string, tileIndex, partialWidth uint64) ([]TileLeaf, error) {
116124
baseURL = strings.TrimRight(baseURL, "/")
117-
tilePath := EncodeTilePath(tileIndex)
125+
tilePath := encodeTilePath(tileIndex)
118126

119127
if partialWidth > 0 {
120128
tilePath = fmt.Sprintf("%s.p/%d", tilePath, partialWidth)
@@ -253,3 +261,214 @@ func ConvertTileLeafToRawLogEntry(leaf TileLeaf, index uint64) *ct.RawLogEntry {
253261

254262
return rawEntry
255263
}
264+
type StaticCTClient struct {
265+
url string
266+
httpClient *http.Client
267+
backoff backoff.Backoff
268+
userAgent string
269+
ctIndex uint64
270+
}
271+
272+
func NewStaticCTClient(url string, httpClient *http.Client, userAgent string, startIndex uint64) *StaticCTClient {
273+
return &StaticCTClient{
274+
url: strings.TrimRight(url, "/"),
275+
httpClient: httpClient,
276+
backoff: backoff.Backoff{
277+
Min: 2 * time.Second,
278+
Max: 15 * time.Second,
279+
Factor: 1.3,
280+
Jitter: true,
281+
},
282+
userAgent: userAgent,
283+
ctIndex: startIndex,
284+
}
285+
}
286+
287+
// Monitor continuously monitors the tiled CT log for new entries, starting from the current ctIndex.
288+
func (s *StaticCTClient) Monitor(ctx context.Context, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) error {
289+
for {
290+
hadNewEntries, err := s.fetchAndProcessTiles(ctx, foundCert, foundPrecert)
291+
if err != nil {
292+
log.Printf("Error processing tiled log updates for '%s': %s\n", s.url, err)
293+
return err
294+
}
295+
296+
// Reset backoff if we found new entries
297+
if hadNewEntries {
298+
s.backoff.Reset()
299+
}
300+
301+
select {
302+
case <-ctx.Done():
303+
ctxErr := ctx.Err()
304+
if ctxErr != nil {
305+
return fmt.Errorf("context error: %w", ctxErr)
306+
}
307+
308+
return nil
309+
case <-time.After(s.backoff.Duration()):
310+
// Continue to the next iteration
311+
}
312+
}
313+
}
314+
315+
// fetchAndProcessTiles checks for new entries in the tiled log and processes them.
316+
// It returns true if at least one full tile was fetched.
317+
func (s *StaticCTClient) fetchAndProcessTiles(ctx context.Context, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) (bool, error) {
318+
// Fetch current checkpoint
319+
checkpoint, fetchErr := s.fetchCheckpoint(ctx)
320+
if fetchErr != nil {
321+
return false, fmt.Errorf("fetching checkpoint: %w", fetchErr)
322+
}
323+
324+
currentTreeSize := checkpoint.Size
325+
if currentTreeSize <= s.ctIndex {
326+
// No new entries
327+
return false, nil
328+
}
329+
330+
// Process entries from current index to new tree size
331+
startTile := (s.ctIndex + 1) / TileSize
332+
endTile := currentTreeSize / TileSize
333+
334+
// Process full tiles
335+
for tileIndex := startTile; tileIndex < endTile; tileIndex++ {
336+
if err := s.processTile(ctx, tileIndex, 0, foundCert, foundPrecert); err != nil {
337+
return false, fmt.Errorf("processing tile %d: %w", tileIndex, err)
338+
}
339+
}
340+
341+
// Process partial tile if exists
342+
partialSize := currentTreeSize % TileSize
343+
if partialSize > 0 {
344+
if err := s.processTile(ctx, endTile, partialSize, foundCert, foundPrecert); err != nil {
345+
log.Printf("Warning: error processing partial tile %d: %s\n", endTile, err)
346+
// Don't return error for partial tiles as they might be incomplete
347+
}
348+
}
349+
350+
return true, nil
351+
}
352+
353+
// processTile processes a single tile from the tiled log.
354+
// partialWidth of 0 means full tile, otherwise fetch partial tile with that width.
355+
func (s *StaticCTClient) processTile(ctx context.Context, tileIndex, partialWidth uint64, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) error {
356+
leaves, err := s.fetchTile(ctx, tileIndex, partialWidth)
357+
if err != nil {
358+
return fmt.Errorf("fetching tile: %w", err)
359+
}
360+
361+
// Calculate the starting index for entries in this tile
362+
baseIndex := tileIndex * TileSize
363+
364+
for i, leaf := range leaves {
365+
entryIndex := baseIndex + uint64(i)
366+
367+
// Skip entries we've already processed
368+
if entryIndex <= s.ctIndex {
369+
continue
370+
}
371+
372+
// Convert TileLeaf to RawLogEntry for compatibility with existing parsing
373+
rawEntry := ConvertTileLeafToRawLogEntry(leaf, entryIndex)
374+
375+
// Process the entry using existing callbacks
376+
switch leaf.EntryType {
377+
case EntryTypeCert:
378+
foundCert(rawEntry)
379+
case EntryTypePrecert:
380+
foundPrecert(rawEntry)
381+
default:
382+
log.Printf("Unknown entry type %d in tile %d, skipping entry at index %d\n", leaf.EntryType, tileIndex, entryIndex)
383+
}
384+
385+
// Update the index
386+
s.ctIndex = entryIndex
387+
}
388+
389+
return nil
390+
}
391+
392+
// fetchTile fetches a tile from the tiled CT log using the provided client.
393+
// If partialWidth > 0, fetches a partial tile with that width (1-255).
394+
func (s *StaticCTClient) fetchTile(ctx context.Context, tileIndex, partialWidth uint64) ([]TileLeaf, error) {
395+
tilePath := encodeTilePath(tileIndex)
396+
397+
if partialWidth > 0 {
398+
tilePath = fmt.Sprintf("%s.p/%d", tilePath, partialWidth)
399+
}
400+
401+
url := fmt.Sprintf("%s/tile/data/%s", s.url, tilePath)
402+
403+
req, newReqErr := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
404+
if newReqErr != nil {
405+
return nil, fmt.Errorf("failed to create tile request: %w", newReqErr)
406+
}
407+
408+
req.Header.Set("User-Agent", UserAgent)
409+
410+
resp, reqErr := s.httpClient.Do(req)
411+
if reqErr != nil {
412+
return nil, fmt.Errorf("fetching tile %d: %w", tileIndex, reqErr)
413+
}
414+
defer resp.Body.Close()
415+
416+
if resp.StatusCode != http.StatusOK {
417+
return nil, fmt.Errorf("%w: unexpected status code %d", ErrRequestFailed, resp.StatusCode)
418+
}
419+
420+
data, err := io.ReadAll(resp.Body)
421+
if err != nil {
422+
return nil, fmt.Errorf("reading tile data: %w", err)
423+
}
424+
425+
return ParseTileData(data)
426+
}
427+
428+
// fetchCheckpoint fetches the checkpoint from a tiled CT log using the provided client.
429+
func (s *StaticCTClient) fetchCheckpoint(ctx context.Context) (*TiledCheckpoint, error) {
430+
url := s.url + "/checkpoint"
431+
432+
req, newReqErr := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
433+
if newReqErr != nil {
434+
return nil, fmt.Errorf("failed to create checkpoint request: %w", newReqErr)
435+
}
436+
437+
req.Header.Set("User-Agent", UserAgent)
438+
439+
resp, reqErr := s.httpClient.Do(req)
440+
if reqErr != nil {
441+
return nil, fmt.Errorf("failed to execute checkpoint request: %w", reqErr)
442+
}
443+
defer resp.Body.Close()
444+
445+
if resp.StatusCode != http.StatusOK {
446+
return nil, fmt.Errorf("%w: unexpected status code %d", ErrRequestFailed, resp.StatusCode)
447+
}
448+
449+
lines := make([]string, 0, 3)
450+
451+
scanner := bufio.NewScanner(resp.Body)
452+
for scanner.Scan() {
453+
lines = append(lines, scanner.Text())
454+
}
455+
456+
if scanErr := scanner.Err(); scanErr != nil {
457+
return nil, fmt.Errorf("failed reading response body: %w", scanErr)
458+
}
459+
460+
if len(lines) < 3 {
461+
return nil, fmt.Errorf("%w: invalid checkpoint format: expected at least 3 lines, got %d", ErrCheckpointInvalidFormat, len(lines))
462+
}
463+
464+
size, parseErr := strconv.ParseUint(lines[1], 10, 64)
465+
if parseErr != nil {
466+
return nil, fmt.Errorf("failed parsing tree size: %w", parseErr)
467+
}
468+
469+
return &TiledCheckpoint{
470+
Origin: lines[0],
471+
Size: size,
472+
Hash: lines[2],
473+
}, nil
474+
}

internal/certificatetransparency/ct-watcher.go

Lines changed: 3 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16-
"github.com/google/trillian/client/backoff"
17-
1816
"github.com/d-Rickyy-b/certstream-server-go/internal/config"
1917
"github.com/d-Rickyy-b/certstream-server-go/internal/metrics"
2018
"github.com/d-Rickyy-b/certstream-server-go/internal/models"
@@ -483,112 +481,10 @@ func (w *worker) runTiledWorker(ctx context.Context) error {
483481
w.ctIndex = checkpoint.Size
484482
}
485483

486-
// Initialize backoff for polling
487-
pollBackoff := &backoff.Backoff{
488-
Min: 1 * time.Second,
489-
Max: 30 * time.Second,
490-
Factor: 2,
491-
Jitter: true,
492-
}
493-
494-
// Continuous monitoring loop
495-
for {
496-
hadNewEntries, err := w.processTiledLogUpdates(ctx, httpClient)
497-
if err != nil {
498-
log.Printf("Error processing tiled log updates for '%s': %s\n", w.ctURL, err)
499-
return err
500-
}
501-
502-
// Reset backoff if we found new entries
503-
if hadNewEntries {
504-
pollBackoff.Reset()
505-
}
506-
507-
select {
508-
case <-ctx.Done():
509-
ctxErr := ctx.Err()
510-
if ctxErr != nil {
511-
return fmt.Errorf("context error: %w", ctxErr)
512-
}
513-
514-
return nil
515-
case <-time.After(pollBackoff.Duration()):
516-
// Continue to the next iteration
517-
}
518-
}
519-
}
520-
521-
// processTiledLogUpdates checks for new entries in the tiled log and processes them.
522-
func (w *worker) processTiledLogUpdates(ctx context.Context, httpClient *http.Client) (bool, error) {
523-
// Fetch current checkpoint
524-
checkpoint, fetchErr := FetchCheckpoint(ctx, httpClient, w.ctURL)
525-
if fetchErr != nil {
526-
return false, fmt.Errorf("fetching checkpoint: %w", fetchErr)
527-
}
528-
529-
currentTreeSize := checkpoint.Size
530-
if currentTreeSize <= w.ctIndex {
531-
// No new entries
532-
return false, nil
533-
}
534-
535-
// Process entries from current index to new tree size
536-
startTile := (w.ctIndex + 1) / TileSize
537-
endTile := currentTreeSize / TileSize
538-
539-
// Process complete tiles
540-
for tileIndex := startTile; tileIndex < endTile; tileIndex++ {
541-
if err := w.processTile(ctx, httpClient, tileIndex, 0); err != nil {
542-
return false, fmt.Errorf("processing tile %d: %w", tileIndex, err)
543-
}
544-
}
545-
546-
// Process partial tile if exists
547-
partialSize := currentTreeSize % TileSize
548-
if partialSize > 0 {
549-
if err := w.processTile(ctx, httpClient, endTile, partialSize); err != nil {
550-
log.Printf("Warning: error processing partial tile %d: %s\n", endTile, err)
551-
// Don't return error for partial tiles as they might be incomplete
552-
}
553-
}
554-
555-
return true, nil
556-
}
557-
558-
// processTile processes a single tile from the tiled log.
559-
// partialWidth of 0 means full tile, otherwise fetch partial tile with that width.
560-
func (w *worker) processTile(ctx context.Context, hc *http.Client, tileIndex, partialWidth uint64) error {
561-
leaves, err := FetchTile(ctx, hc, w.ctURL, tileIndex, partialWidth)
484+
staticCTClient := NewStaticCTClient(w.ctURL, httpClient, UserAgent, w.ctIndex)
485+
err := staticCTClient.Monitor(ctx, w.foundCertCallback, w.foundPrecertCallback)
562486
if err != nil {
563-
return fmt.Errorf("fetching tile: %w", err)
564-
}
565-
566-
// Calculate the starting index for entries in this tile
567-
baseIndex := tileIndex * TileSize
568-
569-
for i, leaf := range leaves {
570-
entryIndex := baseIndex + uint64(i)
571-
572-
// Skip entries we've already processed
573-
if entryIndex <= w.ctIndex {
574-
continue
575-
}
576-
577-
// Convert TileLeaf to RawLogEntry for compatibility with existing parsing
578-
rawEntry := ConvertTileLeafToRawLogEntry(leaf, entryIndex)
579-
580-
// Process the entry using existing callbacks
581-
switch leaf.EntryType {
582-
case 0:
583-
w.foundCertCallback(rawEntry)
584-
case 1:
585-
w.foundPrecertCallback(rawEntry)
586-
default:
587-
log.Printf("Unknown entry type %d in tile %d, skipping entry at index %d\n", leaf.EntryType, tileIndex, entryIndex)
588-
}
589-
590-
// Update the index
591-
w.ctIndex = entryIndex
487+
return fmt.Errorf("error scanning for certificates: %w", err)
592488
}
593489

594490
return nil

0 commit comments

Comments
 (0)