Skip to content

Commit bdcd6c0

Browse files
committed
chore: review
Signed-off-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com>
1 parent 39e1836 commit bdcd6c0

2 files changed

Lines changed: 38 additions & 7 deletions

File tree

docs/examples/minio-store.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ spec:
2121
key: ACCESS_SECRET_KEY
2222
wal:
2323
compression: gzip
24+
maxParallel: 8
2425
data:
2526
additionalCommandArgs:
2627
- "--min-chunk-size=5MB"

internal/cnpgi/common/wal.go

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
barmanRestorer "github.com/cloudnative-pg/barman-cloud/pkg/restorer"
1515
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
1616
"github.com/cloudnative-pg/cnpg-i/pkg/wal"
17+
walUtils "github.com/cloudnative-pg/machinery/pkg/fileutils/wals"
1718
"github.com/cloudnative-pg/machinery/pkg/log"
1819
apierrors "k8s.io/apimachinery/pkg/api/errors"
1920
"k8s.io/apimachinery/pkg/types"
@@ -67,6 +68,8 @@ func (w WALServiceImplementation) Archive(
6768
contextLogger := log.FromContext(ctx)
6869
contextLogger.Debug("starting wal archive")
6970

71+
baseWalName := path.Base(request.GetSourceFileName())
72+
7073
configuration, err := config.NewFromClusterJSON(request.ClusterDefinition)
7174
if err != nil {
7275
return nil, err
@@ -103,17 +106,44 @@ func (w WALServiceImplementation) Archive(
103106
return nil, err
104107
}
105108

109+
// Step 2: check if this WAL file has not been already archived
110+
var isDeletedFromSpool bool
111+
isDeletedFromSpool, err = arch.DeleteFromSpool(baseWalName)
112+
if err != nil {
113+
// TODO(leonardoce): factor out errors
114+
return nil, fmt.Errorf("while testing the existence of the WAL file in the spool directory: %w", err)
115+
}
116+
if isDeletedFromSpool {
117+
contextLogger.Info("Archived WAL file (parallel)",
118+
"walName", baseWalName)
119+
return nil, nil
120+
}
121+
122+
// Step 3: gather the WAL files names to archive
106123
options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, configuration.ServerName)
107124
if err != nil {
108125
return nil, err
109126
}
110-
barmanConfiguration := &objectStore.Spec.Configuration
111-
maxParallel := 1
112-
if barmanConfiguration.Wal != nil && barmanConfiguration.Wal.MaxParallel > 1 {
113-
maxParallel = barmanConfiguration.Wal.MaxParallel
114-
}
115-
walList := arch.GatherWALFilesToArchive(ctx, request.GetSourceFileName(), maxParallel)
116-
result := arch.ArchiveList(ctx, walList, options)
127+
128+
maxParallel := 1
129+
if objectStore.Spec.Configuration.Wal != nil {
130+
maxParallel = objectStore.Spec.Configuration.Wal.MaxParallel
131+
}
132+
133+
walFilesList := walUtils.GatherReadyWALFiles(
134+
ctx,
135+
walUtils.GatherReadyWALFilesConfig{
136+
MaxResults: maxParallel,
137+
SkipWALs: []string{baseWalName},
138+
PgDataPath: w.PGDataPath,
139+
},
140+
)
141+
142+
// Ensure the requested WAL file is always the first one being
143+
// archived
144+
walFilesList.Ready = append([]string{request.GetSourceFileName()}, walFilesList.Ready...)
145+
146+
result := arch.ArchiveList(ctx, walFilesList.ReadyItemsToSlice(), options)
117147
for _, archiverResult := range result {
118148
if archiverResult.Err != nil {
119149
return nil, archiverResult.Err

0 commit comments

Comments
 (0)