Skip to content

Commit 0531cbb

Browse files
authored
Fix bulking behavior in gremlin-go for TP 4.0 (#3397)
1 parent eb276fd commit 0531cbb

17 files changed

Lines changed: 499 additions & 61 deletions

docs/src/reference/gremlin-variants.asciidoc

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -216,9 +216,13 @@ Some connection options can also be set on individual requests made through the
216216
results, err := g.With("evaluationTimeout", 500).V().Out("knows").ToList()
217217
----
218218
219-
The following options are allowed on a per-request basis in this fashion: `batchSize`, `userAgent` and
219+
The following options are allowed on a per-request basis in this fashion: `batchSize`, `bulkResults`, `userAgent` and
220220
`evaluationTimeout`.
221221
222+
NOTE: When submitting traversals through `DriverRemoteConnection`, `bulkResults` defaults to `true` per-request
223+
to optimize result transfer. This does not apply to direct `Client.Submit()` calls, where `bulkResults` must be
224+
set explicitly if desired.
225+
222226
anchor:go-imports[]
223227
[[gremlin-go-imports]]
224228
=== Common Imports
@@ -351,7 +355,7 @@ options := new(RequestOptionsBuilder).
351355
resultSet, err := client.SubmitWithOptions("g.V(x).count()", options)
352356
----
353357
354-
The following options are allowed on a per-request basis in this fashion: `batchSize`, `userAgent`,
358+
The following options are allowed on a per-request basis in this fashion: `batchSize`, `bulkResults`, `userAgent`,
355359
`evaluationTimeout` and `materializeProperties`.
356360
`RequestOptions` may also contain a map of variable `bindings` to be applied to the supplied
357361
traversal string.
@@ -813,9 +817,13 @@ GraphTraversalSource g = traversal().with(conf);
813817
List<Vertex> vertices = g.with(Tokens.ARGS_EVAL_TIMEOUT, 500L).V().out("knows").toList()
814818
----
815819
816-
The following options are allowed on a per-request basis in this fashion: `batchSize`, `userAgent`,
820+
The following options are allowed on a per-request basis in this fashion: `batchSize`, `bulkResults`, `userAgent`,
817821
`materializeProperties` and `evaluationTimeout`. Use of `Tokens` to reference these options is preferred.
818822
823+
NOTE: When submitting traversals through `DriverRemoteConnection`, `bulkResults` defaults to `true` per-request
824+
to optimize result transfer. This does not apply to direct `Client.submit()` calls, where `bulkResults` must be
825+
set explicitly if desired.
826+
819827
anchor:java-imports[]
820828
[[gremlin-java-imports]]
821829
=== Common Imports
@@ -1590,6 +1598,10 @@ const vertices = await g.with_('evaluationTimeout', 500).V().out('knows').toList
15901598
The following options are allowed on a per-request basis in this fashion: `batchSize`, `requestId`, `userAgent`,
15911599
`bulkResults`, `materializeProperties` and `evaluationTimeout`.
15921600
1601+
NOTE: When submitting traversals through `DriverRemoteConnection`, `bulkResults` defaults to `true` per-request
1602+
to optimize result transfer. This does not apply to direct `Client.submit()` calls, where `bulkResults` must be
1603+
set explicitly if desired.
1604+
15931605
[[gremlin-javascript-imports]]
15941606
=== Common Imports
15951607
@@ -2124,10 +2136,14 @@ For instance to set request timeout to 500 milliseconds:
21242136
var l = g.With(Tokens.ArgsEvalTimeout, 500).V().Out("knows").Count().ToList();
21252137
----
21262138
2127-
The following options are allowed on a per-request basis in this fashion: `batchSize`, `userAgent`,
2139+
The following options are allowed on a per-request basis in this fashion: `batchSize`, `bulkResults`, `userAgent`,
21282140
`materializeProperties`, and `evaluationTimeout`. These options are available as constants on the
21292141
`Gremlin.Net.Driver.Tokens` class.
21302142
2143+
NOTE: When submitting traversals through `DriverRemoteConnection`, `bulkResults` defaults to `true` per-request
2144+
to optimize result transfer. This does not apply to direct `GremlinClient.SubmitAsync()` calls, where `bulkResults`
2145+
must be set explicitly if desired.
2146+
21312147
[[gremlin-dotnet-imports]]
21322148
=== Common Imports
21332149
@@ -2270,7 +2286,7 @@ feature is to set a per-request override to the `evaluationTimeout` so that it o
22702286
include::../../../gremlin-dotnet/test/Gremlin.Net.IntegrationTest/Docs/Reference/GremlinVariantsTests.cs[tags=submittingScriptsWithTimeout]
22712287
----
22722288
2273-
The following options are allowed on a per-request basis in this fashion: `batchSize`, `userAgent`, `materializeProperties`
2289+
The following options are allowed on a per-request basis in this fashion: `batchSize`, `bulkResults`, `userAgent`, `materializeProperties`
22742290
and `evaluationTimeout`. These options are available as constants on the `Gremlin.Net.Driver.Tokens` class.
22752291
22762292
==== Request Interceptors
@@ -2582,6 +2598,10 @@ vertices = g.with_('evaluationTimeout', 500).V().out('knows').to_list()
25822598
The following options are allowed on a per-request basis in this fashion: `batchSize`, `bulkResults`, `language`,
25832599
`materializeProperties`, `userAgent`, and `evaluationTimeout`.
25842600
2601+
NOTE: When submitting traversals through `DriverRemoteConnection`, `bulkResults` defaults to `True` per-request
2602+
to optimize result transfer. This does not apply to direct `Client.submit()` calls, where `bulkResults` must be
2603+
set explicitly if desired.
2604+
25852605
anchor:python-imports[]
25862606
[[gremlin-python-imports]]
25872607
=== Common Imports
@@ -2808,7 +2828,7 @@ request.
28082828
result_set = client.submit('g.V().repeat(both()).times(100)', request_options={'evaluationTimeout': 5000})
28092829
----
28102830
2811-
The following options are allowed on a per-request basis in this fashion: `batchSize`, `requestId`, `userAgent`,
2831+
The following options are allowed on a per-request basis in this fashion: `batchSize`, `bulkResults`, `requestId`, `userAgent`,
28122832
`materializeProperties` and `evaluationTimeout` (formerly `scriptEvaluationTimeout` which is also supported but now deprecated).
28132833
28142834
IMPORTANT: The preferred method for setting a per-request timeout for scripts is demonstrated above, but those familiar

gremlin-go/driver/client.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,12 @@ func (client *Client) submitGremlinLang(gremlinLang *GremlinLang) (ResultSet, er
170170
requestOptionsBuilder = applyOptionsConfig(requestOptionsBuilder, gremlinLang.optionsStrategies[0].configuration)
171171
}
172172

173+
// default bulkResults to true when using DRC through request options
174+
// consistent with Java RequestOptions.getRequestOptions and Python extract_request_options
175+
if requestOptionsBuilder.bulkResults == nil {
176+
requestOptionsBuilder.SetBulkResults(true)
177+
}
178+
173179
request := MakeStringRequest(gremlinLang.GetGremlin(), client.traversalSource, requestOptionsBuilder.Create())
174180
return client.conn.submit(&request)
175181
}
@@ -179,12 +185,12 @@ func applyOptionsConfig(builder *RequestOptionsBuilder, config map[string]interf
179185

180186
// Map configuration keys to setter method names
181187
setterMap := map[string]string{
182-
"requestId": "SetRequestId",
183188
"evaluationTimeout": "SetEvaluationTimeout",
184189
"batchSize": "SetBatchSize",
185190
"userAgent": "SetUserAgent",
186191
"bindings": "SetBindings",
187192
"materializeProperties": "SetMaterializeProperties",
193+
"bulkResults": "SetBulkResults",
188194
}
189195

190196
for key, value := range config {

gremlin-go/driver/client_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,60 @@ func TestClient(t *testing.T) {
158158
AssertMarkoVertexWithoutProperties(t, result)
159159
})
160160

161+
t.Run("Test client.SubmitWithOptions() with bulkResults true", func(t *testing.T) {
162+
skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable)
163+
client, err := NewClient(testNoAuthUrl,
164+
func(settings *ClientSettings) {
165+
settings.TlsConfig = testNoAuthTlsConfig
166+
})
167+
assert.NoError(t, err)
168+
assert.NotNil(t, client)
169+
defer client.Close()
170+
171+
resultSet, err := client.SubmitWithOptions("g.inject(1,2,3,2,1)",
172+
new(RequestOptionsBuilder).SetBulkResults(true).Create())
173+
assert.NoError(t, err)
174+
assert.NotNil(t, resultSet)
175+
results, err := resultSet.All()
176+
assert.NoError(t, err)
177+
// With bulkResults=true, the ResultSet contains Traverser objects (one per unique value).
178+
// This is consistent with Java, Python, .NET, and JS which all expose Traverser objects
179+
// at the ResultSet level when bulking is enabled.
180+
// g.inject(1,2,3,2,1) has 3 unique values: 1 (bulk=2), 2 (bulk=2), 3 (bulk=1)
181+
assert.Equal(t, 3, len(results))
182+
totalBulk := int64(0)
183+
for _, r := range results {
184+
tr, err := r.GetTraverser()
185+
assert.NoError(t, err)
186+
totalBulk += tr.Bulk
187+
}
188+
assert.Equal(t, int64(5), totalBulk)
189+
})
190+
191+
t.Run("Test client.SubmitWithOptions() with bulkResults false", func(t *testing.T) {
192+
skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable)
193+
client, err := NewClient(testNoAuthUrl,
194+
func(settings *ClientSettings) {
195+
settings.TlsConfig = testNoAuthTlsConfig
196+
})
197+
assert.NoError(t, err)
198+
assert.NotNil(t, client)
199+
defer client.Close()
200+
201+
resultSet, err := client.SubmitWithOptions("g.inject(1,2,3,2,1)",
202+
new(RequestOptionsBuilder).SetBulkResults(false).Create())
203+
assert.NoError(t, err)
204+
assert.NotNil(t, resultSet)
205+
results, err := resultSet.All()
206+
assert.NoError(t, err)
207+
// With bulkResults=false, the ResultSet contains raw values (no Traverser wrapping).
208+
assert.Equal(t, 5, len(results))
209+
for _, r := range results {
210+
_, err := r.GetTraverser()
211+
assert.Error(t, err, "expected raw value, not Traverser")
212+
}
213+
})
214+
161215
t.Run("Test deserialization of VertexProperty with properties", func(t *testing.T) {
162216
skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable)
163217
client, err := NewClient(testNoAuthUrl,

gremlin-go/driver/connection.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,23 @@ func (c *connection) streamToResultSet(reader io.Reader, rs ResultSet) {
317317
return
318318
}
319319

320-
rs.Channel() <- &Result{obj}
320+
if d.IsBulked() {
321+
bulkObj, err := d.ReadFullyQualified()
322+
if err != nil {
323+
c.logHandler.logf(Error, failedToReceiveResponse, err.Error())
324+
rs.setError(err)
325+
return
326+
}
327+
bulk, ok := bulkObj.(int64)
328+
if !ok {
329+
c.logHandler.logf(Error, failedToReceiveResponse, "expected int64 bulk count")
330+
rs.setError(fmt.Errorf("expected int64 bulk count, got %T", bulkObj))
331+
return
332+
}
333+
rs.Channel() <- &Result{&Traverser{Bulk: bulk, Value: obj}}
334+
} else {
335+
rs.Channel() <- &Result{obj}
336+
}
321337
}
322338
}
323339

