Skip to content

Commit 8f49c72

Browse files
committed
Removes ReqId field in PartialData
1 parent 0a34056 commit 8f49c72

4 files changed

Lines changed: 19 additions & 23 deletions

File tree

internal/test/partial_data_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@ func TestPartialDataMarshaling(t *testing.T) {
1414
data["num"] = 2
1515
data["list"] = []string{"uno", "due", "tre"}
1616
partialData := workflow.PartialData{
17-
ReqId: workflow.ReqId("abc"),
18-
ForTask: "fai13p102",
19-
Data: data,
17+
Task: "fai13p102",
18+
Data: data,
2019
}
2120
marshal, errMarshal := json.Marshal(partialData)
2221
u.AssertNilMsg(t, errMarshal, "error during marshaling")

internal/test/workflow_integration_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -513,11 +513,11 @@ func TestResumeWorkflow(t *testing.T) {
513513
request := workflow.NewRequest(shortuuid.New(), wflow, params)
514514

515515
progress := workflow.InitProgress(workflow.ReqId(request.Id), wflow)
516-
pd := workflow.NewPartialData(workflow.ReqId(request.Id), wflow.Start.Id, request.Params)
516+
pd := workflow.NewPartialData(wflow.Start.Id, request.Params)
517517

518518
err = workflow.SaveProgress(progress)
519519
u.AssertNil(t, err)
520-
err = workflow.SavePartialData(pd)
520+
err = workflow.SavePartialData(pd, workflow.ReqId(request.Id))
521521
u.AssertNil(t, err)
522522

523523
resumedRequest := workflow.NewRequest(request.Id, wflow, params)

internal/workflow/partial_data.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,8 @@ import (
1313

1414
// PartialData is saved separately from progressData to avoid cluttering the Progress struct and each Serverledge node's cache
1515
type PartialData struct {
16-
ReqId ReqId // request referring to this partial data
17-
ForTask TaskId // task that should receive this partial data
18-
Data map[string]interface{}
16+
Task TaskId
17+
Data map[string]interface{}
1918
}
2019

2120
func (pd PartialData) Equals(pd2 *PartialData) bool {
@@ -33,30 +32,28 @@ func (pd PartialData) Equals(pd2 *PartialData) bool {
3332
}
3433
}
3534

36-
return pd.ReqId == pd2.ReqId && pd.ForTask == pd2.ForTask
35+
return pd.Task == pd2.Task
3736
}
3837

3938
func (pd PartialData) String() string {
4039
return fmt.Sprintf(`PartialData{
41-
Id: %s,
42-
ForTask: %s,
40+
Task: %s,
4341
Data: %v,
44-
}`, pd.ReqId, pd.ForTask, pd.Data)
42+
}`, pd.Task, pd.Data)
4543
}
4644

47-
func NewPartialData(reqId ReqId, forTask TaskId, data map[string]interface{}) *PartialData {
45+
func NewPartialData(task TaskId, data map[string]interface{}) *PartialData {
4846
return &PartialData{
49-
ReqId: reqId,
50-
ForTask: forTask,
51-
Data: data,
47+
Task: task,
48+
Data: data,
5249
}
5350
}
5451

5552
func getPartialDataEtcdKey(reqId ReqId, nodeId TaskId) string {
5653
return fmt.Sprintf("/partialData/%s/%s", reqId, nodeId)
5754
}
5855

59-
func SavePartialData(pd *PartialData) error {
56+
func SavePartialData(pd *PartialData, reqId ReqId) error {
6057
cli, err := utils.GetEtcdClient()
6158
if err != nil {
6259
return err
@@ -68,7 +65,7 @@ func SavePartialData(pd *PartialData) error {
6865
return fmt.Errorf("could not marshal progress: %v", err)
6966
}
7067
// saves the json object into etcd
71-
key := getPartialDataEtcdKey(pd.ReqId, pd.ForTask)
68+
key := getPartialDataEtcdKey(reqId, pd.Task)
7269
log.Printf("Saving PD on etcd with key: %s\n", key)
7370

7471
_, err = cli.Put(ctx, key, string(payload))

internal/workflow/workflow.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func (workflow *Workflow) Execute(r *Request, input *PartialData, progress *Prog
214214
progress.Complete(task.GetId())
215215

216216
nextTask := task.GetNext()
217-
outputData = NewPartialData(ReqId(r.Id), nextTask, output)
217+
outputData = NewPartialData(nextTask, output)
218218
if workflow.IsTaskEligibleForExecution(nextTask, progress) {
219219
progress.ReadyToExecute = append(progress.ReadyToExecute, nextTask)
220220
}
@@ -245,7 +245,7 @@ func (workflow *Workflow) Execute(r *Request, input *PartialData, progress *Prog
245245
}
246246
progress.Complete(task.GetId())
247247

248-
outputData = NewPartialData(ReqId(r.Id), nextTaskId, input.Data)
248+
outputData = NewPartialData(nextTaskId, input.Data)
249249
if workflow.IsTaskEligibleForExecution(nextTaskId, progress) {
250250
progress.ReadyToExecute = append(progress.ReadyToExecute, nextTaskId)
251251
}
@@ -262,7 +262,7 @@ func (workflow *Workflow) Execute(r *Request, input *PartialData, progress *Prog
262262
if err != nil {
263263
return nil, progress, false, err
264264
}
265-
err = SavePartialData(input)
265+
err = SavePartialData(input, ReqId(r.Id))
266266
if err != nil {
267267
return nil, progress, false, err
268268
}
@@ -398,7 +398,7 @@ func (workflow *Workflow) Invoke(r *Request) error {
398398

399399
if !r.Resuming {
400400
progress = InitProgress(requestId, workflow)
401-
pd = NewPartialData(requestId, workflow.Start.Id, r.Params)
401+
pd = NewPartialData(workflow.Start.Id, r.Params)
402402
} else {
403403
progress, err = RetrieveProgress(requestId)
404404
if err != nil {
@@ -430,7 +430,7 @@ func (workflow *Workflow) Invoke(r *Request) error {
430430
if err != nil {
431431
return fmt.Errorf("Could not save progress: %v", err)
432432
}
433-
err = SavePartialData(pd)
433+
err = SavePartialData(pd, ReqId(r.Id))
434434
if err != nil {
435435
return fmt.Errorf("Could not save partial data: %v", err)
436436
}

0 commit comments

Comments
 (0)