Skip to content
Merged

v0.19.1 #2137

Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config-files/config-batcher-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ batcher:
# When replacing the message, how much higher should the max fee in comparison to the original one
# The calculation is replacement_max_fee >= original_max_fee + original_max_fee * min_bump_percentage / 100
min_bump_percentage: 10
# How often to poll for BalanceUnlocked events in seconds (default: 600 seconds = 10 minutes)
balance_unlock_polling_interval_seconds: 600

2 changes: 2 additions & 0 deletions config-files/config-batcher-ethereum-package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ batcher:
# When replacing the message, how much higher should the max fee in comparison to the original one
# The calculation is replacement_max_fee >= original_max_fee + original_max_fee * min_bump_percentage / 100
min_bump_percentage: 10
# How often to poll for BalanceUnlocked events in seconds (default: 600 seconds = 10 minutes)
balance_unlock_polling_interval_seconds: 600

2 changes: 2 additions & 0 deletions config-files/config-batcher.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,5 @@ batcher:
# When replacing the message, how much higher should the max fee in comparison to the original one
# The calculation is replacement_max_fee >= original_max_fee + original_max_fee * min_bump_percentage / 100
min_bump_percentage: 10
# How often to poll for BalanceUnlocked events in seconds (default: 600 seconds = 10 minutes)
balance_unlock_polling_interval_seconds: 600
1 change: 1 addition & 0 deletions crates/batcher/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub struct BatcherConfigFromYaml {
pub non_paying: Option<NonPayingConfigFromYaml>,
pub amount_of_proofs_for_min_max_fee: usize,
pub min_bump_percentage: u64,
pub balance_unlock_polling_interval_seconds: u64,
}

#[derive(Debug, Deserialize)]
Expand Down
312 changes: 284 additions & 28 deletions crates/batcher/src/lib.rs

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions crates/batcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ async fn main() -> Result<(), BatcherError> {
}
});

// spawn task to poll for BalanceUnlocked events
tokio::spawn({
let app = batcher.clone();
async move {
app.poll_balance_unlocked_events()
.await
.expect("Error polling BalanceUnlocked events")
}
});

batcher.metrics.inc_batcher_restart();

batcher.listen_connections(&address).await?;
Expand Down
12 changes: 12 additions & 0 deletions crates/batcher/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct BatcherMetrics {
pub message_handler_user_lock_timeouts: IntCounter,
pub message_handler_batch_lock_timeouts: IntCounter,
pub message_handler_user_states_lock_timeouts: IntCounter,
pub unlocked_event_polling_batch_lock_timeouts: IntCounter,
pub available_data_services: IntGauge,
}

Expand Down Expand Up @@ -103,6 +104,11 @@ impl BatcherMetrics {
"Message Handler User States Lock Timeouts"
))?;

let unlocked_event_polling_batch_lock_timeouts = register_int_counter!(opts!(
"unlocked_event_polling_batch_lock_timeouts_count",
"Unlocked Event Polling Batch Lock Timeouts"
))?;

registry.register(Box::new(open_connections.clone()))?;
registry.register(Box::new(received_proofs.clone()))?;
registry.register(Box::new(sent_batches.clone()))?;
Expand All @@ -122,6 +128,7 @@ impl BatcherMetrics {
registry.register(Box::new(message_handler_user_lock_timeouts.clone()))?;
registry.register(Box::new(message_handler_batch_lock_timeouts.clone()))?;
registry.register(Box::new(message_handler_user_states_lock_timeouts.clone()))?;
registry.register(Box::new(unlocked_event_polling_batch_lock_timeouts.clone()))?;
registry.register(Box::new(available_data_services.clone()))?;

let metrics_route = warp::path!("metrics")
Expand Down Expand Up @@ -154,6 +161,7 @@ impl BatcherMetrics {
message_handler_user_lock_timeouts,
message_handler_batch_lock_timeouts,
message_handler_user_states_lock_timeouts,
unlocked_event_polling_batch_lock_timeouts,
available_data_services,
})
}
Expand Down Expand Up @@ -200,4 +208,8 @@ impl BatcherMetrics {
pub fn inc_message_handler_user_states_lock_timeouts(&self) {
self.message_handler_user_states_lock_timeouts.inc();
}

pub fn inc_unlocked_event_polling_batch_lock_timeout(&self) {
self.unlocked_event_polling_batch_lock_timeouts.inc();
}
}
45 changes: 45 additions & 0 deletions crates/batcher/src/retry/batcher_retryables.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;

