Skip to content

Commit a0ccabb

Browse files
CementZhangRader
authored and committed
feat: 适配前端PD 分离部署逻辑
1 parent ef0d779 commit a0ccabb

7 files changed

Lines changed: 285 additions & 8 deletions

File tree

builder/deploy/deployer_test.go

Lines changed: 128 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1250,3 +1250,131 @@ func TestDeployer_Wakeup(t *testing.T) {
12501250
time.Sleep(100 * time.Millisecond)
12511251
})
12521252
}
1253+
1254+
// TestDeployer_serverlessDeploy_PD verifies that PD config is correctly
1255+
// propagated from DeployRequest to the database Deploy record in
1256+
// serverlessDeploy. Before the fix, field shadowing caused dr.PD to always
1257+
// be nil even when DeployExtend.PD was set.
1258+
func TestDeployer_serverlessDeploy_PD(t *testing.T) {
1259+
t.Run("deploy model with PD", func(t *testing.T) {
1260+
var oldDeploy database.Deploy
1261+
oldDeploy.ID = 1
1262+
1263+
pdConfig := &types.PDConfig{
1264+
Enabled: true,
1265+
PrefillReplicas: 2,
1266+
DecodeReplicas: 2,
1267+
Prefill: &types.PDRoleRuntimeConfig{
1268+
TP: 2, EP: 1, DP: 1, TotalGPUs: 2,
1269+
},
1270+
Decode: &types.PDRoleRuntimeConfig{
1271+
TP: 2, EP: 1, DP: 1, TotalGPUs: 2,
1272+
},
1273+
}
1274+
1275+
dr := types.DeployRequest{
1276+
RepoID: 1,
1277+
Type: types.InferenceType,
1278+
UserUUID: "1",
1279+
SKU: "1",
1280+
DeployExtend: types.DeployExtend{
1281+
PD: pdConfig,
1282+
},
1283+
}
1284+
1285+
newDeploy := oldDeploy
1286+
newDeploy.UserUUID = dr.UserUUID
1287+
newDeploy.SKU = dr.SKU
1288+
newDeploy.PD = dr.PD
1289+
1290+
mockTaskStore := mockdb.NewMockDeployTaskStore(t)
1291+
mockTaskStore.EXPECT().GetServerlessDeployByRepID(mock.Anything, dr.RepoID).Return(&oldDeploy, nil)
1292+
mockTaskStore.EXPECT().UpdateDeploy(mock.Anything, &newDeploy).Return(nil)
1293+
1294+
d := &deployer{
1295+
deployTaskStore: mockTaskStore,
1296+
}
1297+
dbdeploy, err := d.serverlessDeploy(context.TODO(), dr)
1298+
require.Nil(t, err)
1299+
require.NotNil(t, dbdeploy.PD)
1300+
require.True(t, dbdeploy.PD.Enabled)
1301+
require.Equal(t, 2, dbdeploy.PD.PrefillReplicas)
1302+
require.Equal(t, 2, dbdeploy.PD.DecodeReplicas)
1303+
require.Same(t, pdConfig, dbdeploy.PD)
1304+
})
1305+
1306+
t.Run("deploy model without PD (nil)", func(t *testing.T) {
1307+
var oldDeploy database.Deploy
1308+
oldDeploy.ID = 1
1309+
1310+
dr := types.DeployRequest{
1311+
RepoID: 1,
1312+
Type: types.InferenceType,
1313+
UserUUID: "1",
1314+
SKU: "1",
1315+
}
1316+
1317+
newDeploy := oldDeploy
1318+
newDeploy.UserUUID = dr.UserUUID
1319+
newDeploy.SKU = dr.SKU
1320+
newDeploy.PD = nil
1321+
1322+
mockTaskStore := mockdb.NewMockDeployTaskStore(t)
1323+
mockTaskStore.EXPECT().GetServerlessDeployByRepID(mock.Anything, dr.RepoID).Return(&oldDeploy, nil)
1324+
mockTaskStore.EXPECT().UpdateDeploy(mock.Anything, &newDeploy).Return(nil)
1325+
1326+
d := &deployer{
1327+
deployTaskStore: mockTaskStore,
1328+
}
1329+
dbdeploy, err := d.serverlessDeploy(context.TODO(), dr)
1330+
require.Nil(t, err)
1331+
require.Nil(t, dbdeploy.PD)
1332+
})
1333+
}
1334+
1335+
// TestDeployer_dedicatedDeploy_PD verifies that PD config is correctly
1336+
// propagated from DeployRequest to the database Deploy record in
1337+
// dedicatedDeploy.
1338+
func TestDeployer_dedicatedDeploy_PD(t *testing.T) {
1339+
pdConfig := &types.PDConfig{
1340+
Enabled: true,
1341+
PrefillReplicas: 1,
1342+
DecodeReplicas: 1,
1343+
Prefill: &types.PDRoleRuntimeConfig{
1344+
TP: 2, EP: 2, DP: 1, TotalGPUs: 2,
1345+
},
1346+
Decode: &types.PDRoleRuntimeConfig{
1347+
TP: 2, EP: 2, DP: 1, TotalGPUs: 2,
1348+
},
1349+
}
1350+
1351+
dr := types.DeployRequest{
1352+
Path: "namespace/name",
1353+
Type: types.InferenceType,
1354+
DeployExtend: types.DeployExtend{
1355+
PD: pdConfig,
1356+
},
1357+
}
1358+
1359+
var capturedDeploy *database.Deploy
1360+
mockTaskStore := mockdb.NewMockDeployTaskStore(t)
1361+
mockTaskStore.EXPECT().CreateDeploy(mock.Anything, mock.MatchedBy(func(d *database.Deploy) bool {
1362+
capturedDeploy = d
1363+
return true
1364+
})).Return(nil)
1365+
1366+
node, _ := snowflake.NewNode(1)
1367+
d := &deployer{
1368+
snowflakeNode: node,
1369+
deployTaskStore: mockTaskStore,
1370+
}
1371+
1372+
_, err := d.dedicatedDeploy(context.TODO(), dr)
1373+
require.Nil(t, err)
1374+
require.NotNil(t, capturedDeploy)
1375+
require.NotNil(t, capturedDeploy.PD)
1376+
require.True(t, capturedDeploy.PD.Enabled)
1377+
require.Equal(t, 1, capturedDeploy.PD.PrefillReplicas)
1378+
require.Equal(t, 1, capturedDeploy.PD.DecodeReplicas)
1379+
require.Same(t, pdConfig, capturedDeploy.PD)
1380+
}

