Skip to content

Commit f8a952b

Browse files
committed
Removed fromTask field in PartialData
1 parent fa34ee4 commit f8a952b

9 files changed

Lines changed: 26 additions & 30 deletions

File tree

internal/test/partial_data_test.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@ func TestPartialDataMarshaling(t *testing.T) {
1616
data["num"] = 2
1717
data["list"] = []string{"uno", "due", "tre"}
1818
partialData := workflow.PartialData{
19-
ReqId: workflow.ReqId("abc"),
20-
ForTask: "fai13p102",
21-
FromTask: "120e8d12d",
22-
Data: data,
19+
ReqId: workflow.ReqId("abc"),
20+
ForTask: "fai13p102",
21+
Data: data,
2322
}
2423
marshal, errMarshal := json.Marshal(partialData)
2524
u.AssertNilMsg(t, errMarshal, "error during marshaling")
@@ -104,9 +103,8 @@ func TestPartialDataCache(t *testing.T) {
104103

105104
func initPartialData(reqId workflow.ReqId, to, from workflow.TaskId, data map[string]interface{}) *workflow.PartialData {
106105
return &workflow.PartialData{
107-
ReqId: reqId,
108-
ForTask: to,
109-
FromTask: from,
110-
Data: data,
106+
ReqId: reqId,
107+
ForTask: to,
108+
Data: data,
111109
}
112110
}

internal/test/workflow_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,7 @@ func TestResumeWorkflow(t *testing.T) {
601601
request := workflow.NewRequest(shortuuid.New(), wflow, params)
602602

603603
progress := workflow.InitProgress(workflow.ReqId(request.Id), wflow)
604-
pd := workflow.NewPartialData(workflow.ReqId(request.Id), wflow.Start.Id, "", request.Params)
604+
pd := workflow.NewPartialData(workflow.ReqId(request.Id), wflow.Start.Id, request.Params)
605605

606606
err = workflow.SaveProgress(progress, true)
607607
u.AssertNil(t, err)

internal/workflow/choice_task.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (c *ChoiceTask) AddOutput(workflow *Workflow, taskId TaskId) error {
6262

6363
func (c *ChoiceTask) execute(progress *Progress, input *PartialData, r *Request) (*PartialData, *Progress, bool, error) {
6464

65-
outputData := NewPartialData(ReqId(r.Id), "", c.GetId(), nil) // partial initialization of outputData
65+
outputData := NewPartialData(ReqId(r.Id), c.GetId(), nil) // partial initialization of outputData
6666

6767
// NOTE: we do not call task.CheckInput() as this task has no signature to match against
6868

internal/workflow/fail_task.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (f *FailureTask) execute(progress *Progress, r *Request) (*PartialData, *Pr
4444

4545
output := make(map[string]interface{})
4646
output[f.Error] = f.Cause
47-
outputData := NewPartialData(ReqId(r.Id), f.GetNext()[0], f.GetId(), output)
47+
outputData := NewPartialData(ReqId(r.Id), f.GetNext()[0], output)
4848

4949
progress.Complete(f.GetId())
5050

internal/workflow/fanin_task.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func NewFanInTask(fanInDegree int) *FanInTask {
2626
}
2727

2828
func (f *FanInTask) execute(progress *Progress, input *PartialData, r *Request) (*PartialData, *Progress, bool, error) {
29-
outputData := NewPartialData(ReqId(r.Id), f.GetNext()[0], f.GetId(), input.Data)
29+
outputData := NewPartialData(ReqId(r.Id), f.GetNext()[0], input.Data)
3030
progress.Complete(f.GetId())
3131
err := progress.AddReadyTask(f.GetNext()[0])
3232
if err != nil {

internal/workflow/fanout_task.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ func (f *FanOutTask) execute(progress *Progress, input *PartialData, r *Request)
9393
* case with Data field which contains a map[string]interface{} with the key set
9494
* to taskId and the value which is also a map[string]interface{} containing the
9595
* effective input for the nth-parallel task */
96-
outputData := NewPartialData(ReqId(r.Id), "", f.GetId(), output)
96+
// TODO: fix this
97+
outputData := NewPartialData(ReqId(r.Id), "", output)
9798
//newOutputDataMap := make(map[string]interface{}) // TODO: consider using a map of PartialData rather than a single PartialData object
9899

99100
progress.Complete(f.GetId())

internal/workflow/partial_data.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,9 @@ func newPartialDataId(reqId ReqId) PartialDataId {
2424

2525
// PartialData is saved separately from progressData to avoid cluttering the Progress struct and each Serverledge node's cache
2626
type PartialData struct {
27-
ReqId ReqId // request referring to this partial data
28-
ForTask TaskId // task that should receive this partial data
29-
FromTask TaskId // useful for fanIn
30-
Data map[string]interface{}
27+
ReqId ReqId // request referring to this partial data
28+
ForTask TaskId // task that should receive this partial data
29+
Data map[string]interface{}
3130
}
3231

3332
var pdCache = sync.Map{}
@@ -47,24 +46,22 @@ func (pd PartialData) Equals(pd2 *PartialData) bool {
4746
}
4847
}
4948

50-
return pd.ReqId == pd2.ReqId && pd.FromTask == pd2.FromTask && pd.ForTask == pd2.ForTask
49+
return pd.ReqId == pd2.ReqId && pd.ForTask == pd2.ForTask
5150
}
5251

5352
func (pd PartialData) String() string {
5453
return fmt.Sprintf(`PartialData{
5554
Id: %s,
5655
ForTask: %s,
57-
FromTask: %s,
5856
Data: %v,
59-
}`, pd.ReqId, pd.ForTask, pd.FromTask, pd.Data)
57+
}`, pd.ReqId, pd.ForTask, pd.Data)
6058
}
6159

62-
func NewPartialData(reqId ReqId, forTask TaskId, fromTask TaskId, data map[string]interface{}) *PartialData {
60+
func NewPartialData(reqId ReqId, forTask TaskId, data map[string]interface{}) *PartialData {
6361
return &PartialData{
64-
ReqId: reqId,
65-
ForTask: forTask,
66-
FromTask: fromTask,
67-
Data: data,
62+
ReqId: reqId,
63+
ForTask: forTask,
64+
Data: data,
6865
}
6966
}
7067

internal/workflow/simple_task.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (s *SimpleTask) execute(progress *Progress, input *PartialData, r *Request)
4141
}
4242

4343
nextTask := s.GetNext()[0]
44-
outputData := NewPartialData(ReqId(r.Id), nextTask, s.Id, output)
44+
outputData := NewPartialData(ReqId(r.Id), nextTask, output)
4545

4646
progress.Complete(s.Id)
4747
err = progress.AddReadyTask(nextTask)

internal/workflow/workflow.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,16 +232,16 @@ func (workflow *Workflow) executeParallel(progress *Progress, input *PartialData
232232
progress.Complete(parallelTasks[i].GetId())
233233
}
234234

235-
outputData := NewPartialData(ReqId(r.Id), "", "", outputMap) // partial initialization of outputData
236-
outputData.ForTask = parallelTasks[0].GetNext()[0] // TODO: we are assuming that the next task is unique for all the parallel tasks (i.e. a FanIn)
235+
outputData := NewPartialData(ReqId(r.Id), "", outputMap) // partial initialization of outputData
236+
outputData.ForTask = parallelTasks[0].GetNext()[0] // TODO: we are assuming that the next task is unique for all the parallel tasks (i.e. a FanIn)
237237
progress.AddReadyTask(parallelTasks[0].GetNext()[0])
238238
return outputData, progress, nil
239239
}
240240

241241
func (workflow *Workflow) doNothingExec(progress *Progress, input *PartialData, task Task, r *Request) (*PartialData, *Progress, bool, error) {
242242

243243
output := input.Data
244-
outputData := NewPartialData(ReqId(r.Id), task.GetNext()[0], task.GetId(), output)
244+
outputData := NewPartialData(ReqId(r.Id), task.GetNext()[0], output)
245245

246246
progress.Complete(task.GetId())
247247

@@ -436,7 +436,7 @@ func (workflow *Workflow) Invoke(r *Request) (ExecutionReport, error) {
436436
// TODO: move into a function?
437437
if !r.Resuming {
438438
progress = InitProgress(requestId, workflow)
439-
pd = NewPartialData(requestId, workflow.Start.Id, "", r.Params)
439+
pd = NewPartialData(requestId, workflow.Start.Id, r.Params)
440440
} else {
441441
var found bool
442442
progress, found = RetrieveProgress(requestId, true)

0 commit comments

Comments
 (0)