Skip to content

Commit 5b05e74

Browse files
committed
Integrate tlog-mirror client into mirror hammer
1 parent 37bfa98 commit 5b05e74

3 files changed

Lines changed: 89 additions & 23 deletions

File tree

client/client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,13 @@ func (lst *LogStateTracker) Latest() log.Checkpoint {
332332
return lst.latestConsistent
333333
}
334334

335+
// LatestRaw returns the raw bytes of the latest proven-consistent checkpoint.
336+
func (lst *LogStateTracker) LatestRaw() []byte {
337+
lst.mu.RLock()
338+
defer lst.mu.RUnlock()
339+
return lst.latestConsistentRaw
340+
}
341+
335342
// nodeCache hides the tiles abstraction away, and improves
336343
// performance by caching tiles it's seen.
337344
// Threadsafe.

internal/mirror/hammer/README.md

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,19 @@ For real load-testing applications, especially headless runs as part of a CI pip
1515

1616
## Usage
1717

18-
Example usage to test a deployment of `cmd/conformance/posix`:
18+
First, store the hammer log test private key:
19+
20+
```shell
21+
echo "PRIVATE+KEY+example.com/log/testdata+3232599a+BhO1aBkQITKqnLpM1tkZqj6H7+WU506YqBVlOhyrTO+j" > /tmp/hammer-log.key
22+
```
23+
24+
Example usage to test a deployment of `cmd/mtc/mirror/posix` server:
1925

2026
```shell
2127
go run ./internal/mirror/hammer \
22-
--log_public_key=transparency.dev/tessera/example+ae330e15+ASf4/L1zE859VqlfQgGzKy34l91Gl8W6wfwp+vKP62DW \
23-
--log_url=http://localhost:2024 \
24-
--max_read_ops=1024 \
25-
--num_readers_random=128 \
26-
--num_readers_full=128 \
28+
--mirror_url=http://localhost:8080 \
29+
--storage_dir=/tmp/hammer-log \
30+
--log_private_key=/tmp/hammer-log.key \
2731
--num_writers=256 \
2832
--max_write_ops=42
2933
```
@@ -34,9 +38,9 @@ If the timeout of 1 minute is reached first, then it exits with an exit code of
3438