common/types/deploy_test.go

Lines changed: 73 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -241,3 +241,76 @@ func TestSVCRequest_PDThroughDeployExtend(t *testing.T) {
241241
require.Equal(t, 2, req.PD.PrefillReplicas)
242242
require.Equal(t, 2, req.PD.DecodeReplicas)
243243
}
244+
245+
// TestDeployRequest_PDNoShadow verifies that DeployRequest.PD (from embedded
246+
// DeployExtend) is not shadowed by an explicit PD field. Before the fix,
247+
// DeployRequest had both an explicit PD field and an embedded DeployExtend.PD,
248+
// causing field shadowing: setting DeployExtend.PD was invisible when reading
249+
// DeployRequest.PD. This test ensures they are the same field.
250+
func TestDeployRequest_PDNoShadow(t *testing.T) {
251+
pdConfig := &PDConfig{
252+
Enabled: true,
253+
PrefillReplicas: 2,
254+
DecodeReplicas: 3,
255+
Prefill: &PDRoleRuntimeConfig{
256+
TP: 2, EP: 1, DP: 1, TotalGPUs: 2,
257+
},
258+
Decode: &PDRoleRuntimeConfig{
259+
TP: 2, EP: 1, DP: 1, TotalGPUs: 2,
260+
},
261+
}
262+
263+
dr := DeployRequest{}
264+
// Set PD via DeployExtend (the embedded field)
265+
dr.DeployExtend.PD = pdConfig
266+
267+
// Reading dr.PD should return the same pointer (no shadowing)
268+
require.NotNil(t, dr.PD)
269+
require.Same(t, pdConfig, dr.PD)
270+
require.True(t, dr.PD.Enabled)
271+
require.Equal(t, 2, dr.PD.PrefillReplicas)
272+
require.Equal(t, 3, dr.PD.DecodeReplicas)
273+
274+
// Setting dr.PD should also set DeployExtend.PD (same field)
275+
dr.PD.Enabled = false
276+
require.False(t, dr.DeployExtend.PD.Enabled)
277+
}
278+
279+
// TestDeployRequest_PDJSONBinding verifies that JSON unmarshaling correctly
280+
// populates DeployRequest.PD through the embedded DeployExtend field.
281+
func TestDeployRequest_PDJSONBinding(t *testing.T) {
282+
jsonStr := `{
283+
"deploy_name": "test-deploy",
284+
"pd": {
285+
"enabled": true,
286+
"prefill_replicas": 2,
287+
"decode_replicas": 2,
288+
"prefill": {"tp": 2, "ep": 1, "dp": 1, "total_gpus": 2},
289+
"decode": {"tp": 2, "ep": 1, "dp": 1, "total_gpus": 2}
290+
}
291+
}`
292+
293+
var dr DeployRequest
294+
err := json.Unmarshal([]byte(jsonStr), &dr)
295+
require.NoError(t, err)
296+
require.Equal(t, "test-deploy", dr.DeployName)
297+
298+
// PD should be populated via embedded DeployExtend
299+
require.NotNil(t, dr.PD)
300+
require.True(t, dr.PD.Enabled)
301+
require.Equal(t, 2, dr.PD.PrefillReplicas)
302+
require.Equal(t, 2, dr.PD.DecodeReplicas)
303+
require.NotNil(t, dr.PD.Prefill)
304+
require.Equal(t, 2, dr.PD.Prefill.TP)
305+
require.NotNil(t, dr.PD.Decode)
306+
require.Equal(t, 2, dr.PD.Decode.TP)
307+
308+
// Verify JSON marshaling round-trips correctly
309+
out, err := json.Marshal(dr)
310+
require.NoError(t, err)
311+
var dr2 DeployRequest
312+
err = json.Unmarshal(out, &dr2)
313+
require.NoError(t, err)
314+
require.NotNil(t, dr2.PD)
315+
require.True(t, dr2.PD.Enabled)
316+
}

