diff --git a/cmd/backup-manager/app/compact/manager.go b/cmd/backup-manager/app/compact/manager.go index 85de0b4d75..af6d06eb2e 100644 --- a/cmd/backup-manager/app/compact/manager.go +++ b/cmd/backup-manager/app/compact/manager.go @@ -57,7 +57,6 @@ type Manager struct { options options.CompactOpts } -const crrCheckpointPrefix = "crr-checkpoint" const redactedCompactArgValue = "" // NewManager return a Manager @@ -190,6 +189,9 @@ func (cm *Manager) buildCompactArgs(base64Storage string) []string { "-N", strconv.FormatUint(cm.options.Concurrency, 10), } + if cm.options.Name != "" { + args = append(args, "--name", cm.options.Name) + } if cm.options.Sharded { return cm.buildShardedCompactArgs(args) @@ -208,16 +210,11 @@ func (cm *Manager) buildShardedCompactArgs(args []string) []string { args = append(args, "--cal-shift-ts", "--physical-file-cache-capacity", - "150G", + cm.options.PhysicalFileCacheCapacity, ) - // When the CR sets EndTs explicitly, honor it as a hard upper bound via - // --until. Otherwise let tikv-ctl resolve until-ts from the replication - // checkpoint stored under the fixed crr-checkpoint sub-prefix. if cm.options.UntilTS != 0 { args = append(args, "--until", strconv.FormatUint(cm.options.UntilTS, 10)) - } else { - args = append(args, "--crr-checkpoint-prefix", crrCheckpointPrefix) } // --shard tells tikv-ctl this pod's slice of the keyspace partition. 
diff --git a/cmd/backup-manager/app/compact/manager_sharded_test.go b/cmd/backup-manager/app/compact/manager_sharded_test.go index 2da2cf0ee3..944ed49e36 100644 --- a/cmd/backup-manager/app/compact/manager_sharded_test.go +++ b/cmd/backup-manager/app/compact/manager_sharded_test.go @@ -28,107 +28,110 @@ import ( "k8s.io/client-go/tools/cache" ) -func TestBuildCompactArgsDefaultMode(t *testing.T) { - manager := &Manager{ - compact: &v1alpha1.CompactBackup{}, - options: options.CompactOpts{ - FromTS: 11, - UntilTS: 22, - Concurrency: 4, +func TestBuildCompactArgs(t *testing.T) { + testCases := []struct { + name string + opts options.CompactOpts + want []string + }{ + { + name: "default mode", + opts: options.CompactOpts{ + FromTS: 11, + UntilTS: 22, + Concurrency: 4, + }, + want: []string{ + "--log-level", "INFO", + "--log-format", "json", + "compact-log-backup", + "--storage-base64", "storage-base64", + "--from", "11", + "-N", "4", + "--until", "22", + }, }, - } - - args := manager.buildCompactArgs("storage-base64") - want := []string{ - "--log-level", "INFO", - "--log-format", "json", - "compact-log-backup", - "--storage-base64", "storage-base64", - "--from", "11", - "-N", "4", - "--until", "22", - } - - assertStringSliceEqual(t, args, want) -} - -func TestBuildCompactArgsShardedMode(t *testing.T) { - manager := &Manager{ - compact: &v1alpha1.CompactBackup{}, - options: options.CompactOpts{ - FromTS: 11, - UntilTS: 22, - Concurrency: 4, - Sharded: true, - ShardIndex: 1, - ShardCount: 3, + { + name: "default mode with name", + opts: options.CompactOpts{ + FromTS: 11, + UntilTS: 22, + Name: "compact-task-a", + Concurrency: 4, + }, + want: []string{ + "--log-level", "INFO", + "--log-format", "json", + "compact-log-backup", + "--storage-base64", "storage-base64", + "--from", "11", + "-N", "4", + "--name", "compact-task-a", + "--until", "22", + }, }, - } - - args := manager.buildCompactArgs("storage-base64") - want := []string{ - "--log-level", "INFO", - "--log-format", 
"json", - "compact-log-backup", - "--storage-base64", "storage-base64", - "--from", "11", - "-N", "4", - "--cal-shift-ts", - "--physical-file-cache-capacity", "150G", - "--until", "22", - "--shard", "2/3", - "--minimal-compaction-size", "0", - } - - assertStringSliceEqual(t, args, want) -} - -func TestBuildCompactArgsCRRModeShardedUsesCheckpointPrefix(t *testing.T) { - manager := &Manager{ - compact: &v1alpha1.CompactBackup{}, - options: options.CompactOpts{ - FromTS: 11, - UntilTS: 0, - Concurrency: 4, - Sharded: true, - ShardIndex: 1, - ShardCount: 3, + { + name: "sharded mode", + opts: options.CompactOpts{ + FromTS: 11, + UntilTS: 22, + Concurrency: 4, + PhysicalFileCacheCapacity: "200G", + Sharded: true, + ShardIndex: 0, + ShardCount: 3, + }, + want: []string{ + "--log-level", "INFO", + "--log-format", "json", + "compact-log-backup", + "--storage-base64", "storage-base64", + "--from", "11", + "-N", "4", + "--cal-shift-ts", + "--physical-file-cache-capacity", "200G", + "--until", "22", + "--shard", "1/3", + "--minimal-compaction-size", "0", + }, + }, + { + name: "sharded mode without until", + opts: options.CompactOpts{ + FromTS: 11, + UntilTS: 0, + Concurrency: 4, + PhysicalFileCacheCapacity: "200G", + Sharded: true, + ShardIndex: 1, + ShardCount: 3, + }, + want: []string{ + "--log-level", "INFO", + "--log-format", "json", + "compact-log-backup", + "--storage-base64", "storage-base64", + "--from", "11", + "-N", "4", + "--cal-shift-ts", + "--physical-file-cache-capacity", "200G", + "--shard", "2/3", + "--minimal-compaction-size", "0", + }, }, } - args := manager.buildCompactArgs("storage-base64") - want := []string{ - "--log-level", "INFO", - "--log-format", "json", - "compact-log-backup", - "--storage-base64", "storage-base64", - "--from", "11", - "-N", "4", - "--cal-shift-ts", - "--physical-file-cache-capacity", "150G", - "--crr-checkpoint-prefix", "crr-checkpoint", - "--shard", "2/3", - "--minimal-compaction-size", "0", - } - - assertStringSliceEqual(t, args, 
want) -} + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + manager := &Manager{ + compact: &v1alpha1.CompactBackup{}, + options: tc.opts, + } -func TestBuildCompactArgsShardedModeConvertsKubernetesIndexToOneBasedShard(t *testing.T) { - manager := &Manager{ - compact: &v1alpha1.CompactBackup{}, - options: options.CompactOpts{ - FromTS: 11, - UntilTS: 22, - Concurrency: 4, - Sharded: true, - ShardIndex: 0, - ShardCount: 3, - }, + args := manager.buildCompactArgs("storage-base64") + assertStringSliceEqual(t, args, tc.want) + }) } - - args := manager.buildCompactArgs("storage-base64") - assertStringSliceContainsPair(t, args, "--shard", "1/3") } func TestSanitizeCompactCommandArgsRedactsStorageBase64Value(t *testing.T) { @@ -231,16 +234,6 @@ func assertStringSliceEqual(t *testing.T, got, want []string) { } } -func assertStringSliceContainsPair(t *testing.T, got []string, key, value string) { - t.Helper() - for i := 0; i+1 < len(got); i++ { - if got[i] == key && got[i+1] == value { - return - } - } - t.Fatalf("expected args to contain %q %q, got %#v", key, value, got) -} - func newManagerForProcessCompactTest(t *testing.T, compact *v1alpha1.CompactBackup) *Manager { t.Helper() @@ -271,11 +264,12 @@ func newShardedCompactBackupForManagerTest() *v1alpha1.CompactBackup { Namespace: "default", }, Spec: v1alpha1.CompactSpec{ - StartTs: "400036290571534337", - EndTs: "400036290571534338", - Concurrency: 4, - Mode: v1alpha1.CompactModeSharded, - ShardCount: &shardCount, + StartTs: "400036290571534337", + EndTs: "400036290571534338", + Concurrency: 4, + Mode: v1alpha1.CompactModeSharded, + ShardCount: &shardCount, + PhysicalFileCacheCapacity: "200G", }, } } diff --git a/cmd/backup-manager/app/compact/options/options.go b/cmd/backup-manager/app/compact/options/options.go index dbd7bc5fc8..fa70828e9c 100644 --- a/cmd/backup-manager/app/compact/options/options.go +++ b/cmd/backup-manager/app/compact/options/options.go @@ -18,10 +18,12 @@ import ( "math" "os" 
"strconv" + "strings" "github.com/pingcap/errors" "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" "github.com/pingcap/tidb-operator/pkg/apis/util/config" + "k8s.io/apimachinery/pkg/api/resource" ) const ( @@ -30,16 +32,17 @@ const ( ) type CompactOpts struct { - FromTS uint64 - UntilTS uint64 - Name string - Concurrency uint64 - ShardIndex int - ShardCount int - Sharded bool - Namespace string `json:"namespace"` - ResourceName string `json:"resourceName"` - TikvVersion string `json:"tikvVersion"` + FromTS uint64 + UntilTS uint64 + Name string + Concurrency uint64 + PhysicalFileCacheCapacity string + ShardIndex int + ShardCount int + Sharded bool + Namespace string `json:"namespace"` + ResourceName string `json:"resourceName"` + TikvVersion string `json:"tikvVersion"` } func ParseCompactOptions(compact *v1alpha1.CompactBackup, opts *CompactOpts) error { @@ -55,8 +58,12 @@ func ParseCompactOptions(compact *v1alpha1.CompactBackup, opts *CompactOpts) err opts.FromTS = startTs opts.UntilTS = endTs - opts.Name = compact.Name + opts.Name = strings.TrimSpace(compact.Spec.Name) opts.Concurrency = uint64(compact.Spec.Concurrency) + opts.PhysicalFileCacheCapacity = strings.TrimSpace(compact.Spec.PhysicalFileCacheCapacity) + if opts.PhysicalFileCacheCapacity == "" { + opts.PhysicalFileCacheCapacity = "0" + } opts.Sharded = compact.Spec.Mode == v1alpha1.CompactModeSharded opts.ShardIndex = 0 opts.ShardCount = 0 @@ -81,10 +88,10 @@ func (c *CompactOpts) Verify() error { if c.FromTS == fromTSUnset { return errors.New("from-ts must be set") } - // UntilTS unset is valid only in sharded CCR checkpoint mode: tikv-ctl - // reads the until-ts from the log-backup global checkpoint via - // --crr-checkpoint-prefix. Non-sharded compact keeps the existing - // requirement that EndTs/UntilTS must be set explicitly. + // UntilTS unset is valid only in sharded CCR checkpoint mode: tikv-ctl can + // read the until-ts from the log-backup global checkpoint. 
Non-sharded + // compact keeps the existing requirement that EndTs/UntilTS must be set + // explicitly. if c.UntilTS == untilTSUnset && !c.Sharded { return errors.New("until-ts must be set") } @@ -95,6 +102,17 @@ func (c *CompactOpts) Verify() error { return errors.Errorf("concurrency %d must be greater than 0", c.Concurrency) } if c.Sharded { + c.PhysicalFileCacheCapacity = strings.TrimSpace(c.PhysicalFileCacheCapacity) + if c.PhysicalFileCacheCapacity == "" { + c.PhysicalFileCacheCapacity = "0" + } + capacity, err := resource.ParseQuantity(c.PhysicalFileCacheCapacity) + if err != nil { + return errors.Annotatef(err, "invalid physicalFileCacheCapacity %q", c.PhysicalFileCacheCapacity) + } + if capacity.Sign() < 0 { + return errors.New("physicalFileCacheCapacity must be greater than or equal to 0") + } if c.ShardCount <= 0 { return errors.Errorf("shard-count %d must be greater than 0", c.ShardCount) } diff --git a/cmd/backup-manager/app/compact/options/options_sharded_test.go b/cmd/backup-manager/app/compact/options/options_sharded_test.go index aa1b259fa7..ac2066a06e 100644 --- a/cmd/backup-manager/app/compact/options/options_sharded_test.go +++ b/cmd/backup-manager/app/compact/options/options_sharded_test.go @@ -33,11 +33,13 @@ func TestParseCompactOptionsShardedFields(t *testing.T) { Name: "compact-sharded", }, Spec: v1alpha1.CompactSpec{ - StartTs: "400036290571534337", - EndTs: "400036290571534338", - Concurrency: 8, - Mode: v1alpha1.CompactModeSharded, - ShardCount: &shardCount, + StartTs: "400036290571534337", + EndTs: "400036290571534338", + Concurrency: 8, + Mode: v1alpha1.CompactModeSharded, + ShardCount: &shardCount, + PhysicalFileCacheCapacity: "200G", + Name: "compact-task-a", }, } @@ -54,6 +56,12 @@ func TestParseCompactOptionsShardedFields(t *testing.T) { if opts.ShardIndex != 2 { t.Fatalf("expected shard index 2, got %d", opts.ShardIndex) } + if opts.PhysicalFileCacheCapacity != "200G" { + t.Fatalf("expected physical file cache capacity %q, got %q", 
"200G", opts.PhysicalFileCacheCapacity) + } + if opts.Name != "compact-task-a" { + t.Fatalf("expected name %q, got %q", "compact-task-a", opts.Name) + } } func TestParseCompactOptionsShardedFieldsRequireRuntimeIndex(t *testing.T) { @@ -77,11 +85,12 @@ func TestParseCompactOptionsShardedFieldsRequireRuntimeIndex(t *testing.T) { Name: "compact-sharded-missing-index", }, Spec: v1alpha1.CompactSpec{ - StartTs: "400036290571534337", - EndTs: "400036290571534338", - Concurrency: 8, - Mode: v1alpha1.CompactModeSharded, - ShardCount: &shardCount, + StartTs: "400036290571534337", + EndTs: "400036290571534338", + Concurrency: 8, + Mode: v1alpha1.CompactModeSharded, + ShardCount: &shardCount, + PhysicalFileCacheCapacity: "200G", }, } @@ -123,11 +132,12 @@ func TestParseCompactOptionsShardedFieldsRejectInvalidRuntimeIndex(t *testing.T) Name: "compact-sharded-invalid-index", }, Spec: v1alpha1.CompactSpec{ - StartTs: "400036290571534337", - EndTs: "400036290571534338", - Concurrency: 8, - Mode: v1alpha1.CompactModeSharded, - ShardCount: &shardCount, + StartTs: "400036290571534337", + EndTs: "400036290571534338", + Concurrency: 8, + Mode: v1alpha1.CompactModeSharded, + ShardCount: &shardCount, + PhysicalFileCacheCapacity: "200G", }, } @@ -173,6 +183,93 @@ func TestParseCompactOptionsDefaultModeClearsShardedFields(t *testing.T) { if opts.ShardIndex != 0 { t.Fatalf("expected shard index to be cleared, got %d", opts.ShardIndex) } + if opts.Name != "" { + t.Fatalf("expected metadata name not to be used as compact --name, got %q", opts.Name) + } +} + +func TestParseCompactOptionsShardedDefaultsMissingPhysicalFileCacheCapacity(t *testing.T) { + shardCount := int32(4) + t.Setenv("JOB_COMPLETION_INDEX", "2") + opts := &CompactOpts{} + + compact := &v1alpha1.CompactBackup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "compact-sharded", + }, + Spec: v1alpha1.CompactSpec{ + StartTs: "400036290571534337", + EndTs: "400036290571534338", + Concurrency: 8, + Mode: v1alpha1.CompactModeSharded, + 
ShardCount: &shardCount, + }, + } + + err := ParseCompactOptions(compact, opts) + if err != nil { + t.Fatalf("expected ParseCompactOptions to succeed, got %v", err) + } + if opts.PhysicalFileCacheCapacity != "0" { + t.Fatalf("expected physicalFileCacheCapacity to default to %q, got %q", "0", opts.PhysicalFileCacheCapacity) + } +} + +func TestCompactOptsVerifyAllowsZeroPhysicalFileCacheCapacity(t *testing.T) { + opts := CompactOpts{ + FromTS: 1, + UntilTS: 2, + Concurrency: 1, + PhysicalFileCacheCapacity: "0", + Sharded: true, + ShardIndex: 0, + ShardCount: 3, + } + + if err := opts.Verify(); err != nil { + t.Fatalf("expected Verify to allow zero physicalFileCacheCapacity, got %v", err) + } +} + +func TestCompactOptsVerifyRejectsInvalidPhysicalFileCacheCapacity(t *testing.T) { + testCases := []struct { + name string + capacity string + wantErr string + }{ + { + name: "invalid quantity", + capacity: "150GB", + wantErr: "invalid physicalFileCacheCapacity", + }, + { + name: "negative quantity", + capacity: "-1G", + wantErr: "must be greater than or equal to 0", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + opts := CompactOpts{ + FromTS: 1, + UntilTS: 2, + Concurrency: 1, + PhysicalFileCacheCapacity: tc.capacity, + Sharded: true, + ShardIndex: 0, + ShardCount: 3, + } + + err := opts.Verify() + if err == nil { + t.Fatal("expected Verify to fail") + } + if !strings.Contains(err.Error(), tc.wantErr) { + t.Fatalf("expected error containing %q, got %v", tc.wantErr, err) + } + }) + } } func TestCompactOptsVerifyRejectsInvalidKubernetesShardIndex(t *testing.T) { @@ -184,36 +281,39 @@ func TestCompactOptsVerifyRejectsInvalidKubernetesShardIndex(t *testing.T) { { name: "non-positive shard count", opts: CompactOpts{ - FromTS: 1, - UntilTS: 2, - Concurrency: 1, - Sharded: true, - ShardIndex: 0, - ShardCount: 0, + FromTS: 1, + UntilTS: 2, + Concurrency: 1, + PhysicalFileCacheCapacity: "200G", + Sharded: true, + ShardIndex: 0, + ShardCount: 0, 
}, wantErr: "shard-count", }, { name: "negative shard index", opts: CompactOpts{ - FromTS: 1, - UntilTS: 2, - Concurrency: 1, - Sharded: true, - ShardIndex: -1, - ShardCount: 3, + FromTS: 1, + UntilTS: 2, + Concurrency: 1, + PhysicalFileCacheCapacity: "200G", + Sharded: true, + ShardIndex: -1, + ShardCount: 3, }, wantErr: "kubernetes shard-index", }, { name: "out of range shard index", opts: CompactOpts{ - FromTS: 1, - UntilTS: 2, - Concurrency: 1, - Sharded: true, - ShardIndex: 3, - ShardCount: 3, + FromTS: 1, + UntilTS: 2, + Concurrency: 1, + PhysicalFileCacheCapacity: "200G", + Sharded: true, + ShardIndex: 3, + ShardCount: 3, }, wantErr: "kubernetes shard-index", }, @@ -234,12 +334,13 @@ func TestCompactOptsVerifyRejectsInvalidKubernetesShardIndex(t *testing.T) { func TestCompactOptsVerifyAllowsUnsetUntilTSOnlyWhenSharded(t *testing.T) { sharded := CompactOpts{ - FromTS: 1, - UntilTS: 0, - Concurrency: 1, - Sharded: true, - ShardIndex: 0, - ShardCount: 3, + FromTS: 1, + UntilTS: 0, + Concurrency: 1, + PhysicalFileCacheCapacity: "200G", + Sharded: true, + ShardIndex: 0, + ShardCount: 3, } if err := sharded.Verify(); err != nil { t.Fatalf("expected sharded unset UntilTS to be accepted, got %v", err) diff --git a/cmd/backup-manager/app/restore/restore.go b/cmd/backup-manager/app/restore/restore.go index 5103b5dd0e..3dd330123d 100644 --- a/cmd/backup-manager/app/restore/restore.go +++ b/cmd/backup-manager/app/restore/restore.go @@ -39,12 +39,6 @@ import ( ) const ( - // replicationStatusSubPrefix is the fixed sub-prefix BR uses to - // lay out replication status under the log-backup storage. - // Hardcoded here (not exposed via API) per spec §6: this is a BR - // call detail and shouldn't leak into the CRD. - replicationStatusSubPrefix = "crr-checkpoint" - // replicationPiTRConcurrency is the parallelism BR uses when // applying compacted log files. Spec §6. 
replicationPiTRConcurrency = 1024 @@ -63,7 +57,6 @@ func replicationBRFlags(phase int) []string { } return []string{ fmt.Sprintf("--restore-phase=%d", phase), - fmt.Sprintf("--replication-status-sub-prefix=%s", replicationStatusSubPrefix), fmt.Sprintf("--pitr-concurrency=%d", replicationPiTRConcurrency), fmt.Sprintf("--metadata-download-batch-size=%d", replicationMetadataDownloadBatchSize), "--retain-latest-mvcc-version", diff --git a/cmd/backup-manager/app/restore/restore_test.go b/cmd/backup-manager/app/restore/restore_test.go index c6343aec99..92a30ac040 100644 --- a/cmd/backup-manager/app/restore/restore_test.go +++ b/cmd/backup-manager/app/restore/restore_test.go @@ -33,8 +33,7 @@ func TestOptions_ReplicationPhase_DefaultsToZero(t *testing.T) { // TestReplicationBRFlags verifies the replication-specific BR flags // produced by replicationBRFlags. Phase 0 returns nil (allowing // `append(args, replicationBRFlags(0)...)` to be a no-op for standard -// PiTR); phases 1 and 2 produce the flags spec §6 mandates, -// with the constants for sub-prefix and concurrency wired in. +// PiTR); phases 1 and 2 produce the flags spec §6 mandates. func TestReplicationBRFlags(t *testing.T) { cases := []struct { phase int @@ -43,14 +42,12 @@ func TestReplicationBRFlags(t *testing.T) { {phase: 0, want: nil}, {phase: 1, want: []string{ "--restore-phase=1", - "--replication-status-sub-prefix=crr-checkpoint", "--pitr-concurrency=1024", "--metadata-download-batch-size=512", "--retain-latest-mvcc-version", }}, {phase: 2, want: []string{ "--restore-phase=2", - "--replication-status-sub-prefix=crr-checkpoint", "--pitr-concurrency=1024", "--metadata-download-batch-size=512", "--retain-latest-mvcc-version", diff --git a/docs/api-references/docs.md b/docs/api-references/docs.md index 960e10198a..4355d6a6ff 100644 --- a/docs/api-references/docs.md +++ b/docs/api-references/docs.md @@ -5456,6 +5456,31 @@ int32 +physicalFileCacheCapacity
+ +string + + + +(Optional) +

PhysicalFileCacheCapacity is passed to tikv-ctl compact-log-backup as --physical-file-cache-capacity. +It uses Kubernetes quantity format, e.g. “150G” or “150Gi”, and defaults to “0” when omitted.

+ + + + +name
+ +string + + + +(Optional) +

Name is passed to tikv-ctl compact-log-backup as --name when configured.

+ + + + env
@@ -5854,6 +5879,31 @@ int32 +physicalFileCacheCapacity
+ +string + + + +(Optional) +

PhysicalFileCacheCapacity is passed to tikv-ctl compact-log-backup as --physical-file-cache-capacity. +It uses Kubernetes quantity format, e.g. “150G” or “150Gi”, and defaults to “0” when omitted.

+ + + + +name
+ +string + + + +(Optional) +

Name is passed to tikv-ctl compact-log-backup as --name when configured.

+ + + + env
diff --git a/images/tidb-backup-manager/Dockerfile b/images/tidb-backup-manager/Dockerfile index 09eb7619fe..6700b3e934 100644 --- a/images/tidb-backup-manager/Dockerfile +++ b/images/tidb-backup-manager/Dockerfile @@ -2,15 +2,16 @@ FROM ghcr.io/pingcap-qe/bases/pingcap-base:v1.11.1@sha256:24ecefceae6b114e11f6ae ARG TARGETARCH ARG RCLONE_VERSION=v1.71.2 ARG SHUSH_VERSION=v1.5.5 +ARG WGET_RETRY_OPTS="--tries=5 --timeout=60 --waitretry=5 --retry-connrefused" RUN dnf install -y ca-certificates bind-utils wget nc unzip && dnf clean all -RUN wget -nv https://github.com/ncw/rclone/releases/download/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-${TARGETARCH}.zip \ +RUN wget -nv ${WGET_RETRY_OPTS} https://github.com/ncw/rclone/releases/download/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-${TARGETARCH}.zip \ && unzip rclone-${RCLONE_VERSION}-linux-${TARGETARCH}.zip \ && mv rclone-${RCLONE_VERSION}-linux-${TARGETARCH}/rclone /usr/local/bin \ && chmod 755 /usr/local/bin/rclone \ && rm -rf rclone-${RCLONE_VERSION}-linux-${TARGETARCH}.zip rclone-${RCLONE_VERSION}-linux-${TARGETARCH} -RUN wget -nv https://github.com/realestate-com-au/shush/releases/download/${SHUSH_VERSION}/shush_linux_${TARGETARCH} \ +RUN wget -nv ${WGET_RETRY_OPTS} https://github.com/realestate-com-au/shush/releases/download/${SHUSH_VERSION}/shush_linux_${TARGETARCH} \ && mv shush_linux_${TARGETARCH} /usr/local/bin/shush \ && chmod 755 /usr/local/bin/shush diff --git a/images/tidb-backup-manager/Dockerfile.e2e b/images/tidb-backup-manager/Dockerfile.e2e index 77f6eb9ce9..fa0fdad546 100644 --- a/images/tidb-backup-manager/Dockerfile.e2e +++ b/images/tidb-backup-manager/Dockerfile.e2e @@ -2,15 +2,16 @@ FROM ghcr.io/pingcap-qe/bases/pingcap-base:v1.11.1@sha256:24ecefceae6b114e11f6ae ARG TARGETARCH=amd64 ARG RCLONE_VERSION=v1.71.2 ARG SHUSH_VERSION=v1.5.5 +ARG WGET_RETRY_OPTS="--tries=5 --timeout=60 --waitretry=5 --retry-connrefused" RUN dnf install -y ca-certificates bind-utils wget nc unzip && dnf clean 
all -RUN wget -nv https://github.com/ncw/rclone/releases/download/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-${TARGETARCH}.zip \ +RUN wget -nv ${WGET_RETRY_OPTS} https://github.com/ncw/rclone/releases/download/${RCLONE_VERSION}/rclone-${RCLONE_VERSION}-linux-${TARGETARCH}.zip \ && unzip rclone-${RCLONE_VERSION}-linux-${TARGETARCH}.zip \ && mv rclone-${RCLONE_VERSION}-linux-${TARGETARCH}/rclone /usr/local/bin \ && chmod 755 /usr/local/bin/rclone \ && rm -rf rclone-${RCLONE_VERSION}-linux-${TARGETARCH}.zip rclone-${RCLONE_VERSION}-linux-${TARGETARCH} -RUN wget -nv https://github.com/realestate-com-au/shush/releases/download/${SHUSH_VERSION}/shush_linux_${TARGETARCH} \ +RUN wget -nv ${WGET_RETRY_OPTS} https://github.com/realestate-com-au/shush/releases/download/${SHUSH_VERSION}/shush_linux_${TARGETARCH} \ && mv shush_linux_${TARGETARCH} /usr/local/bin/shush \ && chmod 755 /usr/local/bin/shush diff --git a/manifests/crd.yaml b/manifests/crd.yaml index 9175dd8c22..bd39d2b32c 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -7108,6 +7108,10 @@ spec: - "" - sharded type: string + name: + type: string + physicalFileCacheCapacity: + type: string podSecurityContext: properties: fsGroup: @@ -12624,6 +12628,10 @@ spec: - "" - sharded type: string + name: + type: string + physicalFileCacheCapacity: + type: string podSecurityContext: properties: fsGroup: diff --git a/manifests/crd/v1/pingcap.com_backupschedules.yaml b/manifests/crd/v1/pingcap.com_backupschedules.yaml index ed8f093461..2fb10add5a 100644 --- a/manifests/crd/v1/pingcap.com_backupschedules.yaml +++ b/manifests/crd/v1/pingcap.com_backupschedules.yaml @@ -4541,6 +4541,10 @@ spec: - "" - sharded type: string + name: + type: string + physicalFileCacheCapacity: + type: string podSecurityContext: properties: fsGroup: diff --git a/manifests/crd/v1/pingcap.com_compactbackups.yaml b/manifests/crd/v1/pingcap.com_compactbackups.yaml index 7a1cff6aaf..45caa0eade 100644 --- 
a/manifests/crd/v1/pingcap.com_compactbackups.yaml +++ b/manifests/crd/v1/pingcap.com_compactbackups.yaml @@ -2134,6 +2134,10 @@ spec: - "" - sharded type: string + name: + type: string + physicalFileCacheCapacity: + type: string podSecurityContext: properties: fsGroup: diff --git a/pkg/apis/label/label.go b/pkg/apis/label/label.go index 75c62fc87d..93e85ccc0d 100644 --- a/pkg/apis/label/label.go +++ b/pkg/apis/label/label.go @@ -59,6 +59,9 @@ const ( // BackupLabelKey is backup key BackupLabelKey string = "tidb.pingcap.com/backup" + // CompactLabelKey is compact backup key + CompactLabelKey string = "tidb.pingcap.com/compact" + // RestoreLabelKey is restore key RestoreLabelKey string = "tidb.pingcap.com/restore" // RestoreWarmUpLabelKey defines which pod the restore warms up @@ -224,6 +227,8 @@ const ( RestoreWarmUpJobLabelVal string = "warmup" // BackupJobLabelVal is backup job label value BackupJobLabelVal string = "backup" + // CompactJobLabelVal is compact backup job label value + CompactJobLabelVal string = "compactbackup" // BackupScheduleJobLabelVal is backup schedule job label value BackupScheduleJobLabelVal string = "backup-schedule" // InitJobLabelVal is TiDB initializer job label value @@ -305,6 +310,14 @@ func NewBackup() Label { } } +// NewCompactBackup initialize a new Label for Jobs of compact backup +func NewCompactBackup() Label { + return Label{ + NameLabelKey: CompactJobLabelVal, + ManagedByLabelKey: "compact-backup-operator", + } +} + // NewRestore initialize a new Label for Jobs of restore func NewRestore() Label { return Label{ @@ -419,6 +432,11 @@ func (l Label) BackupJob() Label { return l.Component(BackupJobLabelVal) } +// CompactJob assigns compact backup to component key in label +func (l Label) CompactJob() Label { + return l.Component(CompactJobLabelVal) +} + // RestoreJob assigns restore to component key in label func (l Label) RestoreJob() Label { return l.Component(RestoreJobLabelVal) @@ -434,6 +452,12 @@ func (l Label) Backup(val 
string) Label { return l } +// Compact assigns specific value to compact backup key in label +func (l Label) Compact(val string) Label { + l[CompactLabelKey] = val + return l +} + // BackupSchedule assigns specific value to backup schedule key in label func (l Label) BackupSchedule(val string) Label { l[BackupScheduleLabelKey] = val diff --git a/pkg/apis/pingcap/v1alpha1/compact_backup.go b/pkg/apis/pingcap/v1alpha1/compact_backup.go new file mode 100644 index 0000000000..af4e9d6166 --- /dev/null +++ b/pkg/apis/pingcap/v1alpha1/compact_backup.go @@ -0,0 +1,26 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1alpha1 + +import "github.com/pingcap/tidb-operator/pkg/apis/label" + +// GetInstanceName return the compact backup instance name. +func (cb *CompactBackup) GetInstanceName() string { + if cb.Labels != nil { + if v, ok := cb.Labels[label.InstanceLabelKey]; ok { + return v + } + } + return cb.Name +} diff --git a/pkg/apis/pingcap/v1alpha1/openapi_generated.go b/pkg/apis/pingcap/v1alpha1/openapi_generated.go index 2b42d55c51..2afe96848f 100644 --- a/pkg/apis/pingcap/v1alpha1/openapi_generated.go +++ b/pkg/apis/pingcap/v1alpha1/openapi_generated.go @@ -1614,6 +1614,20 @@ func schema_pkg_apis_pingcap_v1alpha1_CompactSpec(ref common.ReferenceCallback) Format: "int32", }, }, + "physicalFileCacheCapacity": { + SchemaProps: spec.SchemaProps{ + Description: "PhysicalFileCacheCapacity is passed to tikv-ctl compact-log-backup as --physical-file-cache-capacity. 
It uses Kubernetes quantity format, e.g. \"150G\" or \"150Gi\", and defaults to \"0\" when omitted.", + Type: []string{"string"}, + Format: "", + }, + }, + "name": { + SchemaProps: spec.SchemaProps{ + Description: "Name is passed to tikv-ctl compact-log-backup as --name when configured.", + Type: []string{"string"}, + Format: "", + }, + }, "env": { SchemaProps: spec.SchemaProps{ Description: "List of environment variables to set in the container, like v1.Container.Env. Note that the following builtin env vars will be overwritten by values set here - S3_PROVIDER - S3_ENDPOINT - AWS_REGION - AWS_ACL - AWS_STORAGE_CLASS - AWS_DEFAULT_REGION - AWS_ACCESS_KEY_ID - AWS_SECRET_ACCESS_KEY - GCS_PROJECT_ID - GCS_OBJECT_ACL - GCS_BUCKET_ACL - GCS_LOCATION - GCS_STORAGE_CLASS - GCS_SERVICE_ACCOUNT_JSON_KEY - BR_LOG_TO_TERM", diff --git a/pkg/apis/pingcap/v1alpha1/types.go b/pkg/apis/pingcap/v1alpha1/types.go index 60fd5cc1f5..766f8d575a 100644 --- a/pkg/apis/pingcap/v1alpha1/types.go +++ b/pkg/apis/pingcap/v1alpha1/types.go @@ -3965,6 +3965,13 @@ type CompactSpec struct { // +kubebuilder:validation:Minimum=1 // +optional ShardCount *int32 `json:"shardCount,omitempty"` + // PhysicalFileCacheCapacity is passed to tikv-ctl compact-log-backup as --physical-file-cache-capacity. + // It uses Kubernetes quantity format, e.g. "150G" or "150Gi", and defaults to "0" when omitted. + // +optional + PhysicalFileCacheCapacity string `json:"physicalFileCacheCapacity,omitempty"` + // Name is passed to tikv-ctl compact-log-backup as --name when configured. + // +optional + Name string `json:"name,omitempty"` // List of environment variables to set in the container, like v1.Container.Env. 
// Note that the following builtin env vars will be overwritten by values set here // - S3_PROVIDER diff --git a/pkg/controller/compactbackup/compact_backup_controller.go b/pkg/controller/compactbackup/compact_backup_controller.go index 9096aacb16..86c6ac28fc 100644 --- a/pkg/controller/compactbackup/compact_backup_controller.go +++ b/pkg/controller/compactbackup/compact_backup_controller.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "math" + "strings" "time" perrors "github.com/pingcap/errors" @@ -30,6 +31,7 @@ import ( "github.com/pingcap/tidb-operator/pkg/util" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -400,8 +402,7 @@ func (c *Controller) makeCompactJob(compact *v1alpha1.CompactBackup) (*batchv1.J } klog.Infof("compact %s/%s use br image %s and tikv image %s", ns, name, brImage, tikvImage) - //TODO: (Ris)What is the instance here? 
- jobLabels := util.CombineStringMap(label.NewBackup().Instance("Compact-Backup").BackupJob().Backup(name), compact.Labels) + jobLabels := util.CombineStringMap(label.NewCompactBackup().Instance(compact.GetInstanceName()).CompactJob().Compact(name), compact.Labels) podLabels := jobLabels jobAnnotations := compact.Annotations podAnnotations := jobAnnotations @@ -607,6 +608,18 @@ func (c *Controller) validate(compact *v1alpha1.CompactBackup) error { if spec.Mode == v1alpha1.CompactModeSharded && (spec.ShardCount == nil || *spec.ShardCount < 1) { return perrors.NewNoStackError("shardCount must be greater than or equal to 1 when mode is sharded") } + if spec.Mode == v1alpha1.CompactModeSharded { + physicalFileCacheCapacity := strings.TrimSpace(spec.PhysicalFileCacheCapacity) + if physicalFileCacheCapacity != "" { + capacity, err := resource.ParseQuantity(physicalFileCacheCapacity) + if err != nil { + return perrors.NewNoStackError(fmt.Sprintf("invalid physicalFileCacheCapacity %q: %v", physicalFileCacheCapacity, err)) + } + if capacity.Sign() < 0 { + return perrors.NewNoStackError("physicalFileCacheCapacity must be greater than or equal to 0") + } + } + } if spec.Mode != v1alpha1.CompactModeSharded && spec.ShardCount != nil { return perrors.NewNoStackError("shardCount can only be set when mode is sharded") } diff --git a/pkg/controller/compactbackup/compact_backup_controller_sharded_test.go b/pkg/controller/compactbackup/compact_backup_controller_sharded_test.go index 3e427655ed..4a3a0e449d 100644 --- a/pkg/controller/compactbackup/compact_backup_controller_sharded_test.go +++ b/pkg/controller/compactbackup/compact_backup_controller_sharded_test.go @@ -69,6 +69,41 @@ func TestMakeCompactJobDefaultModeUnchanged(t *testing.T) { } } +func TestMakeCompactJobUsesCompactBackupLabels(t *testing.T) { + c := newTestController(t) + compact := newCompactBackupForTest() + compact.Labels = map[string]string{ + "app.kubernetes.io/instance": "daily-log-backup", + } + + job, reason, 
err := c.makeCompactJob(compact) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + if reason != "" { + t.Fatalf("expected empty reason, got %q", reason) + } + + expectedLabels := map[string]string{ + "app.kubernetes.io/name": "compactbackup", + "app.kubernetes.io/managed-by": "compact-backup-operator", + "app.kubernetes.io/instance": "daily-log-backup", + "app.kubernetes.io/component": "compactbackup", + "tidb.pingcap.com/compact": compact.Name, + } + for k, v := range expectedLabels { + if job.Labels[k] != v { + t.Fatalf("expected job label %s=%q, got %q", k, v, job.Labels[k]) + } + if job.Spec.Template.Labels[k] != v { + t.Fatalf("expected pod template label %s=%q, got %q", k, v, job.Spec.Template.Labels[k]) + } + } + if _, ok := job.Labels["tidb.pingcap.com/backup"]; ok { + t.Fatalf("compact job should not use backup label key, got labels %v", job.Labels) + } +} + func TestMakeCompactJobShardedMode(t *testing.T) { c := newTestController(t) compact := newCompactBackupForTest() @@ -110,6 +145,7 @@ func TestValidateShardedModeRequiresPositiveShardCount(t *testing.T) { c := newTestController(t) compact := newCompactBackupForTest() compact.Spec.Mode = v1alpha1.CompactModeSharded + compact.Spec.PhysicalFileCacheCapacity = "200G" err := c.validate(compact) if err == nil { @@ -120,6 +156,71 @@ func TestValidateShardedModeRequiresPositiveShardCount(t *testing.T) { } } +func TestValidateShardedModeAllowsMissingPhysicalFileCacheCapacity(t *testing.T) { + c := newTestController(t) + compact := newCompactBackupForTest() + shardCount := int32(3) + compact.Spec.Mode = v1alpha1.CompactModeSharded + compact.Spec.ShardCount = &shardCount + + err := c.validate(compact) + if err != nil { + t.Fatalf("expected no validation error, got %v", err) + } +} + +func TestValidateShardedModeAllowsZeroPhysicalFileCacheCapacity(t *testing.T) { + c := newTestController(t) + compact := newCompactBackupForTest() + shardCount := int32(3) + compact.Spec.Mode = 
v1alpha1.CompactModeSharded + compact.Spec.ShardCount = &shardCount + compact.Spec.PhysicalFileCacheCapacity = "0" + + err := c.validate(compact) + if err != nil { + t.Fatalf("expected no validation error, got %v", err) + } +} + +func TestValidateShardedModeRejectsInvalidPhysicalFileCacheCapacity(t *testing.T) { + testCases := []struct { + name string + capacity string + wantErr string + }{ + { + name: "invalid quantity", + capacity: "150GB", + wantErr: "invalid physicalFileCacheCapacity", + }, + { + name: "negative quantity", + capacity: "-1G", + wantErr: "must be greater than or equal to 0", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + c := newTestController(t) + compact := newCompactBackupForTest() + shardCount := int32(3) + compact.Spec.Mode = v1alpha1.CompactModeSharded + compact.Spec.ShardCount = &shardCount + compact.Spec.PhysicalFileCacheCapacity = tc.capacity + + err := c.validate(compact) + if err == nil { + t.Fatal("expected validation error, got nil") + } + if !strings.Contains(err.Error(), tc.wantErr) { + t.Fatalf("expected error containing %q, got %v", tc.wantErr, err) + } + }) + } +} + func TestValidateRejectsShardCountWithoutShardedMode(t *testing.T) { c := newTestController(t) compact := newCompactBackupForTest() @@ -142,6 +243,7 @@ func TestValidateAllowsEmptyEndTsOnlyForShardedCCRCheckpointMode(t *testing.T) { shardCount := int32(3) compact.Spec.Mode = v1alpha1.CompactModeSharded compact.Spec.ShardCount = &shardCount + compact.Spec.PhysicalFileCacheCapacity = "200G" compact.Spec.EndTs = "" err := c.validate(compact) @@ -172,6 +274,7 @@ func TestSyncShardedModeRequiresSupportedK8sVersion(t *testing.T) { shardCount := int32(2) compact.Spec.Mode = v1alpha1.CompactModeSharded compact.Spec.ShardCount = &shardCount + compact.Spec.PhysicalFileCacheCapacity = "200G" fakeDiscovery := c.deps.KubeClientset.Discovery().(*fakediscovery.FakeDiscovery) fakeDiscovery.FakedServerVersion = &version.Info{Major: "1", Minor: 
"28"} @@ -215,6 +318,7 @@ func TestSyncShardedModeRequeuesOnDiscoveryError(t *testing.T) { shardCount := int32(2) compact.Spec.Mode = v1alpha1.CompactModeSharded compact.Spec.ShardCount = &shardCount + compact.Spec.PhysicalFileCacheCapacity = "200G" fakeDiscovery := c.deps.KubeClientset.Discovery().(*fakediscovery.FakeDiscovery) fakeDiscovery.PrependReactor("get", "version", func(action k8stesting.Action) (bool, runtime.Object, error) {