3539
```shell
3640
go run ./internal/mirror/hammer \
37-
--log_public_key=transparency.dev/tessera/example+ae330e15+ASf4/L1zE859VqlfQgGzKy34l91Gl8W6wfwp+vKP62DW \
38-
--log_url=http://localhost:2024 \
39-
--max_read_ops=0 \
41+
--mirror_url=http://localhost:8080 \
42+
--storage_dir=/tmp/hammer-log \
43+
--log_private_key=/tmp/hammer-log.key \
4044
--num_writers=512 \
4145
--max_write_ops=512 \
4246
--max_runtime=1m \

internal/mirror/hammer/hammer.go

Lines changed: 69 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"math/rand/v2"
2424
"net"
2525
"net/http"
26+
"net/url"
2627
"os"
2728
"strings"
2829
"sync"
@@ -31,29 +32,24 @@ import (
3132
"github.com/transparency-dev/formats/note"
3233
"github.com/transparency-dev/tessera"
3334
"github.com/transparency-dev/tessera/client"
34-
"github.com/transparency-dev/tessera/internal/hammer/loadtest"
35+
"github.com/transparency-dev/tessera/client/mirror"
36+
"github.com/transparency-dev/tessera/internal/mirror/hammer/loadtest"
3537
"github.com/transparency-dev/tessera/storage/posix"
3638
"golang.org/x/net/http2"
3739

3840
"log/slog"
3941
)
4042

4143
func init() {
42-
flag.Var(&logURL, "log_url", "Log storage root URL (can be specified multiple times), e.g. https://log.server/and/path/")
43-
flag.Var(&writeLogURL, "write_log_url", "Root URL for writing to a log (can be specified multiple times), e.g. https://log.server/and/path/ (optional, defaults to log_url)")
44+
flag.Var(&mirrorURL, "mirror_url", "Root URL for writing to a mirror (can be specified multiple times), e.g. https://log.server/and/path/ (optional, defaults to log_url)")
4445
}
4546

4647
var (
47-
logURL multiStringFlag
48-
writeLogURL multiStringFlag
48+
mirrorURL multiStringFlag
4949

5050
storageDir = flag.String("storage_dir", "", "Root directory to store log data")
5151
logPrivKey = flag.String("log_private_key", "", "Location of private key file")
5252

53-
maxReadOpsPerSecond = flag.Int("max_read_ops", 20, "The maximum number of read operations per second")
54-
numReadersRandom = flag.Int("num_readers_random", 4, "The number of readers looking for random leaves")
55-
numReadersFull = flag.Int("num_readers_full", 4, "The number of readers downloading the whole log")
56-
5753
maxWriteOpsPerSecond = flag.Int("max_write_ops", 0, "The maximum number of write operations per second")
5854
numWriters = flag.Int("num_writers", 0, "The number of independent write tasks to run")
5955

@@ -81,8 +77,8 @@ func main() {
8177

8278
hc = &http.Client{
8379
Transport: &http.Transport{
84-
MaxIdleConns: *numWriters + *numReadersFull + *numReadersRandom,
85-
MaxIdleConnsPerHost: *numWriters + *numReadersFull + *numReadersRandom,
80+
MaxIdleConns: *numWriters,
81+
MaxIdleConnsPerHost: *numWriters,
8682
DisableKeepAlives: false,
8783
},
8884
Timeout: *httpTimeout,
@@ -153,15 +149,42 @@ func main() {
153149
os.Exit(1)
154150
}
155151

152+
// TODO(roger2hk): Move this into the internal/mirror/hammer/loadtest package.
153+
// Sync log data to mirror servers.
154+
for _, urlStr := range mirrorURL {
155+
mURL, err := url.Parse(urlStr)
156+
if err != nil {
157+
slog.ErrorContext(ctx, "Failed to parse write mirror URL", slog.String("url", urlStr), slog.Any("error", err))
158+
os.Exit(1)
159+
}
160+
mOpts := mirror.NewOptions().
161+
WithMirrorURL(mURL).
162+
WithHTTPClient(hc).
163+
WithLogOrigin(s.Verifier().Name()).
164+
WithTileFetcher(logReader.ReadTile).
165+
WithBundleFetcher(logReader.ReadEntryBundle).
166+
WithMirrorCheckpointFetcher(func(ctx context.Context) ([]byte, error) {
167+
return nil, nil
168+
}).
169+
WithPackageProver(dummyPackageProver)
170+
mc, err := mirror.NewClient(ctx, mOpts)
171+
if err != nil {
172+
slog.ErrorContext(ctx, "Failed to create mirror client", slog.String("url", urlStr), slog.Any("error", err))
173+
os.Exit(1)
174+
}
175+
176+
go runMirrorSync(ctx, tracker, mc, mURL)
177+
}
178+
156179
ha := loadtest.NewHammerAnalyser(func() uint64 { return tracker.Latest().Size })
157180
ha.Run(ctx)
158181

159182
gen := newLeafGenerator(tracker.Latest().Size, *leafMinSize, *dupChance)
160183
opts := loadtest.HammerOpts{
161-
MaxReadOpsPerSecond: *maxReadOpsPerSecond,
184+
MaxReadOpsPerSecond: 0,
162185
MaxWriteOpsPerSecond: *maxWriteOpsPerSecond,
163-
NumReadersRandom: *numReadersRandom,
164-
NumReadersFull: *numReadersFull,
186+
NumReadersRandom: 0,
187+
NumReadersFull: 0,
165188
NumWriters: *numWriters,
166189
}
167190
hammer := loadtest.NewHammer(tracker, logReader.ReadEntryBundle, leafWriter, gen, ha.SeqLeafChan, ha.ErrChan, opts)
@@ -301,3 +324,35 @@ func getKeyFile(path string) (string, error) {
301324
}
302325
return string(k), nil
303326
}
327+
328+
// TODO(roger2hk): Replace with the subtree consistency proof from merkle repo.
329+
func dummyPackageProver(_ context.Context, _ uint64, _ uint64) ([][]byte, error) {
330+
return nil, nil
331+
}
332+
333+
// runMirrorSync syncs the source log to a mirror server.
334+
func runMirrorSync(ctx context.Context, tracker *client.LogStateTracker, mClient *mirror.Client, mURL *url.URL) {
335+
tick := time.NewTicker(500 * time.Millisecond)
336+
defer tick.Stop()
337+
var lastSyncedSize uint64
338+
for {
339+
select {
340+
case <-ctx.Done():
341+
return
342+
case <-tick.C:
343+
latest := tracker.Latest()
344+
if latest.Size == lastSyncedSize {
345+
continue
346+
}
347+
slog.InfoContext(ctx, "Syncing source log to mirror", slog.Uint64("size", latest.Size), slog.String("mirror", mURL.String()))
348+
latestRaw := tracker.LatestRaw()
349+
cosigs, err := mClient.Sync(ctx, latestRaw, latest.Size)
350+
if err != nil {
351+
slog.ErrorContext(ctx, "Failed to sync to mirror", slog.String("mirror", mURL.String()), slog.Any("error", err))
352+
continue
353+
}
354+
slog.InfoContext(ctx, "Successfully synced to mirror", slog.Uint64("size", latest.Size), slog.String("mirror", mURL.String()), slog.Int("cosigs_len", len(cosigs)))
355+
lastSyncedSize = latest.Size
356+
}
357+
}
358+
}

0 commit comments

Comments
 (0)