Skip to content

Commit cbe279c

Browse files
authored
Merge pull request #408 from machbase/wip-bindata
Enhance binary data handling and update neo-client dependency
1 parent 0ebf93c commit cbe279c

54 files changed

Lines changed: 2759 additions & 657 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

api/append_worker.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,18 @@ var appenders map[string]*AppendWorker
3333
var appendersLock sync.Mutex
3434
var appendersFlusher chan struct{}
3535
var appendersFlusherWg sync.WaitGroup
36+
var appendersControl chan *AppendWorkerControl // send table name to stop the appender
37+
38+
type AppendWorkerControl struct {
39+
TableName string
40+
ack chan struct{}
41+
}
3642

3743
func StartAppendWorkers() {
3844
appenders = make(map[string]*AppendWorker)
3945
appendersFlusher = make(chan struct{})
4046
appendersFlusherWg.Add(1)
47+
appendersControl = make(chan *AppendWorkerControl)
4148
go func() {
4249
defer appendersFlusherWg.Done()
4350
for {
@@ -55,6 +62,14 @@ func StartAppendWorkers() {
5562
delete(appenders, tableName)
5663
}
5764
appendersLock.Unlock()
65+
case control := <-appendersControl:
66+
appendersLock.Lock()
67+
if value, exists := appenders[control.TableName]; exists {
68+
value.Stop()
69+
delete(appenders, control.TableName)
70+
}
71+
appendersLock.Unlock()
72+
close(control.ack)
5873
case <-appendersFlusher:
5974
return
6075
}
@@ -70,6 +85,17 @@ func StopAppendWorkers() {
7085
}
7186
}
7287

88+
// StopAppendWorker stops the append worker for the specified table
89+
// and returns a channel to wait for the stop to complete
90+
func StopAppendWorker(tableName string) chan struct{} {
91+
ack := make(chan struct{})
92+
appendersControl <- &AppendWorkerControl{
93+
TableName: strings.ToLower(tableName),
94+
ack: ack,
95+
}
96+
return ack
97+
}
98+
7399
// FlushAppendWorkers flushes all append workers
74100
// tables: table names to flush
75101
// if tables is empty, flush all append workers
@@ -84,6 +110,7 @@ func FlushAppendWorkers(tables ...string) {
84110
} else {
85111
var deleting []string
86112
for _, tableName := range tables {
113+
tableName = strings.ToLower(tableName)
87114
if value, exists := appenders[tableName]; exists {
88115
value.Stop()
89116
deleting = append(deleting, tableName)
@@ -99,6 +126,7 @@ func GetAppendWorker(ctx context.Context, db api.Database, tableName string) (*A
99126
appendersLock.Lock()
100127
defer appendersLock.Unlock()
101128

129+
tableName = strings.ToLower(tableName)
102130
if aw, exists := appenders[tableName]; exists {
103131
aw.lastTime = time.Now()
104132
atomic.AddInt32(&aw.refCount, 1)
@@ -135,7 +163,7 @@ func GetAppendWorker(ctx context.Context, db api.Database, tableName string) (*A
135163
tableDesc: tableDesc,
136164
lastTime: time.Now(),
137165
refCount: 1,
138-
log: logging.GetLog(fmt.Sprintf("appender-%s", strings.ToLower(tableName))),
166+
log: logging.GetLog(fmt.Sprintf("appender-%s", tableName)),
139167
}
140168
appenders[tableName] = ret
141169
ret.Start()

api/append_worker_test.go

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
package api
2+
3+
import (
4+
"context"
5+
"sync/atomic"
6+
"testing"
7+
"time"
8+
9+
clientapi "github.com/machbase/neo-client/api"
10+
"github.com/machbase/neo-server/v8/mods/logging"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
type appendWorkerTestConn struct {
15+
closed int32
16+
}
17+
18+
func (c *appendWorkerTestConn) Close() error {
19+
atomic.AddInt32(&c.closed, 1)
20+
return nil
21+
}
22+
23+
func (c *appendWorkerTestConn) Exec(context.Context, string, ...any) clientapi.Result {
24+
return nil
25+
}
26+
27+
func (c *appendWorkerTestConn) Query(context.Context, string, ...any) (clientapi.Rows, error) {
28+
return nil, nil
29+
}
30+
31+
func (c *appendWorkerTestConn) QueryRow(context.Context, string, ...any) clientapi.Row {
32+
return nil
33+
}
34+
35+
func (c *appendWorkerTestConn) Prepare(context.Context, string) (clientapi.Stmt, error) {
36+
return nil, nil
37+
}
38+
39+
func (c *appendWorkerTestConn) Appender(context.Context, string, ...clientapi.AppenderOption) (clientapi.Appender, error) {
40+
return nil, nil
41+
}
42+
43+
func (c *appendWorkerTestConn) Explain(context.Context, string, bool) (string, error) {
44+
return "", nil
45+
}
46+
47+
type appendWorkerTestAppender struct {
48+
tableName string
49+
tableType clientapi.TableType
50+
columns clientapi.Columns
51+
appendRows [][]any
52+
closed int32
53+
}
54+
55+
func (a *appendWorkerTestAppender) TableName() string {
56+
return a.tableName
57+
}
58+
59+
func (a *appendWorkerTestAppender) Append(values ...any) error {
60+
a.appendRows = append(a.appendRows, values)
61+
return nil
62+
}
63+
64+
func (a *appendWorkerTestAppender) AppendLogTime(ts time.Time, values ...any) error {
65+
row := append([]any{ts}, values...)
66+
a.appendRows = append(a.appendRows, row)
67+
return nil
68+
}
69+
70+
func (a *appendWorkerTestAppender) Close() (int64, int64, error) {
71+
atomic.AddInt32(&a.closed, 1)
72+
return int64(len(a.appendRows)), 0, nil
73+
}
74+
75+
func (a *appendWorkerTestAppender) Columns() (clientapi.Columns, error) {
76+
return a.columns, nil
77+
}
78+
79+
func (a *appendWorkerTestAppender) TableType() clientapi.TableType {
80+
return a.tableType
81+
}
82+
83+
func (a *appendWorkerTestAppender) WithInputColumns(...string) clientapi.Appender {
84+
return a
85+
}
86+
87+
func (a *appendWorkerTestAppender) WithInputFormats(...string) clientapi.Appender {
88+
return a
89+
}
90+
91+
func (a *appendWorkerTestAppender) WithBatchMaxRows(int) clientapi.Appender {
92+
return a
93+
}
94+
95+
func (a *appendWorkerTestAppender) WithBatchMaxBytes(int) clientapi.Appender {
96+
return a
97+
}
98+
99+
func (a *appendWorkerTestAppender) WithBatchMaxDelay(time.Duration) clientapi.Appender {
100+
return a
101+
}
102+
103+
func newAppendWorkerForTest(tableName string) (*AppendWorker, *appendWorkerTestAppender, *appendWorkerTestConn) {
104+
ctx, cancel := context.WithCancel(context.Background())
105+
appender := &appendWorkerTestAppender{
106+
tableName: tableName,
107+
tableType: clientapi.TableTypeLog,
108+
columns: clientapi.Columns{
109+
{Name: "NAME", DataType: clientapi.DataTypeString},
110+
{Name: "VALUE", DataType: clientapi.DataTypeFloat64},
111+
},
112+
}
113+
conn := &appendWorkerTestConn{}
114+
return &AppendWorker{
115+
ctx: ctx,
116+
ctxCancel: cancel,
117+
conn: conn,
118+
appender: appender,
119+
tableDesc: &clientapi.TableDescription{Name: tableName},
120+
lastTime: time.Now(),
121+
log: logging.GetLog("append-worker-test"),
122+
}, appender, conn
123+
}
124+
125+
func TestAppendWorkerRegistryStopsByLowerCaseName(t *testing.T) {
126+
StartAppendWorkers()
127+
t.Cleanup(StopAppendWorkers)
128+
129+
worker, appender, conn := newAppendWorkerForTest("sensor")
130+
appendersLock.Lock()
131+
appenders["sensor"] = worker
132+
appendersLock.Unlock()
133+
134+
ack := StopAppendWorker("SENSOR")
135+
select {
136+
case <-ack:
137+
case <-time.After(time.Second):
138+
t.Fatal("timed out waiting for append worker stop ack")
139+
}
140+
141+
appendersLock.Lock()
142+
_, exists := appenders["sensor"]
143+
appendersLock.Unlock()
144+
require.False(t, exists)
145+
require.Equal(t, int32(1), atomic.LoadInt32(&appender.closed))
146+
require.Equal(t, int32(1), atomic.LoadInt32(&conn.closed))
147+
}
148+
149+
func TestFlushAppendWorkersMatchesNamesCaseInsensitively(t *testing.T) {
150+
StartAppendWorkers()
151+
t.Cleanup(StopAppendWorkers)
152+
153+
sensor, sensorAppender, _ := newAppendWorkerForTest("sensor")
154+
metric, metricAppender, _ := newAppendWorkerForTest("metric")
155+
appendersLock.Lock()
156+
appenders["sensor"] = sensor
157+
appenders["metric"] = metric
158+
appendersLock.Unlock()
159+
160+
FlushAppendWorkers("SENSOR")
161+
162+
appendersLock.Lock()
163+
_, sensorExists := appenders["sensor"]
164+
_, metricExists := appenders["metric"]
165+
appendersLock.Unlock()
166+
require.False(t, sensorExists)
167+
require.True(t, metricExists)
168+
require.Equal(t, int32(1), atomic.LoadInt32(&sensorAppender.closed))
169+
require.Equal(t, int32(0), atomic.LoadInt32(&metricAppender.closed))
170+
171+
FlushAppendWorkers()
172+
require.Equal(t, int32(1), atomic.LoadInt32(&metricAppender.closed))
173+
require.Empty(t, appenders)
174+
}
175+
176+
func TestGetAppendWorkerReusesRegisteredWorkerCaseInsensitively(t *testing.T) {
177+
StartAppendWorkers()
178+
t.Cleanup(StopAppendWorkers)
179+
180+
worker, _, _ := newAppendWorkerForTest("sensor")
181+
appendersLock.Lock()
182+
appenders["sensor"] = worker
183+
appendersLock.Unlock()
184+
185+
got, err := GetAppendWorker(context.Background(), nil, "SENSOR")
186+
require.NoError(t, err)
187+
require.Same(t, worker, got)
188+
require.Equal(t, int32(1), atomic.LoadInt32(&got.refCount))
189+
}
190+
191+
func TestAppenderWithWorkerMapsInputColumns(t *testing.T) {
192+
worker, _, _ := newAppendWorkerForTest("sensor")
193+
worker.appendC = make(chan []interface{}, 1)
194+
195+
wrapped := worker.WithInputColumns("value", "name")
196+
require.NoError(t, wrapped.Append(3.14, "temperature"))
197+
require.Equal(t, []interface{}{"temperature", 3.14}, <-worker.appendC)
198+
199+
require.EqualError(t, worker.WithInputColumns().Append("only-name"), "value count 1, table 'sensor' requires 2 columns to append")
200+
}
201+
202+
func TestAppendWorkerAppendLogTimeRequiresLogTable(t *testing.T) {
203+
worker, appender, _ := newAppendWorkerForTest("sensor")
204+
worker.appendC = make(chan []interface{}, 1)
205+
ts := time.Unix(1, 2)
206+
require.NoError(t, worker.AppendLogTime(ts, "temperature", 3.14))
207+
require.Equal(t, []interface{}{ts, "temperature", 3.14}, <-worker.appendC)
208+
209+
appender.tableType = clientapi.TableTypeFixed
210+
err := worker.AppendLogTime(ts, "temperature", 3.14)
211+
require.EqualError(t, err, "sensor is not a log table, use Append() instead")
212+
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ require (
2828
github.com/jedib0t/go-pretty/v6 v6.7.8
2929
github.com/jellydator/ttlcache/v3 v3.3.0
3030
github.com/lib/pq v1.10.9
31-
github.com/machbase/neo-client v1.3.1
31+
github.com/machbase/neo-client v1.3.2-0.20260515002807-552d3a7e27b8
3232
github.com/machbase/neo-engine/v8 v8.5.2
3333
github.com/machbase/neo-pkgdev v0.0.0-20240911234518-701b00a03b6b
3434
github.com/magefile/mage v1.16.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,8 +284,8 @@ github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69
284284
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
285285
github.com/lufia/plan9stats v0.0.0-20251013123823-9fd1530e3ec3 h1:PwQumkgq4/acIiZhtifTV5OUqqiP82UAl0h87xj/l9k=
286286
github.com/lufia/plan9stats v0.0.0-20251013123823-9fd1530e3ec3/go.mod h1:autxFIvghDt3jPTLoqZ9OZ7s9qTGNAWmYCjVFWPX/zg=
287-
github.com/machbase/neo-client v1.3.1 h1:DpHJ7rU3FpJPtdt3j26TI0fQIx7ZIClabpsqAByd0zU=
288-
github.com/machbase/neo-client v1.3.1/go.mod h1:rCaZqbs86TUVDghcOW4XH9Htodk9D7t5Cu5j3aWphPU=
287+
github.com/machbase/neo-client v1.3.2-0.20260515002807-552d3a7e27b8 h1:CcUAVaifOv3ouSHugOH5LClb+EipbZHMNO5J3lW0yMo=
288+
github.com/machbase/neo-client v1.3.2-0.20260515002807-552d3a7e27b8/go.mod h1:rCaZqbs86TUVDghcOW4XH9Htodk9D7t5Cu5j3aWphPU=
289289
github.com/machbase/neo-engine/v8 v8.5.2 h1:X1dkfp0QsStQDR11SYLu0WaHINgaObaKpv7R5nOXYsg=
290290
github.com/machbase/neo-engine/v8 v8.5.2/go.mod h1:Z2Ey5LqiXunFTTBsPBeeuoeQEJ/Rky92flsHyo6MkfA=
291291
github.com/machbase/neo-pkgdev v0.0.0-20240911234518-701b00a03b6b h1:yVScvJT9bIGaJmajia+/xz5tkOpRQ5utmpXstV2C37k=

jsh/engine/engine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (jr *JSRuntime) setContext(ctx context.Context) {
7070

7171
func (jr *JSRuntime) RegisterNativeModule(name string, loader NativeModuleLoader) {
7272
jr.registry.RegisterNativeModule(name, func(rt *goja.Runtime, module *goja.Object) {
73-
loader(jr.currentContext(), rt, module)
73+
loader(ContextWithEventLoop(jr.currentContext(), jr.eventLoop), rt, module)
7474
})
7575
}
7676

jsh/engine/events.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package engine
22

33
import (
4+
"context"
5+
46
"github.com/dop251/goja"
57
"github.com/dop251/goja_nodejs/eventloop"
68
)
@@ -9,6 +11,23 @@ func NewEventLoop(opts ...eventloop.Option) *eventloop.EventLoop {
911
return eventloop.NewEventLoop(opts...)
1012
}
1113

14+
type eventLoopContextKey struct{}
15+
16+
func ContextWithEventLoop(ctx context.Context, loop *eventloop.EventLoop) context.Context {
17+
if ctx == nil {
18+
ctx = context.Background()
19+
}
20+
return context.WithValue(ctx, eventLoopContextKey{}, loop)
21+
}
22+
23+
func EventLoopFromContext(ctx context.Context) *eventloop.EventLoop {
24+
if ctx == nil {
25+
return nil
26+
}
27+
loop, _ := ctx.Value(eventLoopContextKey{}).(*eventloop.EventLoop)
28+
return loop
29+
}
30+
1231
type EventValueProvider interface {
1332
EventValue(vm *goja.Runtime) goja.Value
1433
}

0 commit comments

Comments
 (0)