Skip to content

Commit 1b4deb7

Browse files
grussorussomatnar
andcommitted
Saving FunctionTask inputs to file for profiling
Co-authored-by: Matteo Nardelli <matnar@gmail.com>
1 parent ccb2b87 commit 1b4deb7

3 files changed

Lines changed: 29 additions & 28 deletions

File tree

internal/config/keys.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,5 +111,8 @@ const WORKFLOW_THRESHOLD_BASED_POLICY_THRESHOLD = "workflow.offloading.policy.th
111111
// Max number of tasks offloaded at once in the threshold-based offloading policy
112112
const WORKFLOW_THRESHOLD_BASED_POLICY_MAX_OFFLOADED = "workflow.offloading.policy.threshold.offloaded.max"
113113

114+
// For profiling: save function input to file
115+
const WORKFLOW_SAVE_FUNCTION_INPUT_TO_FILE = "workflow.profiling.saveinput"
116+
114117
const LOAD_BALANCER_POLICY = "loadbalancer.policy"
115118
const LOAD_BALANCER_TARGET_UPDATE_INTERVAL = "loadbalancer.targets.interval"

internal/function/signature.go

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ func (s *Signature) CheckOrMatchInputs(inputMap map[string]interface{}) error {
190190
err := def.CheckInput(inputMap)
191191

192192
if err != nil && len(s.Inputs) == 1 {
193+
// TODO: Consider a better solution. If there is a single input parameter, we just try to match type
193194
key, ok := def.FindEntryThatTypeChecks(inputMap)
194195
if ok {
195196
val := inputMap[key]
@@ -295,30 +296,3 @@ func datatypeToString(dataType DataTypeEnum) string {
295296
return ""
296297
}
297298
}
298-
299-
// SignatureInference is a best-effort function that tries to infer signature from a function without a defined signature. Maybe we do not need it.
300-
func SignatureInference(params map[string]interface{}) *Signature {
301-
signatureBuilder := NewSignature()
302-
303-
for k, v := range params {
304-
typeList := []DataTypeEnum{
305-
Float{},
306-
Int{},
307-
Bool{},
308-
Text{},
309-
Array[Float]{},
310-
Array[Int]{},
311-
Array[Bool]{},
312-
Array[Text]{},
313-
Void{},
314-
}
315-
for _, t := range typeList {
316-
if t.TypeCheck(v) == nil {
317-
signatureBuilder.AddInput(k, t)
318-
break
319-
}
320-
}
321-
}
322-
323-
return signatureBuilder.AddOutput("result", Text{}).Build()
324-
}

internal/workflow/function_task.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"github.com/serverledge-faas/serverledge/internal/config"
8+
"log"
9+
"os"
710
"time"
811

912
"github.com/serverledge-faas/serverledge/internal/node"
@@ -40,7 +43,6 @@ func (s *FunctionTask) execute(input *TaskData, r *Request) (map[string]interfac
4043

4144
// FIXME: We are adding additional inputs from r.Params to match the function signature. This workaround should be
4245
// dropped when we introduce the possibility to specify additional parameters for functions in a workflow.
43-
4446
funct, exists := function.GetFunction(s.Func)
4547
if !exists {
4648
return nil, fmt.Errorf("funtion %s doesn't exists", s.Func)
@@ -60,6 +62,28 @@ func (s *FunctionTask) execute(input *TaskData, r *Request) (map[string]interfac
6062
if err != nil {
6163
return nil, err
6264
}
65+
66+
if config.GetBool(config.WORKFLOW_SAVE_FUNCTION_INPUT_TO_FILE, false) {
67+
// Create directory if it doesn't exist
68+
dirname := "saved-inputs"
69+
err := os.MkdirAll(dirname, 0755)
70+
if err != nil {
71+
log.Printf("could not create directory: %v", err)
72+
}
73+
74+
payload, err := json.Marshal(input.Data)
75+
if err != nil {
76+
log.Printf("could not marshal input data: %v", err)
77+
} else {
78+
// Write to file
79+
filename := fmt.Sprintf("%s/%s-%s-%s.json", dirname, s.Id, s.Func, r.Id)
80+
err = os.WriteFile(filename, payload, 0644)
81+
if err != nil {
82+
log.Printf("could not write input data to file: %v", err)
83+
}
84+
}
85+
}
86+
6387
output, err := s.exec(r, input.Data)
6488
if err != nil {
6589
return nil, err

0 commit comments

Comments
 (0)