Skip to content

Commit 8edec32

Browse files
committed
Merge branch 'claude/find-rds-refresh-component-zCRUu' into 'master'
Auto-detect parallelism for pg_dump and pg_restore from RDS instance type. See merge request postgres-ai/database-lab!1133
2 parents ed8f7e6 + 9e9505e commit 8edec32

5 files changed

Lines changed: 488 additions & 21 deletions

File tree

engine/internal/rdsrefresh/dblab.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@ type SourceConfigUpdate struct {
180180
Password string
181181
// RDSIAMDBInstance is the RDS DB instance identifier for IAM auth. When empty, this field is omitted from the config update.
182182
RDSIAMDBInstance string
183+
// DumpParallelJobs sets the -j flag for pg_dump. When zero, the existing value is preserved.
184+
DumpParallelJobs int
185+
// RestoreParallelJobs sets the -j flag for pg_restore. When zero, the existing value is preserved.
186+
RestoreParallelJobs int
183187
}
184188

185189
// UpdateSourceConfig updates the source database connection in DBLab config.
@@ -198,6 +202,16 @@ func (c *DBLabClient) UpdateSourceConfig(ctx context.Context, update SourceConfi
198202
proj.RDSIAMDBInstance = &update.RDSIAMDBInstance
199203
}
200204

205+
if update.DumpParallelJobs > 0 {
206+
dumpJobs := int64(update.DumpParallelJobs)
207+
proj.DumpParallelJobs = &dumpJobs
208+
}
209+
210+
if update.RestoreParallelJobs > 0 {
211+
restoreJobs := int64(update.RestoreParallelJobs)
212+
proj.RestoreParallelJobs = &restoreJobs
213+
}
214+
201215
nested := map[string]interface{}{}
202216

203217
// defensive error check: StoreJSON only fails if target is not an addressable struct,

engine/internal/rdsrefresh/dblab_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,70 @@ func TestDBLabClientUpdateSourceConfig(t *testing.T) {
191191
assert.Nil(t, receivedConfig.RDSIAMDBInstance)
192192
})
193193

