Skip to content

Commit d1416fa

Browse files
committed
Start a resampler for each requested metric for a component
This is a bugfix, because earlier there was only one resampler for each component and data from all metrics were coming from that one resampler, which had the data only for the first metric that was subscribed to. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent 01ff07d commit d1416fa

1 file changed

Lines changed: 43 additions & 24 deletions

File tree

src/logical_meter/logical_meter_actor.rs

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ impl LogicalMeterActor {
9595
}
9696

9797
pub async fn run(mut self) {
98-
let mut resamplers: HashMap<u64, ComponentDataResampler> = HashMap::new();
99-
let mut formulas: HashMap<String, LogicalMeterFormula> = HashMap::new();
98+
let mut resamplers: HashMap<(u64, Metric), ComponentDataResampler> = HashMap::new();
99+
let mut formulas: HashMap<(String, Metric), LogicalMeterFormula> = HashMap::new();
100100

101101
loop {
102102
tokio::select! {
@@ -109,7 +109,7 @@ impl LogicalMeterActor {
109109
match instruction {
110110
Some(Instruction::SubscribeFormula{formula, metric, response_tx}) => {
111111
if let Err(err) = self.handle_subscribe_formula(
112-
&formula,
112+
formula,
113113
metric,
114114
response_tx,
115115
&mut formulas,
@@ -145,25 +145,27 @@ impl LogicalMeterActor {
145145
/// in the formula, if it does not already exist.
146146
async fn handle_subscribe_formula(
147147
&mut self,
148-
formula: &str,
148+
formula: String,
149149
metric: Metric,
150150
receiver_tx: oneshot::Sender<broadcast::Receiver<Sample>>,
151-
formulas: &mut HashMap<String, LogicalMeterFormula>,
152-
resamplers: &mut HashMap<u64, ComponentDataResampler>,
151+
formulas: &mut HashMap<(String, Metric), LogicalMeterFormula>,
152+
resamplers: &mut HashMap<(u64, Metric), ComponentDataResampler>,
153153
) -> Result<(), Error> {
154-
if formulas.contains_key(formula) {
154+
let formula_key = (formula, metric);
155+
if formulas.contains_key(&formula_key) {
155156
receiver_tx
156-
.send(formulas[formula].sender.subscribe())
157+
.send(formulas[&formula_key].sender.subscribe())
157158
.map_err(|_| Error::internal("Failed to send receiver for formula".to_string()))?;
158159
return Ok(());
159160
}
160161

161-
let formula_engine = FormulaEngine::try_new(formula)
162+
let formula_engine = FormulaEngine::try_new(&formula_key.0)
162163
.map_err(|e| Error::formula_engine_error(format!("Failed to parse formula: {e}")))?;
163164
let (sender, receiver) = broadcast::channel(8);
164165

165166
for component_id in formula_engine.components() {
166-
if resamplers.contains_key(component_id) {
167+
let resampler_key = (*component_id, metric);
168+
if resamplers.contains_key(&resampler_key) {
167169
continue;
168170
}
169171
let resampler = ComponentDataResampler {
@@ -180,11 +182,11 @@ impl LogicalMeterActor {
180182
),
181183
receiver: self.client.get_component_data_stream(*component_id).await?,
182184
};
183-
resamplers.insert(*component_id, resampler);
185+
resamplers.insert(resampler_key, resampler);
184186
}
185187

186188
formulas.insert(
187-
formula.to_string(),
189+
formula_key,
188190
LogicalMeterFormula {
189191
formula: formula_engine,
190192
sender,
@@ -200,8 +202,8 @@ impl LogicalMeterActor {
200202
/// Resamples component data and evaluates formulas for the next timestamp.
201203
fn do_next(
202204
&mut self,
203-
resamplers: &mut HashMap<u64, ComponentDataResampler>,
204-
formulas: &mut HashMap<String, LogicalMeterFormula>,
205+
resamplers: &mut HashMap<(u64, Metric), ComponentDataResampler>,
206+
formulas: &mut HashMap<(String, Metric), LogicalMeterFormula>,
205207
) -> Result<(), Error> {
206208
let mut comp_data = HashMap::new();
207209
for (_, resampler) in resamplers.iter_mut() {
@@ -219,33 +221,45 @@ impl LogicalMeterActor {
219221
}
220222

221223
let mut formulas_to_drop = vec![];
222-
for (formula_str, formula) in formulas.iter_mut() {
224+
for (formula_key, formula) in formulas.iter_mut() {
223225
let result = formula.formula.calculate(&comp_data).map_err(|e| {
224226
Error::formula_engine_error(format!("Failed to evaluate formula: {e}"))
225227
})?;
226228

227229
if let Err(e) = formula.sender.send(Sample::new(self.next_ts, result)) {
228-
tracing::debug!("No remaining subscribers for formula: {formula_str}. Err: {e}");
229-
formulas_to_drop.push(formula_str.to_string());
230+
tracing::debug!(
231+
"No remaining subscribers for formula: {}:({}). Err: {e}",
232+
formula_key.1.as_str_name(),
233+
formula_key.0
234+
);
235+
formulas_to_drop.push(formula_key.clone());
230236
}
231237
}
232238

233-
for formula_str in &formulas_to_drop {
234-
if let Some(formula) = formulas.remove(formula_str) {
235-
tracing::debug!("Dropping formula: {}", formula_str);
239+
for formula_key in &formulas_to_drop {
240+
if let Some(formula) = formulas.remove(formula_key) {
241+
tracing::debug!(
242+
"Dropping formula: {}:({})",
243+
formula_key.1.as_str_name(),
244+
formula_key.0
245+
);
236246
drop(formula);
237247
}
238248
}
239249
if !formulas_to_drop.is_empty() {
240-
let mut components = HashSet::<u64>::new();
241-
for (_, formula) in formulas.iter() {
242-
components.extend(formula.formula.components());
250+
let mut components = HashSet::<(u64, Metric)>::new();
251+
for ((_, metric), formula) in formulas.iter() {
252+
components.extend(formula.formula.components().iter().map(|&id| (id, *metric)));
243253
}
244254
resamplers.retain(|component_id, _| {
245255
if components.contains(component_id) {
246256
true
247257
} else {
248-
tracing::debug!("Dropping resampler for component {}", component_id);
258+
tracing::debug!(
259+
"Dropping resampler for component {}:{}",
260+
component_id.0,
261+
component_id.1.as_str_name()
262+
);
249263
false
250264
}
251265
});
@@ -268,6 +282,11 @@ impl LogicalMeterActor {
268282
.iter()
269283
.find(|s| s.metric == metric as i32)
270284
else {
285+
tracing::warn!(
286+
"No data for metric {:?} in component {}",
287+
metric,
288+
resampler.component_id
289+
);
271290
return;
272291
};
273292
let timestamp = if let Some(timestamp) = dd.sampled_at {

0 commit comments

Comments
 (0)