Skip to content

Commit e463295

Browse files
committed
Add shallow unmarshal step to MultiSearch
1 parent b273a7a commit e463295

6 files changed

Lines changed: 23 additions & 17 deletions

File tree

pkg/quickwit/client/client.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type ConfiguredFields struct {
3434

3535
// Client represents a client which can interact with elasticsearch api
3636
type Client interface {
37-
ExecuteMultisearch(r []*SearchRequest) (*MultiSearchResponse, error)
37+
ExecuteMultisearch(r []*SearchRequest) ([]*json.RawMessage, error)
3838
}
3939

4040
var logger = log.New()
@@ -77,7 +77,12 @@ func (c *baseClientImpl) makeRequest(method, uriPath, uriQuery string, body []by
7777
return req, nil
7878
}
7979

80-
func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) (*MultiSearchResponse, error) {
80+
// Multisearch uses a shallow unmarshalled struct to defer the decoding to downstream handlers
81+
type MultiSearchResponse struct {
82+
Responses []*json.RawMessage `json:"responses"`
83+
}
84+
85+
func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) ([]*json.RawMessage, error) {
8186
req, err := c.createMultiSearchRequest(requests, c.index)
8287
if err != nil {
8388
return nil, err
@@ -122,7 +127,7 @@ func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) (*MultiSe
122127
elapsed := time.Since(start)
123128
logger.Debug("Decoded multisearch json response", "took", elapsed)
124129

125-
return &msr, nil
130+
return msr.Responses, nil
126131
}
127132

128133
func (c *baseClientImpl) makeMultiSearchPayload(searchRequests []*SearchRequest, index string) ([]byte, error) {

pkg/quickwit/client/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func TestClient_ExecuteMultisearch(t *testing.T) {
9494

9595
assert.Equal(t, "15s", jBody.GetPath("aggs", "2", "date_histogram", "fixed_interval").MustString())
9696

97-
require.Len(t, res.Responses, 1)
97+
require.Len(t, res, 1)
9898
})
9999
}
100100

pkg/quickwit/client/models.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,6 @@ type SearchResponse struct {
5959
Hits *SearchResponseHits `json:"hits"`
6060
}
6161

62-
// MultiSearchResponse represents a multi search response
63-
type MultiSearchResponse struct {
64-
Responses []*SearchResponse `json:"responses"`
65-
}
66-
6762
// Query represents a query
6863
type Query struct {
6964
Bool *BoolQuery `json:"bool"`

pkg/quickwit/data_query_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1667,9 +1667,9 @@ func newFakeClient() *fakeClient {
16671667
}
16681668
}
16691669

1670-
func (c *fakeClient) ExecuteMultisearch(r []*es.SearchRequest) (*es.MultiSearchResponse, error) {
1670+
func (c *fakeClient) ExecuteMultisearch(r []*es.SearchRequest) ([]*json.RawMessage, error) {
16711671
c.multisearchRequests = append(c.multisearchRequests, r)
1672-
return c.multiSearchResponse, c.multiSearchError
1672+
return c.multiSearchResponse.Responses, c.multiSearchError
16731673
}
16741674

16751675
func newDataQuery(body string) (backend.QueryDataRequest, error) {
@@ -1721,5 +1721,5 @@ func executeElasticsearchDataQuery(c es.Client, body string, from, to time.Time)
17211721
return &backend.QueryDataResponse{}, err
17221722
}
17231723

1724-
return parseResponse(res.Responses, queries, configuredFields)
1724+
return parseResponse(res, queries, configuredFields)
17251725
}

pkg/quickwit/elasticsearch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func queryData(ctx context.Context, dataQueries []backend.DataQuery, dsInfo *es.
4545
return &backend.QueryDataResponse{}, err
4646
}
4747

48-
return parseResponse(res.Responses, queries, dsInfo.ConfiguredFields)
48+
return parseResponse(res, queries, dsInfo.ConfiguredFields)
4949
}
5050

5151
func handleQuickwitErrors(err error) (*backend.QueryDataResponse, error) {

pkg/quickwit/response_parser.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,23 @@ const (
4040

4141
var searchWordsRegex = regexp.MustCompile(regexp.QuoteMeta(es.HighlightPreTagsString) + `(.*?)` + regexp.QuoteMeta(es.HighlightPostTagsString))
4242

43-
func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields) (*backend.QueryDataResponse, error) {
43+
func parseResponse(rawResponses []*json.RawMessage, targets []*Query, configuredFields es.ConfiguredFields) (*backend.QueryDataResponse, error) {
4444
result := backend.QueryDataResponse{
4545
Responses: backend.Responses{},
4646
}
47-
if responses == nil {
47+
if rawResponses == nil {
4848
return &result, nil
4949
}
5050

51-
for i, res := range responses {
52-
target := targets[i]
51+
for i, rawRes := range rawResponses {
52+
var res *es.SearchResponse
53+
err := json.Unmarshal([]byte(*rawRes), &res)
54+
if nil != err {
55+
qwlog.Debug("Failed to unmarshal response", "err", err.Error(), "byteRes", *rawRes)
56+
continue
57+
}
5358

59+
target := targets[i]
5460
if res.Error != nil {
5561
errResult := getErrorFromElasticResponse(res)
5662
result.Responses[target.RefID] = backend.DataResponse{

0 commit comments

Comments
 (0)