gremlin-go/driver/connection_test.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,8 +376,11 @@ func TestConnection(t *testing.T) {
376376
var names []string
377377
for _, res := range allResults {
378378
assert.NotNil(t, res)
379-
vp, err := res.GetVertexProperty()
380-
assert.Nil(t, err)
379+
// DRC defaults bulkResults=true, so results should be Traverser-wrapped.
380+
tr, ok := res.Data.(*Traverser)
381+
assert.True(t, ok, "expected *Traverser from DRC path with bulkResults=true, got %T", res.Data)
382+
vp, ok := tr.Value.(*VertexProperty)
383+
assert.True(t, ok)
381384
names = append(names, vp.Value.(string))
382385
}
383386
assert.True(t, sortAndCompareTwoStringSlices(names, testNames))
@@ -775,6 +778,35 @@ func TestConnection(t *testing.T) {
775778
assert.Greater(t, len(props), 0)
776779
}
777780
})
781+
782+
t.Run("Test bulkResults with DRC request option", func(t *testing.T) {
783+
skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable)
784+
785+
g := getModernGraph(t, testNoAuthUrl, &tls.Config{})
786+
defer g.remoteConnection.Close()
787+
788+
// bulkResults is defaulted to true in submitGremlinLang, results should still be correct
789+
results, err := g.Inject(1, 2, 3, 2, 1).ToList()
790+
assert.Nil(t, err)
791+
assert.Equal(t, 5, len(results))
792+
})
793+
794+
t.Run("Test bulkResults with explicit With option", func(t *testing.T) {
795+
skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable)
796+
797+
g := getModernGraph(t, testNoAuthUrl, &tls.Config{})
798+
defer g.remoteConnection.Close()
799+
800+
// explicitly set bulkResults to true via With
801+
results, err := g.With("bulkResults", true).Inject(1, 2, 3, 2, 1).ToList()
802+
assert.Nil(t, err)
803+
assert.Equal(t, 5, len(results))
804+
805+
// explicitly set bulkResults to false via With
806+
results, err = g.With("bulkResults", false).Inject(1, 2, 3, 2, 1).ToList()
807+
assert.Nil(t, err)
808+
assert.Equal(t, 5, len(results))
809+
})
778810
}
779811

