@@ -35,8 +35,10 @@ type DatapointClient interface {
3535 // - WithTemporalExtent: specifies the time or data point interval for which data should be loaded. (Required)
3636 // - WithSpatialExtent: specifies the spatial extent for which data should be loaded. (Optional)
3737 // - WithSkipData: can be used to skip the actual data when loading datapoints, only returning required datapoint fields. (Optional)
38+ // - WithCursor: starts the query after a cursor returned by a previous page. (Optional)
39+ // - WithLimit: limits the total number of datapoints returned. Defaults to unlimited.
3840 //
39- // The datapoints are lazily loaded and returned as a sequence of bytes.
41+ // Pagination is handled automatically under the hood. The datapoints are lazily loaded and returned as a sequence of bytes.
4042 // The output sequence can be transformed into a proto.Message using CollectAs / As.
4143 //
4244 // Example usage:
@@ -56,6 +58,17 @@ type DatapointClient interface {
5658 // Documentation: https://docs.tilebox.com/datasets/query/querying-data
5759 Query (ctx context.Context , datasetID uuid.UUID , options ... QueryOption ) iter.Seq2 [[]byte , error ]
5860
61+ // QueryPage returns a single page of datapoints from one or more collections of the same dataset.
62+ //
63+ // Options:
64+ // - WithCollections / WithCollectionIDs: specifies the collections to query. If no collections are specified, all collections of the dataset will be queried. (Optional)
65+ // - WithTemporalExtent: specifies the time or data point interval for which data should be loaded. (Required)
66+ // - WithSpatialExtent: specifies the spatial extent for which data should be loaded. (Optional)
67+ // - WithSkipData: can be used to skip the actual data when loading datapoints, only returning required datapoint fields. (Optional)
68+ // - WithCursor: starts the query after a cursor returned by a previous page. (Optional)
69+ // - WithLimit: limits the number of datapoints returned in this page. Defaults to the server default.
70+ QueryPage (ctx context.Context , datasetID uuid.UUID , options ... QueryOption ) (* DatapointPage , error )
71+
5972 // QueryInto queries datapoints from one or more collections of the same dataset into a slice of datapoints of a
6073 // compatible proto.Message type.
6174 //
@@ -90,6 +103,27 @@ type DatapointClient interface {
90103 DeleteIDs (ctx context.Context , collectionID uuid.UUID , datapointIDs []uuid.UUID ) (int64 , error )
91104}
92105
106+ // DatapointPage is a single page of datapoints returned by a manual page query.
107+ type DatapointPage struct {
108+ // Datapoints is the raw protobuf-encoded datapoint data in this page.
109+ Datapoints [][]byte
110+ // NextCursor can be used to request the next page. Nil means there are no more pages.
111+ NextCursor * Cursor
112+ }
113+
114+ // Cursor identifies where to continue a paginated datapoint query.
115+ type Cursor = query.Cursor
116+
117+ // NewCursor creates a cursor that starts after the datapoint with the given ID.
118+ func NewCursor (startingAfter uuid.UUID ) * Cursor {
119+ return query .NewCursor (startingAfter )
120+ }
121+
122+ // ParseCursor parses a cursor string returned by Cursor.String.
123+ func ParseCursor (value string ) (* Cursor , error ) {
124+ return query .ParseCursor (value )
125+ }
126+
93127var _ DatapointClient = & datapointClient {}
94128
95129type datapointClient struct {
@@ -103,6 +137,8 @@ type queryOptions struct {
103137 spatialExtent query.SpatialExtent
104138 skipData bool
105139 collectionIDs []uuid.UUID
140+ cursor * Cursor
141+ limit int64
106142}
107143
108144// QueryOption is an interface for configuring a Query request.
@@ -156,6 +192,27 @@ func WithSkipData() QueryOption {
156192 }
157193}
158194
195+ // WithCursor starts the query after a cursor returned by a previous page.
196+ func WithCursor (cursor * Cursor ) QueryOption {
197+ return func (cfg * queryOptions ) {
198+ cfg .cursor = cursor
199+ }
200+ }
201+
202+ // WithLimit limits the number of query results returned.
203+ //
204+ // For auto-paginated query methods, the limit applies to the total number of datapoints yielded. For page query
205+ // methods, the limit applies to the single page returned.
206+ //
207+ // Defaults to unlimited.
208+ func WithLimit (limit int64 ) QueryOption {
209+ return func (cfg * queryOptions ) {
210+ if limit > 0 {
211+ cfg .limit = limit
212+ }
213+ }
214+ }
215+
159216func (d datapointClient ) GetInto (ctx context.Context , datasetID uuid.UUID , datapointID uuid.UUID , datapoint proto.Message , options ... QueryOption ) error {
160217 cfg := & queryOptions {
161218 skipData : false ,
@@ -185,60 +242,86 @@ func (d datapointClient) Query(ctx context.Context, datasetID uuid.UUID, options
185242 option (cfg )
186243 }
187244
188- if cfg .temporalExtent == nil {
189- return func (yield func ([]byte , error ) bool ) {
190- // right now we return an error, in the future we might want to support queries without a temporal extent
191- yield (nil , errors .New ("temporal extent is required" ))
192- }
193- }
194-
195245 return func (yield func ([]byte , error ) bool ) {
196- var page * tileboxv1.Pagination // nil for the first request
197-
198- // we already validated that temporalExtent is not nil
199- timeInterval := cfg .temporalExtent .ToProtoTimeInterval ()
200- datapointInterval := cfg .temporalExtent .ToProtoIDInterval ()
201-
202- if timeInterval == nil && datapointInterval == nil {
203- yield (nil , errors .New ("invalid temporal extent" ))
204- return
205- }
206-
207- filters := datasetsv1.QueryFilters_builder {
208- TimeInterval : timeInterval ,
209- DatapointInterval : datapointInterval ,
210- }.Build ()
246+ cursor := cfg .cursor
247+ remaining := cfg .limit
211248
212- if cfg .spatialExtent != nil {
213- spatialExtent , err := cfg .spatialExtent .ToProtoSpatialFilter ()
214- if err != nil {
215- yield (nil , err )
216- return
249+ for {
250+ pageOpts := * cfg
251+ pageOpts .cursor = cursor
252+ if cfg .limit > 0 {
253+ if remaining == 0 {
254+ break
255+ }
256+ pageOpts .limit = remaining
217257 }
218- filters .SetSpatialExtent (spatialExtent )
219- }
220258
221- for {
222- datapointsMessage , err := d .dataAccessService .Query (ctx , datasetID , cfg .collectionIDs , filters , page , cfg .skipData )
259+ datapointsPage , err := d .queryPage (ctx , datasetID , & pageOpts )
223260 if err != nil {
224261 yield (nil , err )
225262 return
226263 }
227264
228- for _ , data := range datapointsMessage .GetData ().GetValue () {
265+ for _ , data := range datapointsPage .Datapoints {
266+ if cfg .limit > 0 && remaining == 0 {
267+ return
268+ }
269+
229270 if ! yield (data , nil ) {
230271 return
231272 }
273+ if cfg .limit > 0 {
274+ remaining --
275+ }
232276 }
233277
234- page = datapointsMessage . GetNextPage ()
235- if page == nil {
278+ cursor = datapointsPage . NextCursor
279+ if cursor == nil || ( cfg . limit > 0 && remaining == 0 ) {
236280 break
237281 }
238282 }
239283 }
240284}
241285
286+ func (d datapointClient ) QueryPage (ctx context.Context , datasetID uuid.UUID , options ... QueryOption ) (* DatapointPage , error ) {
287+ cfg := & queryOptions {
288+ skipData : false ,
289+ }
290+ for _ , option := range options {
291+ option (cfg )
292+ }
293+ return d .queryPage (ctx , datasetID , cfg )
294+ }
295+
296+ func paginationFromOptions (limit int64 , cursor * Cursor ) * tileboxv1.Pagination {
297+ if limit <= 0 && cursor == nil {
298+ return nil
299+ }
300+
301+ var startingAfter * tileboxv1.ID
302+ if cursor != nil {
303+ startingAfter = tileboxv1 .NewUUID (cursor .StartingAfter ())
304+ }
305+
306+ if limit <= 0 {
307+ return tileboxv1.Pagination_builder {
308+ StartingAfter : startingAfter ,
309+ }.Build ()
310+ }
311+
312+ return tileboxv1.Pagination_builder {
313+ Limit : & limit ,
314+ StartingAfter : startingAfter ,
315+ }.Build ()
316+ }
317+
318+ func cursorFromPagination (page * tileboxv1.Pagination ) * Cursor {
319+ if page == nil || page .GetStartingAfter () == nil {
320+ return nil
321+ }
322+ return NewCursor (page .GetStartingAfter ().AsUUID ())
323+ }
324+
242325func (d datapointClient ) QueryInto (ctx context.Context , datasetID uuid.UUID , datapoints any , options ... QueryOption ) error {
243326 err := validateDatapoints (datapoints )
244327 if err != nil {
@@ -387,6 +470,42 @@ func (d datapointClient) DeleteIDs(ctx context.Context, collectionID uuid.UUID,
387470 return numDeleted , nil
388471}
389472
473+ func (d datapointClient ) queryPage (ctx context.Context , datasetID uuid.UUID , cfg * queryOptions ) (* DatapointPage , error ) {
474+ if cfg .temporalExtent == nil {
475+ // right now we return an error, in the future we might want to support queries without a temporal extent
476+ return nil , errors .New ("temporal extent is required" )
477+ }
478+
479+ timeInterval := cfg .temporalExtent .ToProtoTimeInterval ()
480+ datapointInterval := cfg .temporalExtent .ToProtoIDInterval ()
481+ if timeInterval == nil && datapointInterval == nil {
482+ return nil , errors .New ("invalid temporal extent" )
483+ }
484+
485+ filters := datasetsv1.QueryFilters_builder {
486+ TimeInterval : timeInterval ,
487+ DatapointInterval : datapointInterval ,
488+ }.Build ()
489+
490+ if cfg .spatialExtent != nil {
491+ spatialExtent , err := cfg .spatialExtent .ToProtoSpatialFilter ()
492+ if err != nil {
493+ return nil , err
494+ }
495+ filters .SetSpatialExtent (spatialExtent )
496+ }
497+
498+ datapointsMessage , err := d .dataAccessService .Query (ctx , datasetID , cfg .collectionIDs , filters , paginationFromOptions (cfg .limit , cfg .cursor ), cfg .skipData )
499+ if err != nil {
500+ return nil , err
501+ }
502+
503+ return & DatapointPage {
504+ Datapoints : datapointsMessage .GetData ().GetValue (),
505+ NextCursor : cursorFromPagination (datapointsMessage .GetNextPage ()),
506+ }, nil
507+ }
508+
390509// CollectAs converts a sequence of bytes into a slice of proto.Message.
391510func CollectAs [T proto.Message ](seq iter.Seq2 [[]byte , error ]) ([]T , error ) {
392511 return Collect (As [T ](seq ))
0 commit comments