Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,24 @@ jobs:
tmate-server-ed25519-fingerprint: ${{ secrets.TMATE_SERVER_ED25519_FINGERPRINT }}
if: failure() && runner.debug == '1'

go-tests:
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v5
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.24'
cache-dependency-path: BackupRepair/go.mod
- name: Run Go tests
run: go test -v ./...
working-directory: BackupRepair

build:
name: Build and Push
needs:
- tests
- go-tests
uses: ./.github/workflows/build.yml
secrets: inherit
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ node_modules/
*-win.exe
*-linux
*-macos
BackupRepair/BackupRepair
238 changes: 238 additions & 0 deletions BackupRepair/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package main

import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
)

type AdminClient struct {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could make sense to add this to the Go version of bucketclient

endpoint string
client *http.Client
}

type BackupIndexEntry struct {
Bseq int `json:"bseq"`
CopyNumber int `json:"copyNumber"`
Size int `json:"size"`
CumSize int `json:"cumsize"`
Time string `json:"time"`
FormatVersion int `json:"formatVersion"`
Compression string `json:"compression"`
}

type ReindexStatus struct {
Status string `json:"status"`
ProcessingBseq int `json:"processingBseq"`
TargetBseq int `json:"targetBseq"`
Error string `json:"error"`
}

func NewAdminClient(endpoint string) *AdminClient {
return &AdminClient{
endpoint: endpoint,
client: &http.Client{Timeout: 30 * time.Second},
}
}

const indexPageLimit = 1000

// HasBseq checks whether a specific bseq exists for a given copy number.
func (a *AdminClient) HasBseq(copyNumber, bseq int) (bool, error) {
entries, err := a.getBackupIndexPage(copyNumber, bseq, 1, bseq)
if err != nil {
return false, err
}
return len(entries) > 0 && entries[0].Bseq == bseq, nil
}

// getBackupIndexPage fetches a single page of backup index entries.
// If maxBseq is 0, no upper bound is applied.
func (a *AdminClient) getBackupIndexPage(copyNumber, minBseq, limit, maxBseq int) ([]BackupIndexEntry, error) {
url := fmt.Sprintf("%s/_/raft/backups?copy=%d&limit=%d&minBseq=%d",
a.endpoint, copyNumber, limit, minBseq)
if maxBseq > 0 {
url += fmt.Sprintf("&maxBseq=%d", maxBseq)
}

resp, err := a.client.Get(url)
if err != nil {
return nil, fmt.Errorf("GET %s: %w", url, err)
}

body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("reading response body: %w", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("GET %s returned %d: %s", url, resp.StatusCode, string(body))
}

var entries []BackupIndexEntry
if err := json.Unmarshal(body, &entries); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}
return entries, nil
}

// indexIterator streams backup index entries page by page for a single copy.
type indexIterator struct {
admin *AdminClient
copyNum int
buf []BackupIndexEntry
pos int
nextMin int
done bool
count int
maxBseq int
}

func newIndexIterator(admin *AdminClient, copyNum int) *indexIterator {
return &indexIterator{admin: admin, copyNum: copyNum, nextMin: 1}
}

// next returns the next bseq, or (0, false, nil) when exhausted.
func (it *indexIterator) next() (int, bool, error) {
if err := it.fill(); err != nil {
return 0, false, err
}
if it.done {
return 0, false, nil
}
bseq := it.buf[it.pos].Bseq
it.pos++
it.count++
if bseq > it.maxBseq {
it.maxBseq = bseq
}
return bseq, true, nil
}

// peek returns the current bseq without consuming it, or (0, false, nil) when exhausted.
func (it *indexIterator) peek() (int, bool, error) {
if err := it.fill(); err != nil {
return 0, false, err
}
if it.done {
return 0, false, nil
}
return it.buf[it.pos].Bseq, true, nil
}

func (it *indexIterator) fill() error {
if it.done || it.pos < len(it.buf) {
return nil
}
entries, err := it.admin.getBackupIndexPage(it.copyNum, it.nextMin, indexPageLimit, 0)
if err != nil {
return err
}
if len(entries) == 0 {
it.done = true
return nil
}
it.buf = entries
it.pos = 0
it.nextMin = entries[len(entries)-1].Bseq + 1
return nil
}

// GetBackupIndex fetches all backup index entries for a given copy number.
// Used by verifyRepairs where we need the full list.
func (a *AdminClient) GetBackupIndex(copyNumber int) ([]BackupIndexEntry, error) {
var allEntries []BackupIndexEntry
it := newIndexIterator(a, copyNumber)
for {
if err := it.fill(); err != nil {
return nil, err
}
if it.done {
break
}
allEntries = append(allEntries, it.buf[it.pos:]...)
it.pos = len(it.buf)
}
return allEntries, nil
}

