Skip to content

Commit 5ac0a97

Browse files
Merge pull request #1 from actionforge/improve-length
Add stream support to length node
2 parents 09a3c5c + 5ae810b commit 5ac0a97

18 files changed

Lines changed: 172 additions & 6 deletions

.github/workflows/graphs/build-quick.act

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ executions:
119119
dst:
120120
node: branch-v1-penguin-monkey-kiwi
121121
port: exec
122+
- src:
123+
node: gh-start
124+
port: exec-on_pull_request
125+
dst:
126+
node: gh-actions-checkout-gray-butterfly-apple
127+
port: exec
122128
- src:
123129
node: gh-start
124130
port: exec-on_workflow_dispatch

.github/workflows/graphs/build-test-publish.act

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6272,6 +6272,12 @@ executions:
62726272
dst:
62736273
node: group-v1-orange-tiger-grapefruit
62746274
port: exec-lion-beige-dragonfruit
6275+
- src:
6276+
node: gh-start
6277+
port: exec-on_pull_request
6278+
dst:
6279+
node: group-v1-orange-tiger-grapefruit
6280+
port: exec-lion-beige-dragonfruit
62756281
- src:
62766282
node: gh-start
62776283
port: exec-on_workflow_dispatch

.github/workflows/graphs/test_env.act

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,12 @@ executions:
266266
dst:
267267
node: run-v1-apple-silver-lion
268268
port: exec
269+
- src:
270+
node: gh-start
271+
port: exec-on_pull_request
272+
dst:
273+
node: run-v1-apple-silver-lion
274+
port: exec
269275
- src:
270276
node: gh-start
271277
port: exec-on_workflow_dispatch

.github/workflows/workflow.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ on:
55
branches:
66
- main
77

8+
pull_request:
9+
branches:
10+
- main
11+
812
workflow_dispatch:
913
inputs:
1014
session_token:

core/base.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package core
22

33
import (
4+
"bytes"
45
"errors"
56
"fmt"
67
"io"
@@ -83,6 +84,7 @@ type DataStreamFactory struct {
8384
SourcePath string
8485
SourceProvider any
8586
Reader io.Reader
87+
Length int64
8688
}
8789

8890
func (dsf *DataStreamFactory) CloseStream() error {
@@ -93,6 +95,25 @@ func (dsf *DataStreamFactory) CloseStreamAndIgnoreError() {
9395
_ = dsf.CloseStream()
9496
}
9597

98+
func GetReaderLength(r io.Reader) int64 {
99+
switch v := r.(type) {
100+
case *bytes.Buffer:
101+
return int64(v.Len())
102+
case *bytes.Reader:
103+
return int64(v.Len())
104+
case *strings.Reader:
105+
return int64(v.Len())
106+
case *os.File:
107+
stat, err := v.Stat()
108+
if err != nil {
109+
return 0
110+
}
111+
return stat.Size()
112+
default:
113+
return -1
114+
}
115+
}
116+
96117
var (
97118
onceReIsExec sync.Once
98119
reIsExec *regexp.Regexp
@@ -130,9 +151,11 @@ type NodeBaseInterface interface {
130151
SetId(id string)
131152
GetNodeTypeId() string
132153
GetName() string
154+
GetLabel() string
133155
GetId() string
134156
GetCacheId() string
135157
SetName(name string)
158+
SetLabel(label string)
136159
GetGraph() *ActionGraph
137160

138161
// Instead of checking for 'HasExecutionInterface',
@@ -151,6 +174,7 @@ type NodeBaseInterface interface {
151174
// The node that implements this component has outgoing connections.
152175
type NodeBaseComponent struct {
153176
Name string // Human readable name of the node
177+
Label string // Label of the node shown in the graph editor
154178
Id string // Unique identifier for the node
155179
FullPath string // Full path of the node within the graph hierarchy
156180
CacheId string // Unique identifier for the cache
@@ -221,10 +245,18 @@ func (n *NodeBaseComponent) GetName() string {
221245
return n.Name
222246
}
223247

248+
func (n *NodeBaseComponent) GetLabel() string {
249+
return n.Label
250+
}
251+
224252
func (n *NodeBaseComponent) SetName(name string) {
225253
n.Name = name
226254
}
227255

256+
func (n *NodeBaseComponent) SetLabel(label string) {
257+
n.Label = label
258+
}
259+
228260
func IsValidIndexPortId(id string) (string, int, bool) {
229261
indexPortMatch := getIndexPortRegex().FindStringSubmatch(id)
230262
if len(indexPortMatch) < 3 {

core/inputs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,7 @@ func ConvertValue(c *ExecutionState, v reflect.Value, requestedType reflect.Type
603603
if err != nil {
604604
return nil, err
605605
}
606-
return DataStreamFactory{Reader: reader}, nil
606+
return DataStreamFactory{Reader: reader, Length: GetReaderLength(reader)}, nil
607607
case reflectValueType:
608608
// It is common to request a value with the type `any`.
609609
// ... core.InputValueById[any](...)

nodes/file-compress@v1.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ func (n *FileZipNode) ExecuteImpl(c *core.ExecutionState, inputId core.InputId,
7272

7373
dsf := core.DataStreamFactory{
7474
Reader: reader,
75+
Length: core.GetReaderLength(reader),
7576
}
7677

7778
err = n.Outputs.SetOutputValue(c, ni.Core_file_compress_v1_Output_data, dsf, core.SetOutputValueOpts{})

nodes/file-read@v1.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ func (n *FileReadNode) ExecuteImpl(c *core.ExecutionState, inputId core.InputId,
3030
return core.CreateErr(c, err)
3131
}
3232

33-
defer fp.Close()
34-
3533
dsf := core.DataStreamFactory{
3634
SourcePath: path,
3735
Reader: fp,
36+
Length: core.GetReaderLength(fp),
3837
}
38+
defer dsf.CloseStreamAndIgnoreError()
3939

4040
err = n.Outputs.SetOutputValue(c, ni.Core_file_read_v1_Output_data, dsf, core.SetOutputValueOpts{})
4141
if err != nil {

nodes/http@v1.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ func (n *HttpNode) ExecuteImpl(c *core.ExecutionState, inputId core.InputId, pre
118118
var statusCode int
119119
if resp != nil {
120120
dsf.Reader = resp.Body
121+
dsf.Length = resp.ContentLength
121122
statusCode = resp.StatusCode
122123
}
123124

nodes/length@v1.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package nodes
22

33
import (
44
_ "embed"
5+
"io"
56
"reflect"
67

78
"github.com/actionforge/actrun-cli/core"
@@ -23,8 +24,15 @@ func (n *LengthNode) OutputValueById(c *core.ExecutionState, outputId core.Outpu
2324
return nil, err
2425
}
2526

26-
v := reflect.ValueOf(inputs)
27+
if dsf, ok := inputs.(core.DataStreamFactory); ok {
28+
if dsf.Length != -1 {
29+
return dsf.Length, nil
30+
}
31+
} else if r, ok := inputs.(io.Reader); ok {
32+
return core.GetReaderLength(r), nil
33+
}
2734

35+
v := reflect.ValueOf(inputs)
2836
switch v.Kind() {
2937
case reflect.Array, reflect.Map, reflect.Slice, reflect.String:
3038
return v.Len(), nil

0 commit comments

Comments
 (0)