@@ -29,7 +29,7 @@ module DataStores
2929
3030 class DirectFileStore
3131 class InvalidStoreSettingsError < StandardError ; end
32- AGGREGATION_MODES = [ MAX = :max , MIN = :min , SUM = :sum , ALL = :all ]
32+ AGGREGATION_MODES = [ MAX = :max , MIN = :min , SUM = :sum , ALL = :all , MOST_RECENT = :most_recent ]
3333 DEFAULT_METRIC_SETTINGS = { aggregation : SUM }
3434 DEFAULT_GAUGE_SETTINGS = { aggregation : ALL }
3535
@@ -121,15 +121,15 @@ def all_values
121121 stores_for_metric . each do |file_path |
122122 begin
123123 store = FileMappedDict . new ( file_path , true )
124- store . all_values . each do |( labelset_qs , v ) |
124+ store . all_values . each do |( labelset_qs , v , ts ) |
125125 # Labels come as a query string, and CGI::parse returns arrays for each key
126126 # "foo=bar&x=y" => { "foo" => ["bar"], "x" => ["y"] }
127127 # Turn the keys back into symbols, and remove the arrays
128128 label_set = CGI ::parse ( labelset_qs ) . map do |k , vs |
129129 [ k . to_sym , vs . first ]
130130 end . to_h
131131
132- stores_data [ label_set ] << v
132+ stores_data [ label_set ] << [ v , ts ]
133133 end
134134 ensure
135135 store . close if store
@@ -181,30 +181,41 @@ def process_id
181181 end
182182
183183 def aggregate_values ( values )
184- if @values_aggregation_mode == SUM
185- values . inject { |sum , element | sum + element }
186- elsif @values_aggregation_mode == MAX
187- values . max
188- elsif @values_aggregation_mode == MIN
189- values . min
190- elsif @values_aggregation_mode == ALL
191- values . first
184+ # Each entry in the `values` array is a tuple of `value` and `timestamp`,
185+ # so for all aggregations except `MOST_RECENT`, we need to only take the
186+ # first value in each entry and ignore the second.
187+ if @values_aggregation_mode == MOST_RECENT
188+ latest_tuple = values . max { |a , b | a [ 1 ] <=> b [ 1 ] }
189+ latest_tuple . first # return the value without the timestamp
192190 else
193- raise InvalidStoreSettingsError ,
194- "Invalid Aggregation Mode: #{ @values_aggregation_mode } "
191+ values = values . map ( &:first ) # Discard timestamps
192+
193+ if @values_aggregation_mode == SUM
194+ values . inject { |sum , element | sum + element }
195+ elsif @values_aggregation_mode == MAX
196+ values . max
197+ elsif @values_aggregation_mode == MIN
198+ values . min
199+ elsif @values_aggregation_mode == ALL
200+ values . first
201+ else
202+ raise InvalidStoreSettingsError ,
203+ "Invalid Aggregation Mode: #{ @values_aggregation_mode } "
204+ end
195205 end
196206 end
197207 end
198208
199209 private_constant :MetricStore
200210
201- # A dict of doubles, backed by an file we access directly a a byte array.
211+ # A dict of doubles, backed by an file we access directly as a byte array.
202212 #
203213 # The file starts with a 4 byte int, indicating how much of it is used.
204214 # Then 4 bytes of padding.
205215 # There's then a number of entries, consisting of a 4 byte int which is the
206216 # size of the next field, a utf-8 encoded string key, padding to an 8 byte
207- # alignment, and then a 8 byte float which is the value.
217+ # alignment, and then a 8 byte float which is the value, and then a 8 byte
218+ # float which is the unix timestamp when the value was set.
208219 class FileMappedDict
209220 INITIAL_FILE_SIZE = 1024 *1024
210221
@@ -236,7 +247,8 @@ def all_values
236247 @positions . map do |key , pos |
237248 @f . seek ( pos )
238249 value = @f . read ( 8 ) . unpack ( 'd' ) [ 0 ]
239- [ key , value ]
250+ timestamp = @f . read ( 8 ) . unpack ( 'd' ) [ 0 ]
251+ [ key , value , timestamp ]
240252 end
241253 end
242254 end
@@ -258,7 +270,7 @@ def write_value(key, value)
258270
259271 pos = @positions [ key ]
260272 @f . seek ( pos )
261- @f . write ( [ value ] . pack ( 'd ' ) )
273+ @f . write ( [ value , Time . now . to_f ] . pack ( 'dd ' ) )
262274 @f . flush
263275 end
264276
@@ -299,7 +311,7 @@ def resize_file(new_capacity)
299311 def init_value ( key )
300312 # Pad to be 8-byte aligned.
301313 padded = key + ( ' ' * ( 8 - ( key . length + 4 ) % 8 ) )
302- value = [ padded . length , padded , 0.0 ] . pack ( "lA#{ padded . length } d " )
314+ value = [ padded . length , padded , 0.0 , 0.0 ] . pack ( "lA#{ padded . length } dd " )
303315 while @used + value . length > @capacity
304316 @capacity *= 2
305317 resize_file ( @capacity )
@@ -310,7 +322,7 @@ def init_value(key)
310322 @f . seek ( 0 )
311323 @f . write ( [ @used ] . pack ( 'l' ) )
312324 @f . flush
313- @positions [ key ] = @used - 8
325+ @positions [ key ] = @used - 16
314326 end
315327
316328 # Read position of all keys. No locking is performed.
@@ -320,7 +332,7 @@ def populate_positions
320332 padded_len = @f . read ( 4 ) . unpack ( 'l' ) [ 0 ]
321333 key = @f . read ( padded_len ) . unpack ( "A#{ padded_len } " ) [ 0 ] . strip
322334 @positions [ key ] = @f . pos
323- @f . seek ( 8 , :CUR )
335+ @f . seek ( 16 , :CUR )
324336 end
325337 end
326338 end
0 commit comments