common/types/model.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -308,6 +308,10 @@ type InstanceRunReq struct {
308308
Revision string `json:"revision"`
309309
OrderDetailID int64 `json:"order_detail_id"`
310310
EngineArgs string `json:"engine_args"`
311+
// EnablePD enables PD (Prefill-Decode) disaggregation inference architecture.
312+
// When true, the system checks the model metadata for PD recommendation,
313+
// validates hardware resources, and splits resources between prefill and decode.
314+
EnablePD bool `json:"enable_pd"`
311315
// OwnerNamespace is optional. If set, the finetune is created under this namespace (user or org); path {namespace} remains the model's owner.
312316
OwnerNamespace string `json:"owner_namespace,omitempty"`
313317
}

common/types/repo.go

Lines changed: 9 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -168,14 +168,15 @@ type Repository struct {
168168
}
169169

170170
type Metadata struct {
171-
ModelParams float32 `json:"model_params"`
172-
TensorType string `json:"tensor_type"`
173-
Architecture string `json:"architecture"`
174-
MiniGPUMemoryGB float32 `json:"mini_gpu_memory_gb"`
175-
MiniGPUFinetuneGB float32 `json:"mini_gpu_finetune_gb"`
176-
ModelType string `json:"model_type"`
177-
ClassName string `json:"class_name"`
178-
Quantizations []Quantization `json:"quantizations,omitempty"`
171+
ModelParams float32 `json:"model_params"`
172+
TensorType string `json:"tensor_type"`
173+
Architecture string `json:"architecture"`
174+
MiniGPUMemoryGB float32 `json:"mini_gpu_memory_gb"`
175+
MiniGPUFinetuneGB float32 `json:"mini_gpu_finetune_gb"`
176+
ModelType string `json:"model_type"`
177+
ClassName string `json:"class_name"`
178+
Quantizations []Quantization `json:"quantizations,omitempty"`
179+
PDRecommendation *PDRecommendation `json:"pd_recommendation,omitempty"`
179180
}
180181

