@@ -77,7 +77,7 @@ pub trait SeriesValue: fmt::Debug + Clone + Send + Sync + 'static {
7777 fn should_stop ( & self , end : Self , step : & Self :: StepType , include_end : bool ) -> bool ;
7878
7979 /// Advance to the next value in the series
80- fn advance ( & mut self , step : & Self :: StepType ) -> Result < ( ) > ;
80+ fn advance ( & mut self , end : & mut Self , step : & Self :: StepType ) -> Result < ( ) > ;
8181
8282 /// Create an Arrow array from a vector of values
8383 fn create_array ( & self , values : Vec < Self :: ValueType > ) -> Result < ArrayRef > ;
@@ -97,8 +97,16 @@ impl SeriesValue for i64 {
9797 reach_end_int64 ( * self , end, * step, include_end)
9898 }
9999
100- fn advance ( & mut self , step : & Self :: StepType ) -> Result < ( ) > {
101- * self += step;
100+ fn advance ( & mut self , end : & mut Self , step : & Self :: StepType ) -> Result < ( ) > {
101+ if let Some ( next) = self . checked_add ( * step) {
102+ * self = next;
103+ } else {
104+ * end = if * step > 0 {
105+ self . saturating_sub ( 1 )
106+ } else {
107+ self . saturating_add ( 1 )
108+ } ;
109+ }
102110 Ok ( ( ) )
103111 }
104112
@@ -152,7 +160,7 @@ impl SeriesValue for TimestampValue {
152160 }
153161 }
154162
155- fn advance ( & mut self , step : & Self :: StepType ) -> Result < ( ) > {
163+ fn advance ( & mut self , _end : & mut Self , step : & Self :: StepType ) -> Result < ( ) > {
156164 let tz = self
157165 . parsed_tz
158166 . unwrap_or_else ( || Tz :: from_str ( "+00:00" ) . unwrap ( ) ) ;
@@ -250,16 +258,18 @@ impl GenerateSeriesTable {
250258 step,
251259 include_end,
252260 name,
253- } => Arc :: new ( RwLock :: new ( GenericSeriesState {
254- schema : self . schema ( ) ,
255- start : * start,
256- end : * end,
257- step : * step,
258- current : * start,
259- batch_size,
260- include_end : * include_end,
261- name,
262- } ) ) ,
261+ } => {
262+ Arc :: new ( RwLock :: new ( GenericSeriesState {
263+ schema : self . schema ( ) ,
264+ start : * start,
265+ end : * end,
266+ step : * step,
267+ current : * start,
268+ batch_size,
269+ include_end : * include_end,
270+ name,
271+ } ) )
272+ }
263273 GenSeriesArgs :: TimestampArgs {
264274 start,
265275 end,
@@ -391,7 +401,15 @@ impl<T: SeriesValue> LazyBatchGenerator for GenericSeriesState<T> {
391401 . should_stop ( self . end . clone ( ) , & self . step , self . include_end )
392402 {
393403 buf. push ( self . current . to_value_type ( ) ) ;
394- self . current . advance ( & self . step ) ?;
404+ if self
405+ . current
406+ . should_stop ( self . end . clone ( ) , & self . step , false )
407+ {
408+ self . current . advance ( & mut self . end , & self . step ) ?;
409+ break ;
410+ }
411+
412+ self . current . advance ( & mut self . end , & self . step ) ?;
395413 }
396414
397415 if buf. is_empty ( ) {
0 commit comments