194+
t.Run("successful with parallelism settings", func(t *testing.T) {
195+
var receivedConfig models.ConfigProjection
196+
197+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
198+
var nested map[string]interface{}
199+
err := json.NewDecoder(r.Body).Decode(&nested)
200+
require.NoError(t, err)
201+
202+
err = projection.LoadJSON(&receivedConfig, nested, projection.LoadOptions{
203+
Groups: []string{"default", "sensitive"},
204+
})
205+
require.NoError(t, err)
206+
207+
w.WriteHeader(http.StatusOK)
208+
}))
209+
defer server.Close()
210+
211+
client, err := NewDBLabClient(&DBLabConfig{APIEndpoint: server.URL, Token: "test-token"})
212+
require.NoError(t, err)
213+
214+
err = client.UpdateSourceConfig(context.Background(), SourceConfigUpdate{
215+
Host: "clone-host.rds.amazonaws.com", Port: 5432, DBName: "postgres",
216+
Username: "dbuser", Password: "dbpass",
217+
DumpParallelJobs: 4, RestoreParallelJobs: 8,
218+
})
219+
require.NoError(t, err)
220+
221+
require.NotNil(t, receivedConfig.DumpParallelJobs)
222+
assert.Equal(t, int64(4), *receivedConfig.DumpParallelJobs)
223+
require.NotNil(t, receivedConfig.RestoreParallelJobs)
224+
assert.Equal(t, int64(8), *receivedConfig.RestoreParallelJobs)
225+
})
226+
227+
t.Run("omits parallelism when zero", func(t *testing.T) {
228+
var receivedConfig models.ConfigProjection
229+
230+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
231+
var nested map[string]interface{}
232+
err := json.NewDecoder(r.Body).Decode(&nested)
233+
require.NoError(t, err)
234+
235+
err = projection.LoadJSON(&receivedConfig, nested, projection.LoadOptions{
236+
Groups: []string{"default", "sensitive"},
237+
})
238+
require.NoError(t, err)
239+
240+
w.WriteHeader(http.StatusOK)
241+
}))
242+
defer server.Close()
243+
244+
client, err := NewDBLabClient(&DBLabConfig{APIEndpoint: server.URL, Token: "test-token"})
245+
require.NoError(t, err)
246+
247+
err = client.UpdateSourceConfig(context.Background(), SourceConfigUpdate{
248+
Host: "host.rds.amazonaws.com", Port: 5432, DBName: "postgres",
249+
Username: "dbuser", Password: "dbpass",
250+
DumpParallelJobs: 0, RestoreParallelJobs: 0,
251+
})
252+
require.NoError(t, err)
253+
254+
assert.Nil(t, receivedConfig.DumpParallelJobs)
255+
assert.Nil(t, receivedConfig.RestoreParallelJobs)
256+
})
257+
194258
t.Run("error on non-2xx status", func(t *testing.T) {
195259
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
196260
w.WriteHeader(http.StatusBadRequest)
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
2026 © PostgresAI
3+
*/
4+
5+
package rdsrefresh
6+
7+
import (
8+
"fmt"
9+
"runtime"
10+
"strconv"
11+
"strings"
12+
13+
"gitlab.com/postgres-ai/database-lab/v3/pkg/log"
14+
)
15+
16+
const (
	// minParallelJobs is the floor applied to every computed job count.
	minParallelJobs = 1

	// dumpParallelDivisor is what the clone's vCPU count is divided by to get dump parallelism.
	// pg_dump is I/O-bound against the remote RDS instance, so only half of the vCPUs are used
	// to avoid over-subscribing it.
	dumpParallelDivisor = 2

	// rdsInstanceClassPrefix is the leading part of an RDS instance class name ("db.m5.xlarge");
	// it is removed before the family and size are extracted.
	rdsInstanceClassPrefix = "db."
)
27+
28+
// instanceSizeVCPUs maps an AWS instance size suffix to the vCPU count assumed for it.
// every size below "xlarge" is treated as 2 vCPUs, and every NUMxlarge entry equals NUM * 4.
// the table is family-independent; "metal" is deliberately absent because its vCPU count
// depends on the family and is looked up in metalFamilyVCPUs instead.
// NOTE(review): older burstable generations may list fewer vCPUs for micro/small — verify against
// the AWS instance class table. 1 or 2 vCPUs give the same dump job count once halved and floored.
//
//nolint:mnd
var instanceSizeVCPUs = map[string]int{
	// sizes below xlarge
	"micro": 2, "small": 2, "medium": 2, "large": 2,

	// up to 4xlarge
	"xlarge": 4, "2xlarge": 8, "3xlarge": 12, "4xlarge": 16,

	// 6xlarge to 12xlarge
	"6xlarge": 24, "8xlarge": 32, "9xlarge": 36, "10xlarge": 40, "12xlarge": 48,

	// 16xlarge and above
	"16xlarge": 64, "18xlarge": 72, "24xlarge": 96, "32xlarge": 128, "48xlarge": 192,
}
54+
55+
// metalFamilyVCPUs maps a base instance family to the vCPU count of its "metal" size.
// keys are base families only: a storage variant carrying a trailing "d" (m5d, r6gd, etc.)
// is expected to be reduced to its base family by the caller before or after a failed lookup,
// since it shares the vCPU count of that base family.
//
//nolint:mnd
var metalFamilyVCPUs = map[string]int{
	"c5":  96,
	"c6g": 64,
	"c6i": 128,
	"c7g": 64,
	"c7i": 192,
	"m5":  96,
	"m6g": 64,
	"m6i": 128,
	"m7g": 64,
	"m7i": 192,
	"r5":  96,
	"r6g": 64,
	"r6i": 128,
	"r7g": 64,
	"r7i": 192,
	"x1":  128,
	"x2g": 64,
}
66+
67+
// ParallelismConfig carries the job counts computed for one dump/restore cycle.
type ParallelismConfig struct {
	// DumpJobs is the number of parallel jobs to use for pg_dump.
	DumpJobs int

	// RestoreJobs is the number of parallel jobs to use for pg_restore.
	RestoreJobs int
}
72+
73+
// ResolveParallelism determines the optimal parallelism levels for pg_dump and pg_restore.
74+
// dump parallelism is half the vCPU count of the RDS clone instance (I/O-bound, conservative).
75+
// restore parallelism is the full local CPU count (CPU-bound for index rebuilds).
76+
// note: dump parallelism > 1 is incompatible with immediateRestore mode.
77+
// when immediateRestore is enabled, the caller should ignore DumpJobs or cap it at 1.
78+
func ResolveParallelism(cfg *Config) (*ParallelismConfig, error) {
79+
vcpus, err := resolveRDSInstanceVCPUs(cfg.RDSClone.InstanceClass)
80+
if err != nil {
81+
return nil, fmt.Errorf("failed to resolve RDS instance vCPUs: %w", err)
82+
}
83+
84+
dumpJobs := vcpus / dumpParallelDivisor
85+
if dumpJobs < minParallelJobs {
86+
dumpJobs = minParallelJobs
87+
}
88+
89+
restoreJobs := resolveLocalVCPUs()
90+
91+
log.Msg("auto-parallelism: dump jobs =", dumpJobs, "(RDS clone vCPUs/2), restore jobs =", restoreJobs, "(local vCPUs)")
92+
93+
return &ParallelismConfig{
94+
DumpJobs: dumpJobs,
95+
RestoreJobs: restoreJobs,
96+
}, nil
97+
}
98+
99+
// resolveRDSInstanceVCPUs estimates the vCPU count for the given RDS instance class
100+
// by parsing the instance size suffix (e.g. "xlarge" from "db.m5.xlarge").
101+
// the mapping covers standard AWS size naming used across RDS instance families.
102+
// for "metal" instances, the family is used to look up family-specific vCPU counts.
103+
// if the size is not recognized, it attempts to parse a numeric multiplier prefix
104+
// (e.g. "2xlarge" → 8 vCPUs).
105+
func resolveRDSInstanceVCPUs(instanceClass string) (int, error) {
106+
family, size, err := parseInstanceClass(instanceClass)
107+
if err != nil {
108+
return 0, err
109+
}
110+
111+
if size == "metal" {
112+
return resolveMetalVCPUs(family, instanceClass)
113+
}
114+
115+
if vcpus, ok := instanceSizeVCPUs[size]; ok {
116+
return vcpus, nil
117+
}
118+
119+
// handle unlisted NUMxlarge sizes by parsing the multiplier
120+
vcpus, err := parseXlargeMultiplier(size)
121+
if err != nil {
122+
return 0, fmt.Errorf("unknown instance size %q in class %q", size, instanceClass)
123+
}
124+
125+
return vcpus, nil
126+
}
127+
128+
// parseInstanceClass extracts the family and size from an RDS instance class.
129+
// for example, "db.m5.xlarge" → ("m5", "xlarge"), "db.r6gd.metal" → ("r6gd", "metal").
130+
func parseInstanceClass(instanceClass string) (string, string, error) {
131+
if !strings.HasPrefix(instanceClass, rdsInstanceClassPrefix) {
132+
return "", "", fmt.Errorf("invalid RDS instance class %q: expected %q prefix", instanceClass, rdsInstanceClassPrefix)
133+
}
134+
135+
withoutPrefix := strings.TrimPrefix(instanceClass, rdsInstanceClassPrefix)
136+
137+
// format is "family.size", e.g. "m5.xlarge" or "r6g.2xlarge"
138+
parts := strings.SplitN(withoutPrefix, ".", 2)
139+
140+
const expectedParts = 2
141+
if len(parts) != expectedParts || parts[1] == "" {
142+
return "", "", fmt.Errorf("invalid RDS instance class %q: expected format db.<family>.<size>", instanceClass)
143+
}
144+
145+
return parts[0], parts[1], nil
146+
}
147+
148+
// resolveMetalVCPUs looks up the vCPU count for a metal instance by family.
149+
// storage variants (e.g. "m5d", "r6gd") are mapped to their base family ("m5", "r6g").
150+
func resolveMetalVCPUs(family, instanceClass string) (int, error) {
151+
if vcpus, ok := metalFamilyVCPUs[family]; ok {
152+
return vcpus, nil
153+
}
154+
155+
// strip trailing "d" storage suffix and retry (e.g. "m5d" → "m5", "r6gd" → "r6g")
156+
base := strings.TrimSuffix(family, "d")
157+
if base != family {
158+
if vcpus, ok := metalFamilyVCPUs[base]; ok {
159+
return vcpus, nil
160+
}
161+
}
162+
163+
return 0, fmt.Errorf("unknown metal instance family %q in class %q", family, instanceClass)
164+
}
165+
166+
// parseXlargeMultiplier estimates the vCPU count for a NUMxlarge size that is not in the static map.
// the size must be a positive decimal multiplier immediately followed by exactly "xlarge";
// the result is multiplier * 4, for example "5xlarge" → 5 * 4 = 20 vCPUs.
// an error is returned when the size does not end in "xlarge", has no multiplier,
// or the multiplier is not a number or is not positive.
func parseXlargeMultiplier(size string) (int, error) {
	const (
		xlargeSuffix   = "xlarge"
		vcpusPerXlarge = 4
	)

	// require the suffix at the very end so that trailing garbage ("5xlargefoo") is rejected
	if !strings.HasSuffix(size, xlargeSuffix) {
		return 0, fmt.Errorf("not an xlarge variant: %q", size)
	}

	digits := strings.TrimSuffix(size, xlargeSuffix)
	if digits == "" {
		return 0, fmt.Errorf("not an xlarge variant: %q", size)
	}

	multiplier, err := strconv.Atoi(digits)
	if err != nil {
		return 0, fmt.Errorf("invalid multiplier in %q: %w", size, err)
	}

	// a zero or negative multiplier would produce a meaningless vCPU count
	if multiplier <= 0 {
		return 0, fmt.Errorf("invalid multiplier in %q: must be positive", size)
	}

	return multiplier * vcpusPerXlarge, nil
}
183+
184+
// resolveLocalVCPUs returns the number of logical CPUs available on the local machine.
185+
// uses runtime.NumCPU() which reads from /proc/cpuinfo on Linux
186+
// (the target platform for DBLab Engine).
187+
func resolveLocalVCPUs() int {
188+
cpus := runtime.NumCPU()
189+
if cpus < minParallelJobs {
190+
return minParallelJobs
191+
}
192+
193+
return cpus
194+
}

0 commit comments

Comments
 (0)