forked from databricks/databricks-sql-go
ipc_stream_iterator.go
115 lines (97 loc) · 3.15 KB
package arrowbased

import (
"bytes"
"context"
"io"
"github.com/databricks/databricks-sql-go/internal/cli_service"
"github.com/databricks/databricks-sql-go/internal/config"
"github.com/databricks/databricks-sql-go/internal/rows/rowscanner"
dbsqlrows "github.com/databricks/databricks-sql-go/rows"
"github.com/pierrec/lz4/v4"
)

// ipcStreamIterator provides access to raw Arrow IPC streams without deserialization
type ipcStreamIterator struct {
ctx context.Context
resultPageIterator rowscanner.ResultPageIterator
currentBatches []*cli_service.TSparkArrowBatch
currentIndex int
arrowSchemaBytes []byte
useLz4 bool
hasMorePages bool
}

// NewIPCStreamIterator creates an iterator that returns raw IPC streams
func NewIPCStreamIterator(
ctx context.Context,
resultPageIterator rowscanner.ResultPageIterator,
initialRowSet *cli_service.TRowSet,
schemaBytes []byte,
cfg *config.Config,
) (dbsqlrows.IPCStreamIterator, error) {
var useLz4 bool
if cfg != nil {
useLz4 = cfg.UseLz4Compression
}
var batches []*cli_service.TSparkArrowBatch
if initialRowSet != nil {
batches = initialRowSet.ArrowBatches
}
return &ipcStreamIterator{
ctx: ctx,
resultPageIterator: resultPageIterator,
currentBatches: batches,
currentIndex: 0,
arrowSchemaBytes: schemaBytes,
useLz4: useLz4,
hasMorePages: resultPageIterator != nil && resultPageIterator.HasNext(),
}, nil
}

// NextIPCStream returns the next Arrow batch as a raw IPC stream
func (it *ipcStreamIterator) NextIPCStream() (io.Reader, error) {
// Check if we need to load more batches from the next page
if it.currentIndex >= len(it.currentBatches) {
if !it.hasMorePages || it.resultPageIterator == nil {
return nil, io.EOF
}
// Fetch next page
fetchResult, err := it.resultPageIterator.Next()
if err != nil {
return nil, err
}
if fetchResult == nil || fetchResult.Results == nil || fetchResult.Results.ArrowBatches == nil {
return nil, io.EOF
}
it.currentBatches = fetchResult.Results.ArrowBatches
it.currentIndex = 0
it.hasMorePages = it.resultPageIterator.HasNext()
// If no batches in this page, recurse to try next page
if len(it.currentBatches) == 0 {
return it.NextIPCStream()
}
}
batch := it.currentBatches[it.currentIndex]
it.currentIndex++
// Create reader for the batch data
var batchReader io.Reader = bytes.NewReader(batch.Batch)
// Handle LZ4 decompression if needed
if it.useLz4 {
batchReader = lz4.NewReader(batchReader)
}
// Combine schema and batch data into a complete IPC stream
// Arrow IPC format expects: [Schema][Batch1][Batch2]... (see the illustrative consumer sketch after this function)
return io.MultiReader(
bytes.NewReader(it.arrowSchemaBytes),
batchReader,
), nil
}
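
// Illustrative consumer sketch (not part of the original file): one way a caller
// might decode the readers returned by NextIPCStream using the Apache Arrow Go
// ipc package. The import path (e.g. "github.com/apache/arrow/go/v12/arrow/ipc")
// and the function name consumeIPCStreams are assumptions for illustration only.
func consumeIPCStreams(it dbsqlrows.IPCStreamIterator) error {
	for it.HasNext() {
		r, err := it.NextIPCStream()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		// Each reader carries a complete IPC stream: [Schema][Batch].
		rdr, err := ipc.NewReader(r)
		if err != nil {
			return err
		}
		for rdr.Next() {
			rec := rdr.Record()
			_ = rec // process the arrow.Record (only valid until the next call to Next)
		}
		rdr.Release()
		if err := rdr.Err(); err != nil && err != io.EOF {
			return err
		}
	}
	return nil
}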
// HasNext returns true if there are more batches
func (it *ipcStreamIterator) HasNext() bool {
return it.currentIndex < len(it.currentBatches) || it.hasMorePages
}

// Close releases any resources
func (it *ipcStreamIterator) Close() {
// Nothing to close for this implementation
}

// GetSchemaBytes returns the Arrow schema in IPC format
func (it *ipcStreamIterator) GetSchemaBytes() ([]byte, error) {
return it.arrowSchemaBytes, nil
}
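
// Illustrative sketch (hypothetical, not part of the original file): recovering an
// *arrow.Schema from the IPC-framed bytes returned by GetSchemaBytes, again
// assuming the Apache Arrow Go "arrow" and "arrow/ipc" packages are imported.
func schemaFromIPCBytes(schemaBytes []byte) (*arrow.Schema, error) {
	// The schema bytes form a valid IPC stream prefix, so ipc.NewReader can
	// parse the schema message even though no record batches follow.
	rdr, err := ipc.NewReader(bytes.NewReader(schemaBytes))
	if err != nil {
		return nil, err
	}
	defer rdr.Release()
	return rdr.Schema(), nil
}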