func (a *AdminClient) TriggerReindex() error {
url := a.endpoint + "/_/raft/backups/reindex"
resp, err := a.client.Post(url, "", nil)
if err != nil {
return fmt.Errorf("POST %s: %w", url, err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)

if resp.StatusCode != http.StatusAccepted {
return fmt.Errorf("POST %s returned %d: %s", url, resp.StatusCode, string(body))
}
return nil
}

func (a *AdminClient) GetReindexStatus() (*ReindexStatus, error) {
url := a.endpoint + "/_/raft/backups/reindex"
resp, err := a.client.Get(url)
if err != nil {
return nil, fmt.Errorf("GET %s: %w", url, err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("reading response body: %w", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("GET %s returned %d: %s", url, resp.StatusCode, string(body))
}

var status ReindexStatus
if err := json.Unmarshal(body, &status); err != nil {
return nil, fmt.Errorf("parsing reindex status: %w", err)
}
return &status, nil
}

const maxStallPolls = 10

func (a *AdminClient) WaitForReindex(pollInterval time.Duration) error {
lastBseq := -1
stallCount := 0

for {
status, err := a.GetReindexStatus()
if err != nil {
return err
}

switch status.Status {
case "success":
return nil
case "failed":
return fmt.Errorf("reindex job failed: %s", status.Error)
case "running":
if status.TargetBseq > 0 {
pct := float64(status.ProcessingBseq) / float64(status.TargetBseq) * 100
log.Printf(" reindex progress: bseq %d / %d (%.1f%%)",
status.ProcessingBseq, status.TargetBseq, pct)
}
if status.ProcessingBseq == lastBseq {
stallCount++
if stallCount >= maxStallPolls {
return fmt.Errorf("reindex stalled at bseq %d for %d consecutive polls",
lastBseq, stallCount)
}
} else {
stallCount = 0
lastBseq = status.ProcessingBseq
}
default:
return fmt.Errorf("reindex returned unknown status: %q", status.Status)
}

time.Sleep(pollInterval)
}
}
78 changes: 78 additions & 0 deletions BackupRepair/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"flag"
"fmt"
)

type Config struct {
AdminEndpoint string
SproxydEndpoint string
SproxydPath string
Cluster string
RaftSessionID string
InstallID int
BackupCopies int
MinBseq int
MaxBseq int
DryRun bool
Yes bool
}

// BackupID returns the backup identifier used for key generation: "cluster/raftSessionId".
func (c Config) BackupID() string {
return c.Cluster + "/" + c.RaftSessionID
}

func parseFlags() Config {
var cfg Config
flag.StringVar(&cfg.AdminEndpoint, "admin", "http://localhost:4250",
"MetaData repd admin endpoint (leader)")
flag.StringVar(&cfg.SproxydEndpoint, "sproxyd", "http://localhost:8181",
"Sproxyd endpoint")
flag.StringVar(&cfg.SproxydPath, "sproxyd-path", "/proxy/chord",
"Sproxyd URL path prefix")
flag.StringVar(&cfg.Cluster, "cluster", "",
"Cluster name from repd config (required)")
flag.StringVar(&cfg.RaftSessionID, "raft-session-id", "",
"Raft session ID (required)")
flag.IntVar(&cfg.InstallID, "install-id", 0,
"Install ID (0-255)")
flag.IntVar(&cfg.BackupCopies, "backup-copies", 3,
"Number of backup copies")
flag.IntVar(&cfg.MinBseq, "min-bseq", 1,
"First bseq to consider (skip older backups)")
flag.IntVar(&cfg.MaxBseq, "max-bseq", 0,
"Last bseq to consider (0 = no upper bound)")
flag.BoolVar(&cfg.DryRun, "dry-run", false,
"Report inconsistencies without repairing")
flag.BoolVar(&cfg.Yes, "y", false,
"Skip confirmation prompts between steps")
flag.Parse()
return cfg
}

func (c Config) validate() error {
if c.Cluster == "" {
return fmt.Errorf("--cluster is required")
}
if c.RaftSessionID == "" {
return fmt.Errorf("--raft-session-id is required")
}
if c.InstallID < 0 || c.InstallID > 255 {
return fmt.Errorf("--install-id must be between 0 and 255")
}
if c.MinBseq < 1 {
return fmt.Errorf("--min-bseq must be at least 1")
}
if c.MaxBseq < 0 {
return fmt.Errorf("--max-bseq must be non-negative")
}
if c.MaxBseq > 0 && c.MaxBseq < c.MinBseq {
return fmt.Errorf("--max-bseq (%d) must be >= --min-bseq (%d)", c.MaxBseq, c.MinBseq)
}
if c.BackupCopies < 2 {
return fmt.Errorf("--backup-copies must be at least 2")
}
return nil
}
3 changes: 3 additions & 0 deletions BackupRepair/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/scality/s3utils/BackupRepair

go 1.24.4
Loading
Loading