Skip to content

Commit b26837c

Browse files
authored
Add experimental POSIX migrate tool (transparency-dev#805)
1 parent f4919e9 commit b26837c

2 files changed

Lines changed: 171 additions & 4 deletions

File tree

cmd/experimental/migrate/gcp/main.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"net/url"
2727
"strconv"
2828
"strings"
29+
"time"
2930

3031
"github.com/transparency-dev/tessera"
3132
"github.com/transparency-dev/tessera/api/layout"
@@ -43,6 +44,7 @@ var (
4344
numWorkers = flag.Uint("num_workers", 30, "Number of migration worker goroutines.")
4445
persistentAntispam = flag.Bool("antispam", false, "EXPERIMENTAL: Set to true to enable GCP-based persistent antispam storage.")
4546
antispamBatchSize = flag.Uint("antispam_batch_size", 1500, "EXPERIMENTAL: maximum number of antispam rows to insert in a batch (1500 gives good performance with 300 Spanner PU and above, smaller values may be required for smaller allocs).")
47+
clientHTTPTimeout = flag.Duration("client_http_timeout", 30*time.Second, "Timeout for outgoing HTTP requests")
4648
)
4749

4850
func main() {
@@ -54,10 +56,18 @@ func main() {
5456
if err != nil {
5557
klog.Exitf("Invalid --source_url %q: %v", *sourceURL, err)
5658
}
59+
hc := &http.Client{
60+
Transport: &http.Transport{
61+
MaxIdleConns: int(*numWorkers) * 2,
62+
MaxIdleConnsPerHost: int(*numWorkers),
63+
DisableKeepAlives: false,
64+
},
65+
Timeout: *clientHTTPTimeout,
66+
}
5767
// TODO(phbnf): This is currently built using the Tessera client lib, with a stand-alone func below for
5868
// fetching the Static CT entry bundles as they live in an different place.
5969
// When there's a Static CT client we can probably switch over to using it in here.
60-
src, err := client.NewHTTPFetcher(srcURL, nil)
70+
src, err := client.NewHTTPFetcher(srcURL, hc)
6171
if err != nil {
6272
klog.Exitf("Failed to create HTTP fetcher: %v", err)
6373
}
@@ -107,7 +117,7 @@ func main() {
107117
klog.Exitf("Failed to create MigrationTarget: %v", err)
108118
}
109119

110-
readEntryBundle := readCTEntryBundle(*sourceURL)
120+
readEntryBundle := readCTEntryBundle(*sourceURL, hc)
111121
if err := m.Migrate(context.Background(), *numWorkers, sourceSize, sourceRoot, readEntryBundle); err != nil {
112122
klog.Exitf("Migrate failed: %v", err)
113123
}
@@ -133,7 +143,7 @@ func storageConfigFromFlags() gcp.Config {
133143
}
134144
}
135145

136-
func readCTEntryBundle(srcURL string) func(ctx context.Context, i uint64, p uint8) ([]byte, error) {
146+
func readCTEntryBundle(srcURL string, hc *http.Client) func(ctx context.Context, i uint64, p uint8) ([]byte, error) {
137147
return func(ctx context.Context, i uint64, p uint8) ([]byte, error) {
138148
up := strings.Replace(layout.EntriesPath(i, p), "entries", "data", 1)
139149
reqURL, err := url.JoinPath(srcURL, up)
@@ -144,7 +154,7 @@ func readCTEntryBundle(srcURL string) func(ctx context.Context, i uint64, p uint
144154
if err != nil {
145155
return nil, err
146156
}
147-
rsp, err := http.DefaultClient.Do(req)
157+
rsp, err := hc.Do(req)
148158
if err != nil {
149159
return nil, err
150160
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
// Copyright 2026 The Tessera authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// migrate-posix is a command-line tool for migrating data from a static-ct
16+
// compliant log, into a TesseraCT log instance using POSIX storage.
17+
package main
18+
19+
import (
20+
"context"
21+
"encoding/base64"
22+
"flag"
23+
"fmt"
24+
"io"
25+
"net/http"
26+
"net/url"
27+
"path/filepath"
28+
"strconv"
29+
"strings"
30+
"time"
31+
32+
"github.com/transparency-dev/tessera"
33+
"github.com/transparency-dev/tessera/api/layout"
34+
"github.com/transparency-dev/tessera/client"
35+
"github.com/transparency-dev/tessera/storage/posix"
36+
tposix_as "github.com/transparency-dev/tessera/storage/posix/antispam"
37+
"k8s.io/klog/v2"
38+
)
39+
40+
var (
41+
storageDir = flag.String("storage_dir", "", "Path to directory in which to store the migrated data.")
42+
sourceURL = flag.String("source_url", "", "Base monitoring URL for the source log.")
43+
numWorkers = flag.Uint("num_workers", 30, "Number of migration worker goroutines.")
44+
persistentAntispam = flag.Bool("antispam", true, "Set to true to populate antispam storage.")
45+
antispamBatchSize = flag.Uint("antispam_batch_size", 50000, "Maximum number of antispam rows to insert per batch update.")
46+
clientHTTPTimeout = flag.Duration("client_http_timeout", 30*time.Second, "Timeout for outgoing HTTP requests")
47+
)
48+
49+
func main() {
50+
klog.InitFlags(nil)
51+
flag.Parse()
52+
ctx := context.Background()
53+
54+
if *storageDir == "" {
55+
klog.Exit("--storage_dir must be set")
56+
}
57+
58+
srcURL, err := url.Parse(*sourceURL)
59+
if err != nil {
60+
klog.Exitf("Invalid --source_url %q: %v", *sourceURL, err)
61+
}
62+
hc := &http.Client{
63+
Transport: &http.Transport{
64+
MaxIdleConns: int(*numWorkers) * 2,
65+
MaxIdleConnsPerHost: int(*numWorkers),
66+
DisableKeepAlives: false,
67+
},
68+
Timeout: *clientHTTPTimeout,
69+
}
70+
// TODO(phbnf): This is currently built using the Tessera client lib, with a stand-alone func below for
71+
// fetching the Static CT entry bundles as they live in an different place.
72+
// When there's a Static CT client we can probably switch over to using it in here.
73+
src, err := client.NewHTTPFetcher(srcURL, hc)
74+
if err != nil {
75+
klog.Exitf("Failed to create HTTP fetcher: %v", err)
76+
}
77+
sourceCP, err := src.ReadCheckpoint(ctx)
78+
if err != nil {
79+
klog.Exitf("fetch initial source checkpoint: %v", err)
80+
}
81+
// TODO(AlCutter): We should be properly verifying and opening the checkpoint here with the source log's
82+
// public key.
83+
bits := strings.Split(string(sourceCP), "\n")
84+
sourceSize, err := strconv.ParseUint(bits[1], 10, 64)
85+
if err != nil {
86+
klog.Exitf("invalid CP size %q: %v", bits[1], err)
87+
}
88+
sourceRoot, err := base64.StdEncoding.DecodeString(bits[2])
89+
if err != nil {
90+
klog.Exitf("invalid checkpoint roothash %q: %v", bits[2], err)
91+
}
92+
93+
// Create our Tessera storage backend:
94+
cfg := posix.Config{
95+
Path: *storageDir,
96+
}
97+
driver, err := posix.New(ctx, cfg)
98+
if err != nil {
99+
klog.Exitf("Failed to create new POSIX storage driver: %v", err)
100+
}
101+
102+
opts := tessera.NewMigrationOptions().WithCTLayout()
103+
// Configure antispam storage, if necessary
104+
var antispam tessera.Antispam
105+
if *persistentAntispam {
106+
as_opts := tposix_as.AntispamOpts{
107+
MaxBatchSize: *antispamBatchSize,
108+
}
109+
antispam, err = tposix_as.NewAntispam(ctx, filepath.Join(*storageDir, ".state", "antispam"), as_opts)
110+
if err != nil {
111+
klog.Exitf("Failed to create new POSIX antispam storage: %v", err)
112+
}
113+
opts.WithAntispam(antispam)
114+
}
115+
116+
m, err := tessera.NewMigrationTarget(ctx, driver, opts)
117+
if err != nil {
118+
klog.Exitf("Failed to create MigrationTarget: %v", err)
119+
}
120+
121+
readEntryBundle := readCTEntryBundle(*sourceURL, hc)
122+
if err := m.Migrate(context.Background(), *numWorkers, sourceSize, sourceRoot, readEntryBundle); err != nil {
123+
klog.Exitf("Migrate failed: %v", err)
124+
}
125+
126+
// TODO(phbnf): This will need extending to identify and copy over the entries from the intermediate cert storage.
127+
128+
// TODO(Tessera #341): wait for antispam follower to complete
129+
<-make(chan bool)
130+
}
131+
132+
func readCTEntryBundle(srcURL string, hc *http.Client) func(ctx context.Context, i uint64, p uint8) ([]byte, error) {
133+
return func(ctx context.Context, i uint64, p uint8) ([]byte, error) {
134+
up := strings.Replace(layout.EntriesPath(i, p), "entries", "data", 1)
135+
reqURL, err := url.JoinPath(srcURL, up)
136+
if err != nil {
137+
return nil, err
138+
}
139+
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
140+
if err != nil {
141+
return nil, err
142+
}
143+
rsp, err := hc.Do(req)
144+
if err != nil {
145+
return nil, err
146+
}
147+
defer func() {
148+
if err := rsp.Body.Close(); err != nil {
149+
klog.Warningf("Failed to close response body: %v", err)
150+
}
151+
}()
152+
if rsp.StatusCode != http.StatusOK {
153+
return nil, fmt.Errorf("GET %q: %v", req.URL.Path, rsp.Status)
154+
}
155+
return io.ReadAll(rsp.Body)
156+
}
157+
}

0 commit comments

Comments
 (0)