181182
type RepoPageOpts struct {

component/model.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -569,6 +569,7 @@ func (c *modelComponentImpl) Show(ctx context.Context, namespace, name, currentU
569569
ClassName: model.Repository.Metadata.ClassName,
570570
Quantizations: model.Repository.Metadata.Quantizations,
571571
MiniGPUFinetuneGB: model.Repository.Metadata.MiniGPUFinetuneGB,
572+
PDRecommendation: model.Repository.Metadata.PDRecommendation,
572573
},
573574

574575
MultiSource: types.MultiSource{

component/repo_deploy.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -575,6 +575,7 @@ func (c *repoComponentImpl) DeployDetail(ctx context.Context, detailReq types.De
575575
UserUUID: deploy.UserUUID,
576576
OwnerNamespace: deploy.OwnerNamespace,
577577
}
578+
resDeploy.PD = deploy.PD
578579

579580
return &resDeploy, nil
580581
}

component/repo_test.go

Lines changed: 69 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1430,6 +1430,75 @@ func TestRepoComponent_DeployDetail(t *testing.T) {
14301430

14311431
}
14321432

1433+
func TestRepoComponent_DeployDetailWithPD(t *testing.T) {
1434+
ctx := context.TODO()
1435+
repo := initializeTestRepoComponent(ctx, t)
1436+
mockUserRepoAdminPermission(ctx, repo.mocks.stores, "user")
1437+
1438+
pdConfig := &types.PDConfig{
1439+
Enabled: true,
1440+
PrefillReplicas: 2,
1441+
DecodeReplicas: 2,
1442+
Prefill: &types.PDRoleRuntimeConfig{
1443+
TP: 2, EP: 1, DP: 1, TotalGPUs: 2,
1444+
},
1445+
Decode: &types.PDRoleRuntimeConfig{
1446+
TP: 2, EP: 1, DP: 1, TotalGPUs: 2,
1447+
},
1448+
}
1449+
1450+
repo.mocks.stores.ClusterInfoMock().EXPECT().ByClusterID(ctx, "cluster").Return(database.ClusterInfo{
1451+
Zone: "z",
1452+
}, nil)
1453+
repo.mocks.stores.DeployTaskMock().EXPECT().GetDeployByID(ctx, int64(1)).Return(&database.Deploy{
1454+
RepoID: 1,
1455+
UserUUID: "uuid",
1456+
OrderDetailID: 11,
1457+
ClusterID: "cluster",
1458+
SvcName: "svc",
1459+
Status: deployStatus.Running,
1460+
PD: pdConfig,
1461+
}, nil)
1462+
1463+
repo.mocks.deployer.EXPECT().GetReplica(ctx, types.DeployRequest{
1464+
Namespace: "ns",
1465+
Name: "n",
1466+
ClusterID: "cluster",
1467+
SvcName: "svc",
1468+
}).Return(1, 2, []types.Instance{{Name: "i1"}}, nil)
1469+
1470+
repo.mocks.deployer.EXPECT().Status(ctx, types.DeployRequest{
1471+
DeployID: 0,
1472+
SpaceID: 0,
1473+
ModelID: 0,
1474+
Namespace: "ns",
1475+
Name: "n",
1476+
SvcName: "svc",
1477+
ClusterID: "cluster",
1478+
}, false).Return("svc", 23, nil, nil)
1479+
1480+
dp, err := repo.DeployDetail(ctx, types.DeployActReq{
1481+
RepoType: types.ModelRepo,
1482+
Namespace: "ns",
1483+
Name: "n",
1484+
CurrentUser: "user",
1485+
DeployID: 1,
1486+
DeployType: 2,
1487+
InstanceName: "i1",
1488+
})
1489+
require.Nil(t, err)
1490+
1491+
// PD config should be populated in the response
1492+
require.NotNil(t, dp.PD)
1493+
require.True(t, dp.PD.Enabled)
1494+
require.Equal(t, 2, dp.PD.PrefillReplicas)
1495+
require.Equal(t, 2, dp.PD.DecodeReplicas)
1496+
require.NotNil(t, dp.PD.Prefill)
1497+
require.Equal(t, 2, dp.PD.Prefill.TP)
1498+
require.NotNil(t, dp.PD.Decode)
1499+
require.Equal(t, 2, dp.PD.Decode.TP)
1500+
}
1501+
14331502
func TestRepoComponent_DeployInstanceLogs(t *testing.T) {
14341503
ctx := context.TODO()
14351504
repo := initializeTestRepoComponent(ctx, t)

0 commit comments

Comments
 (0)