Skip to content
This repository was archived by the owner on May 11, 2026. It is now read-only.

Commit 530a5a7

Browse files
authored
fix: ensure rav topic can handle v1 and v2 data (#118)
Signed-off-by: Tomás Migone <tomas@edgeandnode.com>
1 parent cdc191a commit 530a5a7

1 file changed

Lines changed: 26 additions & 10 deletions

File tree

src/kafka.rs

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -313,18 +313,21 @@ mod ravs {
313313
return;
314314
}
315315
};
316-
let record = match parse_record(msg) {
317-
Ok(record) => record,
316+
let record = match parse_record(&msg) {
317+
Ok(ParseResult::V2(record)) => record,
318+
Ok(ParseResult::V1) => return,
318319
Err(record_parse_err) => {
319-
tracing::error!(%record_parse_err);
320+
let key = msg.key().map(String::from_utf8_lossy);
321+
let payload = msg.payload().map(String::from_utf8_lossy);
322+
tracing::error!(%record_parse_err, ?key, ?payload);
320323
return;
321324
}
322325
};
323326
if !signers.contains(&record.signer) {
324327
return;
325328
}
326329
tx.send_if_modified(|map| {
327-
match map.entry(record.receiver) {
330+
match map.entry(record.allocation) {
328331
std::collections::btree_map::Entry::Vacant(entry) => {
329332
entry.insert(record.value);
330333
}
@@ -343,18 +346,31 @@ mod ravs {
343346

344347
struct Record {
345348
signer: Address,
346-
receiver: Address,
349+
allocation: Address,
347350
value: u128,
348351
}
349352

350-
fn parse_record(msg: rdkafka::message::BorrowedMessage) -> anyhow::Result<Record> {
353+
enum ParseResult {
354+
V2(Record),
355+
V1,
356+
}
357+
358+
fn parse_record(msg: &rdkafka::message::BorrowedMessage) -> anyhow::Result<ParseResult> {
351359
let key = String::from_utf8_lossy(msg.key().context("missing key")?);
352360
let payload = String::from_utf8_lossy(msg.payload().context("missing payload")?);
353-
let (signer, receiver) = key.split_once(':').context("malformed key")?;
354-
Ok(Record {
361+
let (signer, id) = key.split_once(':').context("malformed key")?;
362+
// V1: allocation ID is 20 bytes (42 chars with 0x prefix)
363+
// V2: collection ID is 32 bytes (66 chars with 0x prefix)
364+
if id.len() == 42 {
365+
return Ok(ParseResult::V1);
366+
}
367+
anyhow::ensure!(id.len() == 66, "invalid id length: {}", id.len());
368+
// Allocation ID is the last 20 bytes of collection ID
369+
let allocation = &id[26..]; // skip "0x" + 24 zero chars (12 bytes padding)
370+
Ok(ParseResult::V2(Record {
355371
signer: signer.parse()?,
356-
receiver: receiver.parse()?,
372+
allocation: allocation.parse()?,
357373
value: payload.parse()?,
358-
})
374+
}))
359375
}
360376
}

0 commit comments

Comments
 (0)