Skip to content

Commit 75dc693

Browse files
authored
Add native vector builder (#6812)
This commit adds a native vector builder to the vector package that concats vectors of the same type together. The previous Builder is renamed to ValueBuilder. The interface for vector aggregates is also changed to return vectors instead of scalar values and the vector runtime aggregate uses the new Builder to concat these results together.
1 parent a441266 commit 75dc693

34 files changed

Lines changed: 932 additions & 377 deletions

csup/csup_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func TestCSUPBatchBug(t *testing.T) {
6666
}
6767

6868
func valToVec(sctx *super.Context, val super.Value) vector.Any {
69-
b := vector.NewDynamicBuilder()
69+
b := vector.NewDynamicValueBuilder()
7070
b.Write(val)
7171
return b.Build(sctx)
7272
}

csup/object_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func TestObjectProjectMetadata(t *testing.T) {
2222
"{a:2,b:{c:5,d:0.8}}",
2323
"{a:3,b:{c:6,d:0.9}}",
2424
}
25-
builder := vector.NewDynamicBuilder()
25+
builder := vector.NewDynamicValueBuilder()
2626
for _, s := range supValues {
2727
builder.Write(sup.MustParseValue(sctx, s))
2828
}

csup/writer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (w *Serializer) finalizeObject() error {
103103
type ValWriter struct {
104104
sctx *super.Context
105105
serializer *Serializer
106-
builder *vector.DynamicBuilder
106+
builder *vector.DynamicValueBuilder
107107
}
108108

109109
var _ sio.Writer = (*ValWriter)(nil)
@@ -113,7 +113,7 @@ func NewValWriter(w io.WriteCloser) *ValWriter {
113113
return &ValWriter{
114114
sctx: sctx,
115115
serializer: NewSerializer(w),
116-
builder: vector.NewDynamicBuilder(),
116+
builder: vector.NewDynamicValueBuilder(),
117117
}
118118
}
119119

db/writer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ again:
217217
// XXX TBD: this is slow and creates a vector per value when writing vectors
218218
// to a database. This will change when we convert the database from
219219
// mixed BSUP/CSUP to CSUP only.
220-
builder := vector.NewBuilder(val.Type())
220+
builder := vector.NewValueBuilder(val.Type())
221221
builder.Write(val.Bytes())
222222
if err := w.vectorWriter.Push(builder.Build(w.sctx)); err != nil {
223223
w.Abort()

runtime/sam/op/debug/op.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func (o *Op) Pull(done bool) (sbuf.Batch, error) {
6565
}
6666

6767
func (o *Op) evalBatch(in sbuf.Batch) vector.Any {
68-
builder := vector.NewDynamicBuilder()
68+
builder := vector.NewDynamicValueBuilder()
6969
for _, x := range in.Values() {
7070
if o.filter == nil || o.where(x) {
7171
builder.Write(o.expr.Eval(x))

runtime/vam/expr/agg/agg.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import (
1010
type Func interface {
1111
Consume(vector.Any)
1212
ConsumeAsPartial(vector.Any)
13-
Result(*super.Context) super.Value
14-
ResultAsPartial(*super.Context) super.Value
13+
Result(*super.Context) vector.Any
14+
ResultAsPartial(*super.Context) vector.Any
1515
}
1616

1717
type Pattern func() Func

runtime/vam/expr/agg/any.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,35 @@ package agg
22

33
import (
44
"github.com/brimdata/super"
5-
"github.com/brimdata/super/scode"
65
"github.com/brimdata/super/vector"
76
)
87

98
type Any struct {
10-
val super.Value
9+
result vector.Any
1110
}
1211

1312
func NewAny() *Any {
14-
return &Any{val: super.Null}
13+
return &Any{}
1514
}
1615

1716
func (a *Any) Consume(vec vector.Any) {
18-
if !a.val.IsNull() || vec.Kind() == vector.KindNull {
17+
if a.result != nil || vec.Kind() == vector.KindNull {
1918
return
2019
}
21-
var b scode.Builder
22-
vec.Serialize(&b, 0)
23-
a.val = super.NewValue(vec.Type(), b.Bytes().Body())
20+
a.result = vector.Pick(vec, []uint32{0})
2421
}
2522

2623
func (a *Any) ConsumeAsPartial(vec vector.Any) {
2724
a.Consume(vec)
2825
}
2926

30-
func (a *Any) Result(*super.Context) super.Value {
31-
return a.val
27+
func (a *Any) Result(sctx *super.Context) vector.Any {
28+
if a.result == nil {
29+
return vector.NewNull(1)
30+
}
31+
return a.result
3232
}
3333

34-
func (a *Any) ResultAsPartial(*super.Context) super.Value {
34+
func (a *Any) ResultAsPartial(*super.Context) vector.Any {
3535
return a.Result(nil)
3636
}

runtime/vam/expr/agg/avg.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package agg
22

33
import (
44
"github.com/brimdata/super"
5-
"github.com/brimdata/super/scode"
65
"github.com/brimdata/super/vector"
76
)
87

@@ -22,11 +21,12 @@ func (a *avg) Consume(vec vector.Any) {
2221
a.sum = sum(a.sum, vec)
2322
}
2423

25-
func (a *avg) Result(*super.Context) super.Value {
24+
func (a *avg) Result(*super.Context) vector.Any {
2625
if a.count > 0 {
27-
return super.NewFloat64(a.sum / float64(a.count))
26+
f := a.sum / float64(a.count)
27+
return vector.NewFloat(super.TypeFloat64, []float64{f})
2828
}
29-
return super.Null
29+
return vector.NewNull(1)
3030
}
3131

3232
const (
@@ -62,13 +62,12 @@ func (a *avg) ConsumeAsPartial(partial vector.Any) {
6262
a.count += vector.UintValue(countVal, idx)
6363
}
6464

65-
func (a *avg) ResultAsPartial(sctx *super.Context) super.Value {
66-
var b scode.Builder
67-
b.Append(super.EncodeFloat64(a.sum))
68-
b.Append(super.EncodeUint(a.count))
65+
func (a *avg) ResultAsPartial(sctx *super.Context) vector.Any {
66+
sum := vector.NewFloat(super.TypeFloat64, []float64{a.sum})
67+
count := vector.NewUint(super.TypeUint64, []uint64{a.count})
6968
typ := sctx.MustLookupTypeRecord([]super.Field{
7069
super.NewField(sumName, super.TypeFloat64),
7170
super.NewField(countName, super.TypeUint64),
7271
})
73-
return super.NewValue(typ, b.Bytes())
72+
return vector.NewRecord(typ, []vector.Any{sum, count}, 1)
7473
}

runtime/vam/expr/agg/collect.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package agg
33
import (
44
"github.com/brimdata/super"
55
samagg "github.com/brimdata/super/runtime/sam/expr/agg"
6+
"github.com/brimdata/super/sbuf"
67
"github.com/brimdata/super/scode"
78
"github.com/brimdata/super/vector"
89
)
@@ -21,8 +22,9 @@ func (c *collect) Consume(vec vector.Any) {
2122
}
2223
}
2324

24-
func (c *collect) Result(sctx *super.Context) super.Value {
25-
return c.samcollect.Result(sctx)
25+
func (c *collect) Result(sctx *super.Context) vector.Any {
26+
val := c.samcollect.Result(sctx)
27+
return sbuf.Dematerialize(sctx, sbuf.NewArray([]super.Value{val}))
2628
}
2729

2830
func (c *collect) ConsumeAsPartial(partial vector.Any) {
@@ -53,6 +55,7 @@ func (c *collect) ConsumeAsPartial(partial vector.Any) {
5355
}
5456
}
5557

56-
func (c *collect) ResultAsPartial(sctx *super.Context) super.Value {
57-
return c.samcollect.ResultAsPartial(sctx)
58+
func (c *collect) ResultAsPartial(sctx *super.Context) vector.Any {
59+
val := c.samcollect.ResultAsPartial(sctx)
60+
return sbuf.Dematerialize(sctx, sbuf.NewArray([]super.Value{val}))
5861
}

runtime/vam/expr/agg/collectmap.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package agg
33
import (
44
"github.com/brimdata/super"
55
samagg "github.com/brimdata/super/runtime/sam/expr/agg"
6+
"github.com/brimdata/super/sbuf"
67
"github.com/brimdata/super/scode"
78
"github.com/brimdata/super/vector"
89
)
@@ -28,14 +29,16 @@ func (c *collectMap) Consume(vec vector.Any) {
2829
}
2930
}
3031

31-
func (c *collectMap) Result(sctx *super.Context) super.Value {
32-
return c.samCollectMap.Result(sctx)
32+
func (c *collectMap) Result(sctx *super.Context) vector.Any {
33+
val := c.samCollectMap.Result(sctx)
34+
return sbuf.Dematerialize(sctx, sbuf.NewArray([]super.Value{val}))
3335
}
3436

3537
func (c *collectMap) ConsumeAsPartial(partial vector.Any) {
3638
c.Consume(partial)
3739
}
3840

39-
func (c *collectMap) ResultAsPartial(sctx *super.Context) super.Value {
40-
return c.samCollectMap.ResultAsPartial(sctx)
41+
func (c *collectMap) ResultAsPartial(sctx *super.Context) vector.Any {
42+
val := c.samCollectMap.ResultAsPartial(sctx)
43+
return sbuf.Dematerialize(sctx, sbuf.NewArray([]super.Value{val}))
4144
}

0 commit comments

Comments
 (0)