780812
func submitCount(i int, client *Client, t *testing.T) {

gremlin-go/driver/graphBinaryDeserializer.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,10 @@ import (
5757
// The bufio.Reader wrapper provides efficient buffering without affecting the
5858
// streaming semantics - it simply reduces the number of underlying read syscalls.
5959
type GraphBinaryDeserializer struct {
60-
r *bufio.Reader
61-
buf [8]byte
62-
err error // sticky error
60+
r *bufio.Reader
61+
buf [8]byte
62+
err error // sticky error
63+
bulked bool // whether the response stream uses bulked encoding
6364
}
6465

6566
// GraphBinary flag for bulked list/set
@@ -133,13 +134,24 @@ func (d *GraphBinaryDeserializer) readInt64() (int64, error) {
133134
}
134135

135136
// ReadHeader reads and validates the GraphBinary response header.
137+
// The header consists of a version byte and a bulking flag byte (0x00 = not bulked, 0x01 = bulked).
136138
// This must be called before reading any objects from the stream.
137139
func (d *GraphBinaryDeserializer) ReadHeader() error {
138140
if _, err := d.readByte(); err != nil {
139141
return err
140142
}
141-
_, err := d.readByte()
142-
return err
143+
flag, err := d.readByte()
144+
if err != nil {
145+
return err
146+
}
147+
d.bulked = flag == 0x01
148+
return nil
149+
}
150+
151+
// IsBulked returns whether the response stream uses bulked encoding.
152+
// This is determined by the header flag read during ReadHeader().
153+
func (d *GraphBinaryDeserializer) IsBulked() bool {
154+
return d.bulked
143155
}
144156

145157
// ReadFullyQualified reads the next fully-qualified GraphBinary value from the stream.

gremlin-go/driver/request.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ under the License.
1919

2020
package gremlingo
2121

22+
import "strconv"
23+
2224
// RequestMessage represents a request to the server.
2325
type RequestMessage struct {
2426
Gremlin string
@@ -71,6 +73,10 @@ func MakeStringRequest(stringGremlin string, traversalSource string, requestOpti
7173
newFields["materializeProperties"] = requestOptions.materializeProperties
7274
}
7375

76+
if requestOptions.bulkResults != nil {
77+
newFields["bulkResults"] = strconv.FormatBool(*requestOptions.bulkResults)
78+
}
79+
7480
return RequestMessage{
7581
Gremlin: stringGremlin,
7682
Fields: newFields,
@@ -82,7 +88,7 @@ func MakeStringRequest(stringGremlin string, traversalSource string, requestOpti
8288
var allowedReqArgs = map[string]bool{
8389
"evaluationTimeout": true,
8490
"batchSize": true,
85-
"requestId": true,
8691
"userAgent": true,
8792
"materializeProperties": true,
93+
"bulkResults": true,
8894
}

gremlin-go/driver/requestOptions.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,31 +19,22 @@ under the License.
1919

2020
package gremlingo
2121

22-
import (
23-
"github.com/google/uuid"
24-
)
25-
2622
type RequestOptions struct {
27-
requestID uuid.UUID
2823
evaluationTimeout int
2924
batchSize int
3025
userAgent string
3126
bindings map[string]interface{}
3227
materializeProperties string
28+
bulkResults *bool
3329
}
3430

3531
type RequestOptionsBuilder struct {
36-
requestID uuid.UUID
3732
evaluationTimeout int
3833
batchSize int
3934
userAgent string
4035
bindings map[string]interface{}
4136
materializeProperties string
42-
}
43-
44-
func (builder *RequestOptionsBuilder) SetRequestId(requestId uuid.UUID) *RequestOptionsBuilder {
45-
builder.requestID = requestId
46-
return builder
37+
bulkResults *bool
4738
}
4839

4940
func (builder *RequestOptionsBuilder) SetEvaluationTimeout(evaluationTimeout int) *RequestOptionsBuilder {
@@ -71,6 +62,11 @@ func (builder *RequestOptionsBuilder) SetMaterializeProperties(materializeProper
7162
return builder
7263
}
7364

65+
func (builder *RequestOptionsBuilder) SetBulkResults(bulkResults bool) *RequestOptionsBuilder {
66+
builder.bulkResults = &bulkResults
67+
return builder
68+
}
69+
7470
func (builder *RequestOptionsBuilder) AddBinding(key string, binding interface{}) *RequestOptionsBuilder {
7571
if builder.bindings == nil {
7672
builder.bindings = make(map[string]interface{})
@@ -82,12 +78,12 @@ func (builder *RequestOptionsBuilder) AddBinding(key string, binding interface{}
8278
func (builder *RequestOptionsBuilder) Create() RequestOptions {
8379
requestOptions := new(RequestOptions)
8480

85-
requestOptions.requestID = builder.requestID
8681
requestOptions.evaluationTimeout = builder.evaluationTimeout
8782
requestOptions.batchSize = builder.batchSize
8883
requestOptions.userAgent = builder.userAgent
8984
requestOptions.bindings = builder.bindings
9085
requestOptions.materializeProperties = builder.materializeProperties
86+
requestOptions.bulkResults = builder.bulkResults
9187

9288
return *requestOptions
9389
}

0 commit comments

Comments
 (0)