Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions cmd/backup-manager/app/compact/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ type Manager struct {
options options.CompactOpts
}

const crrCheckpointPrefix = "crr-checkpoint"
const redactedCompactArgValue = "<redacted>"

// NewManager return a Manager
Expand Down Expand Up @@ -190,6 +189,9 @@ func (cm *Manager) buildCompactArgs(base64Storage string) []string {
"-N",
strconv.FormatUint(cm.options.Concurrency, 10),
}
if cm.options.Name != "" {
args = append(args, "--name", cm.options.Name)
}

if cm.options.Sharded {
return cm.buildShardedCompactArgs(args)
Expand All @@ -208,16 +210,11 @@ func (cm *Manager) buildShardedCompactArgs(args []string) []string {
args = append(args,
"--cal-shift-ts",
"--physical-file-cache-capacity",
"150G",
cm.options.PhysicalFileCacheCapacity,
)

// When the CR sets EndTs explicitly, honor it as a hard upper bound via
// --until. Otherwise let tikv-ctl resolve until-ts from the replication
// checkpoint stored under the fixed crr-checkpoint sub-prefix.
if cm.options.UntilTS != 0 {
args = append(args, "--until", strconv.FormatUint(cm.options.UntilTS, 10))
} else {
args = append(args, "--crr-checkpoint-prefix", crrCheckpointPrefix)
}

// --shard tells tikv-ctl this pod's slice of the keyspace partition.
Expand Down
212 changes: 103 additions & 109 deletions cmd/backup-manager/app/compact/manager_sharded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,107 +28,110 @@ import (
"k8s.io/client-go/tools/cache"
)

func TestBuildCompactArgsDefaultMode(t *testing.T) {
manager := &Manager{
compact: &v1alpha1.CompactBackup{},
options: options.CompactOpts{
FromTS: 11,
UntilTS: 22,
Concurrency: 4,
func TestBuildCompactArgs(t *testing.T) {
testCases := []struct {
name string
opts options.CompactOpts
want []string
}{
{
name: "default mode",
opts: options.CompactOpts{
FromTS: 11,
UntilTS: 22,
Concurrency: 4,
},
want: []string{
"--log-level", "INFO",
"--log-format", "json",
"compact-log-backup",
"--storage-base64", "storage-base64",
"--from", "11",
"-N", "4",
"--until", "22",
},
},
}

args := manager.buildCompactArgs("storage-base64")
want := []string{
"--log-level", "INFO",
"--log-format", "json",
"compact-log-backup",
"--storage-base64", "storage-base64",
"--from", "11",
"-N", "4",
"--until", "22",
}

assertStringSliceEqual(t, args, want)
}

func TestBuildCompactArgsShardedMode(t *testing.T) {
manager := &Manager{
compact: &v1alpha1.CompactBackup{},
options: options.CompactOpts{
FromTS: 11,
UntilTS: 22,
Concurrency: 4,
Sharded: true,
ShardIndex: 1,
ShardCount: 3,
{
name: "default mode with name",
opts: options.CompactOpts{
FromTS: 11,
UntilTS: 22,
Name: "compact-task-a",
Concurrency: 4,
},
want: []string{
"--log-level", "INFO",
"--log-format", "json",
"compact-log-backup",
"--storage-base64", "storage-base64",
"--from", "11",
"-N", "4",
"--name", "compact-task-a",
"--until", "22",
},
},
}

args := manager.buildCompactArgs("storage-base64")
want := []string{
"--log-level", "INFO",
"--log-format", "json",
"compact-log-backup",
"--storage-base64", "storage-base64",
"--from", "11",
"-N", "4",
"--cal-shift-ts",
"--physical-file-cache-capacity", "150G",
"--until", "22",
"--shard", "2/3",
"--minimal-compaction-size", "0",
}

assertStringSliceEqual(t, args, want)
}

func TestBuildCompactArgsCRRModeShardedUsesCheckpointPrefix(t *testing.T) {
manager := &Manager{
compact: &v1alpha1.CompactBackup{},
options: options.CompactOpts{
FromTS: 11,
UntilTS: 0,
Concurrency: 4,
Sharded: true,
ShardIndex: 1,
ShardCount: 3,
{
name: "sharded mode",
opts: options.CompactOpts{
FromTS: 11,
UntilTS: 22,
Concurrency: 4,
PhysicalFileCacheCapacity: "200G",
Sharded: true,
ShardIndex: 0,
ShardCount: 3,
},
want: []string{
"--log-level", "INFO",
"--log-format", "json",
"compact-log-backup",
"--storage-base64", "storage-base64",
"--from", "11",
"-N", "4",
"--cal-shift-ts",
"--physical-file-cache-capacity", "200G",
"--until", "22",
"--shard", "1/3",
"--minimal-compaction-size", "0",
},
},
{
name: "sharded mode without until",
opts: options.CompactOpts{
FromTS: 11,
UntilTS: 0,
Concurrency: 4,
PhysicalFileCacheCapacity: "200G",
Sharded: true,
ShardIndex: 1,
ShardCount: 3,
},
want: []string{
"--log-level", "INFO",
"--log-format", "json",
"compact-log-backup",
"--storage-base64", "storage-base64",
"--from", "11",
"-N", "4",
"--cal-shift-ts",
"--physical-file-cache-capacity", "200G",
"--shard", "2/3",
"--minimal-compaction-size", "0",
},
},
}

args := manager.buildCompactArgs("storage-base64")
want := []string{
"--log-level", "INFO",
"--log-format", "json",
"compact-log-backup",
"--storage-base64", "storage-base64",
"--from", "11",
"-N", "4",
"--cal-shift-ts",
"--physical-file-cache-capacity", "150G",
"--crr-checkpoint-prefix", "crr-checkpoint",
"--shard", "2/3",
"--minimal-compaction-size", "0",
}

assertStringSliceEqual(t, args, want)
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
manager := &Manager{
compact: &v1alpha1.CompactBackup{},
options: tc.opts,
}

func TestBuildCompactArgsShardedModeConvertsKubernetesIndexToOneBasedShard(t *testing.T) {
manager := &Manager{
compact: &v1alpha1.CompactBackup{},
options: options.CompactOpts{
FromTS: 11,
UntilTS: 22,
Concurrency: 4,
Sharded: true,
ShardIndex: 0,
ShardCount: 3,
},
args := manager.buildCompactArgs("storage-base64")
assertStringSliceEqual(t, args, tc.want)
})
}

args := manager.buildCompactArgs("storage-base64")
assertStringSliceContainsPair(t, args, "--shard", "1/3")
}

func TestSanitizeCompactCommandArgsRedactsStorageBase64Value(t *testing.T) {
Expand Down Expand Up @@ -231,16 +234,6 @@ func assertStringSliceEqual(t *testing.T, got, want []string) {
}
}

func assertStringSliceContainsPair(t *testing.T, got []string, key, value string) {
t.Helper()
for i := 0; i+1 < len(got); i++ {
if got[i] == key && got[i+1] == value {
return
}
}
t.Fatalf("expected args to contain %q %q, got %#v", key, value, got)
}

func newManagerForProcessCompactTest(t *testing.T, compact *v1alpha1.CompactBackup) *Manager {
t.Helper()

Expand Down Expand Up @@ -271,11 +264,12 @@ func newShardedCompactBackupForManagerTest() *v1alpha1.CompactBackup {
Namespace: "default",
},
Spec: v1alpha1.CompactSpec{
StartTs: "400036290571534337",
EndTs: "400036290571534338",
Concurrency: 4,
Mode: v1alpha1.CompactModeSharded,
ShardCount: &shardCount,
StartTs: "400036290571534337",
EndTs: "400036290571534338",
Concurrency: 4,
Mode: v1alpha1.CompactModeSharded,
ShardCount: &shardCount,
PhysicalFileCacheCapacity: "200G",
},
}
}
Expand Down
48 changes: 33 additions & 15 deletions cmd/backup-manager/app/compact/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"math"
"os"
"strconv"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/apis/util/config"
"k8s.io/apimachinery/pkg/api/resource"
)

const (
Expand All @@ -30,16 +32,17 @@ const (
)

type CompactOpts struct {
FromTS uint64
UntilTS uint64
Name string
Concurrency uint64
ShardIndex int
ShardCount int
Sharded bool
Namespace string `json:"namespace"`
ResourceName string `json:"resourceName"`
TikvVersion string `json:"tikvVersion"`
FromTS uint64
UntilTS uint64
Name string
Concurrency uint64
PhysicalFileCacheCapacity string
ShardIndex int
ShardCount int
Sharded bool
Namespace string `json:"namespace"`
ResourceName string `json:"resourceName"`
TikvVersion string `json:"tikvVersion"`
}

func ParseCompactOptions(compact *v1alpha1.CompactBackup, opts *CompactOpts) error {
Expand All @@ -55,8 +58,12 @@ func ParseCompactOptions(compact *v1alpha1.CompactBackup, opts *CompactOpts) err
opts.FromTS = startTs
opts.UntilTS = endTs

opts.Name = compact.Name
opts.Name = strings.TrimSpace(compact.Spec.Name)
opts.Concurrency = uint64(compact.Spec.Concurrency)
opts.PhysicalFileCacheCapacity = strings.TrimSpace(compact.Spec.PhysicalFileCacheCapacity)
if opts.PhysicalFileCacheCapacity == "" {
opts.PhysicalFileCacheCapacity = "0"
}
opts.Sharded = compact.Spec.Mode == v1alpha1.CompactModeSharded
opts.ShardIndex = 0
opts.ShardCount = 0
Expand All @@ -81,10 +88,10 @@ func (c *CompactOpts) Verify() error {
if c.FromTS == fromTSUnset {
return errors.New("from-ts must be set")
}
// UntilTS unset is valid only in sharded CCR checkpoint mode: tikv-ctl
// reads the until-ts from the log-backup global checkpoint via
// --crr-checkpoint-prefix. Non-sharded compact keeps the existing
// requirement that EndTs/UntilTS must be set explicitly.
// UntilTS unset is valid only in sharded CCR checkpoint mode: tikv-ctl can
// read the until-ts from the log-backup global checkpoint. Non-sharded
// compact keeps the existing requirement that EndTs/UntilTS must be set
// explicitly.
if c.UntilTS == untilTSUnset && !c.Sharded {
return errors.New("until-ts must be set")
}
Expand All @@ -95,6 +102,17 @@ func (c *CompactOpts) Verify() error {
return errors.Errorf("concurrency %d must be greater than 0", c.Concurrency)
}
if c.Sharded {
c.PhysicalFileCacheCapacity = strings.TrimSpace(c.PhysicalFileCacheCapacity)
if c.PhysicalFileCacheCapacity == "" {
c.PhysicalFileCacheCapacity = "0"
}
capacity, err := resource.ParseQuantity(c.PhysicalFileCacheCapacity)
if err != nil {
return errors.Annotatef(err, "invalid physicalFileCacheCapacity %q", c.PhysicalFileCacheCapacity)
}
if capacity.Sign() < 0 {
return errors.New("physicalFileCacheCapacity must be greater than or equal to 0")
}
if c.ShardCount <= 0 {
return errors.Errorf("shard-count %d must be greater than 0", c.ShardCount)
}
Expand Down
Loading
Loading