use ethers::prelude::*;
use ethers::providers::Http;
use log::{info, warn};
use tokio::time::timeout;

Expand Down Expand Up @@ -285,3 +286,47 @@ pub async fn cancel_create_new_task_retryable(
"Receipt not found".to_string(),
)))
}

pub async fn get_current_block_number_retryable(
eth_http_provider: &Provider<Http>,
eth_http_provider_fallback: &Provider<Http>,
) -> Result<U64, RetryError<String>> {
if let Ok(block_number) = eth_http_provider.get_block_number().await {
return Ok(block_number);
}

eth_http_provider_fallback
.get_block_number()
.await
.map_err(|e| {
warn!("Failed to get current block number: {e}");
RetryError::Transient(e.to_string())
})
}

pub async fn query_balance_unlocked_events_retryable(
payment_service: &BatcherPaymentService,
payment_service_fallback: &BatcherPaymentService,
from_block: U64,
to_block: U64,
) -> Result<Vec<aligned_sdk::eth::batcher_payment_service::BalanceUnlockedFilter>, RetryError<String>>
{
let filter = payment_service
.balance_unlocked_filter()
.from_block(from_block)
.to_block(to_block);

if let Ok(events) = filter.query().await {
return Ok(events);
}

let filter_fallback = payment_service_fallback
.balance_unlocked_filter()
.from_block(from_block)
.to_block(to_block);

filter_fallback.query().await.map_err(|e| {
warn!("Failed to query BalanceUnlocked events: {e}");
RetryError::Transient(e.to_string())
})
}
30 changes: 25 additions & 5 deletions crates/batcher/src/types/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub enum TransactionSendError {
NoProofSubmitters,
NoFeePerProof,
InsufficientFeeForAggregator,
SubmissionInsufficientBalance,
SubmissionInsufficientBalance(Address),
BatchAlreadySubmitted,
InsufficientFunds,
OnlyBatcherAllowed,
Expand All @@ -30,8 +30,16 @@ impl From<Bytes> for TransactionSendError {
"0x3102f10c" => TransactionSendError::BatchAlreadySubmitted, // can happen, don't flush
"0x5c54305e" => TransactionSendError::InsufficientFunds, // shouldn't happen, don't flush
"0x152bc288" => TransactionSendError::OnlyBatcherAllowed, // won't happen, don't flush
"0x4f779ceb" => TransactionSendError::SubmissionInsufficientBalance, // shouldn't happen,
// flush can help if something went wrong
"0x4f779ceb" => {
// SubmissionInsufficientBalance(address sender, uint256 balance, uint256 required)
// Try to decode the address parameter (first parameter after selector)
let address = byte_string
.get(34..74) // Skip "0x" + selector (8 chars) + padding (24 chars)
.and_then(|hex_str| hex::decode(hex_str).ok())
.map(|bytes| Address::from_slice(&bytes))
.unwrap_or(Address::zero());
TransactionSendError::SubmissionInsufficientBalance(address)
}
_ => {
// flush because unkown error
TransactionSendError::Generic(format!("Unknown bytestring error: {}", byte_string))
Expand All @@ -58,6 +66,8 @@ pub enum BatcherError {
WsSinkEmpty,
AddressNotFoundInUserStates(Address),
QueueRemoveError(String),
StateCorruptedAndFlushed(String),
EthereumProviderError(String),
}

impl From<tungstenite::Error> for BatcherError {
Expand Down Expand Up @@ -139,6 +149,12 @@ impl fmt::Debug for BatcherError {
BatcherError::QueueRemoveError(e) => {
write!(f, "Error while removing entry from queue: {}", e)
}
BatcherError::StateCorruptedAndFlushed(reason) => {
write!(f, "Batcher state was corrupted and flushed: {}", reason)
}
BatcherError::EthereumProviderError(e) => {
write!(f, "Ethereum provider error: {}", e)
}
}
}
}
Expand All @@ -155,8 +171,12 @@ impl fmt::Display for TransactionSendError {
TransactionSendError::InsufficientFeeForAggregator => {
write!(f, "Insufficient fee for aggregator")
}
TransactionSendError::SubmissionInsufficientBalance => {
write!(f, "Submission insufficient balance")
TransactionSendError::SubmissionInsufficientBalance(address) => {
write!(
f,
"Submission insufficient balance for address: {:?}",
address
)
}
TransactionSendError::BatchAlreadySubmitted => {
write!(f, "Batch already submitted")
Expand Down
Loading
Loading