Skip to content

Commit beeaf8f

Browse files
committed
feat: add configurable command channel capacity for KNX connector
1 parent 4707a2f commit beeaf8f

2 files changed

Lines changed: 67 additions & 27 deletions

File tree

aimdb-knx-connector/src/embassy_client.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,21 @@ use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
7070
use embassy_sync::channel::Channel;
7171
use static_cell::StaticCell;
7272

73-
/// Static channel for KNX commands (32 slots to match Tokio implementation)
74-
static KNX_COMMAND_CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, KnxCommand, 32>> =
75-
StaticCell::new();
73+
/// Capacity of the static KNX command channel.
74+
///
75+
/// Embassy requires a compile-time const generic — runtime configurability is
76+
/// not possible with `StaticCell<Channel<..., N>>`. Adjust this constant and
77+
/// recompile if your installation needs a larger buffer.
78+
const KNX_COMMAND_QUEUE_SIZE: usize = 32;
79+
80+
/// Static channel for KNX commands (capacity: [`KNX_COMMAND_QUEUE_SIZE`])
81+
static KNX_COMMAND_CHANNEL: StaticCell<
82+
Channel<CriticalSectionRawMutex, KnxCommand, KNX_COMMAND_QUEUE_SIZE>,
83+
> = StaticCell::new();
7684

7785
/// Get or initialize the command channel
78-
fn get_command_channel() -> &'static Channel<CriticalSectionRawMutex, KnxCommand, 32> {
86+
fn get_command_channel(
87+
) -> &'static Channel<CriticalSectionRawMutex, KnxCommand, KNX_COMMAND_QUEUE_SIZE> {
7988
KNX_COMMAND_CHANNEL.init(Channel::new())
8089
}
8190

@@ -265,7 +274,7 @@ impl ChannelState {
265274

266275
/// Internal KNX connector implementation
267276
pub struct KnxConnectorImpl {
268-
command_channel: &'static Channel<CriticalSectionRawMutex, KnxCommand, 32>,
277+
command_channel: &'static Channel<CriticalSectionRawMutex, KnxCommand, KNX_COMMAND_QUEUE_SIZE>,
269278
}
270279

271280
impl KnxConnectorImpl {
@@ -277,7 +286,7 @@ impl KnxConnectorImpl {
277286
runtime_ctx: Option<Arc<dyn core::any::Any + Send + Sync>>,
278287
) -> Result<
279288
(
280-
&'static Channel<CriticalSectionRawMutex, KnxCommand, 32>,
289+
&'static Channel<CriticalSectionRawMutex, KnxCommand, KNX_COMMAND_QUEUE_SIZE>,
281290
BoxFuture,
282291
),
283292
&'static str,
@@ -339,7 +348,11 @@ impl KnxConnectorImpl {
339348
gateway_addr: Ipv4Address,
340349
gateway_port: u16,
341350
router: Arc<Router>,
342-
command_channel: &'static Channel<CriticalSectionRawMutex, KnxCommand, 32>,
351+
command_channel: &'static Channel<
352+
CriticalSectionRawMutex,
353+
KnxCommand,
354+
KNX_COMMAND_QUEUE_SIZE,
355+
>,
343356
runtime_ctx: Option<Arc<dyn core::any::Any + Send + Sync>>,
344357
) {
345358
loop {
@@ -384,7 +397,11 @@ impl KnxConnectorImpl {
384397
gateway_addr: Ipv4Address,
385398
gateway_port: u16,
386399
router: &Router,
387-
command_channel: &'static Channel<CriticalSectionRawMutex, KnxCommand, 32>,
400+
command_channel: &'static Channel<
401+
CriticalSectionRawMutex,
402+
KnxCommand,
403+
KNX_COMMAND_QUEUE_SIZE,
404+
>,
388405
runtime_ctx: Option<&Arc<dyn core::any::Any + Send + Sync>>,
389406
) -> Result<(), &'static str> {
390407
// Create UDP socket with static buffers
@@ -1001,7 +1018,11 @@ impl KnxConnectorImpl {
10011018
/// If a topic provider is configured, it will be called for each value to
10021019
/// dynamically determine the KNX group address. Otherwise, the static default is used.
10031020
fn collect_outbound_futures(
1004-
command_channel: &'static Channel<CriticalSectionRawMutex, KnxCommand, 32>,
1021+
command_channel: &'static Channel<
1022+
CriticalSectionRawMutex,
1023+
KnxCommand,
1024+
KNX_COMMAND_QUEUE_SIZE,
1025+
>,
10051026
runtime_ctx: Arc<dyn core::any::Any + Send + Sync>,
10061027
outbound_routes: Vec<aimdb_core::OutboundRoute>,
10071028
) -> Vec<BoxFuture> {

aimdb-knx-connector/src/tokio_client.rs

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ enum KnxCommand {
6161
/// automatically monitors all required KNX group addresses.
6262
pub struct KnxConnectorBuilder {
6363
gateway_url: String,
64+
/// Capacity of the mpsc channel between outbound publishers and the
65+
/// connection task. Defaults to 32.
66+
command_queue_size: usize,
6467
}
6568

6669
impl KnxConnectorBuilder {
@@ -77,8 +80,19 @@ impl KnxConnectorBuilder {
7780
pub fn new(gateway_url: impl Into<String>) -> Self {
7881
Self {
7982
gateway_url: gateway_url.into(),
83+
command_queue_size: 32,
8084
}
8185
}
86+
87+
/// Override the internal command channel capacity (default: 32).
88+
///
89+
/// The channel sits between outbound publisher futures and the single
90+
/// connection task that serializes UDP sends. Increase this for
91+
/// installations with many outbound routes or bursty publish patterns.
92+
pub fn with_command_queue_size(mut self, size: usize) -> Self {
93+
self.command_queue_size = size;
94+
self
95+
}
8296
}
8397

8498
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
@@ -114,21 +128,25 @@ impl<R: aimdb_executor::RuntimeAdapter + 'static> ConnectorBuilder<R> for KnxCon
114128
// 2. Receiver captured by `connection_future`.
115129
// 3. Sender cloned into each outbound publisher future below.
116130
let runtime_ctx = db.runtime_any();
117-
let (command_tx, connection_future) =
118-
KnxConnectorImpl::build_internal(&self.gateway_url, router, Some(runtime_ctx))
119-
.await
120-
.map_err(|e| {
121-
#[cfg(feature = "std")]
122-
{
123-
aimdb_core::DbError::RuntimeError {
124-
message: format!("Failed to build KNX connector: {}", e),
125-
}
126-
}
127-
#[cfg(not(feature = "std"))]
128-
{
129-
aimdb_core::DbError::RuntimeError { _message: () }
130-
}
131-
})?;
131+
let (command_tx, connection_future) = KnxConnectorImpl::build_internal(
132+
&self.gateway_url,
133+
router,
134+
Some(runtime_ctx),
135+
self.command_queue_size,
136+
)
137+
.await
138+
.map_err(|e| {
139+
#[cfg(feature = "std")]
140+
{
141+
aimdb_core::DbError::RuntimeError {
142+
message: format!("Failed to build KNX connector: {}", e),
143+
}
144+
}
145+
#[cfg(not(feature = "std"))]
146+
{
147+
aimdb_core::DbError::RuntimeError { _message: () }
148+
}
149+
})?;
132150

133151
let outbound_routes = db.collect_outbound_routes("knx");
134152

@@ -176,6 +194,7 @@ impl KnxConnectorImpl {
176194
gateway_url: &str,
177195
router: Router,
178196
runtime_ctx: Option<Arc<dyn core::any::Any + Send + Sync>>,
197+
command_queue_size: usize,
179198
) -> Result<(mpsc::Sender<KnxCommand>, BoxFuture), String> {
180199
// Parse the gateway URL
181200
let mut url = gateway_url.to_string();
@@ -202,7 +221,7 @@ impl KnxConnectorImpl {
202221

203222
// 1. Create command channel; receiver goes to the connection future,
204223
// sender is returned for the publisher futures to clone.
205-
let (command_tx, command_rx) = mpsc::channel::<KnxCommand>(32);
224+
let (command_tx, command_rx) = mpsc::channel::<KnxCommand>(command_queue_size);
206225

207226
// 2. Build the connection-task future (captures the receiver).
208227
let connection_future = build_connection_future(
@@ -1065,15 +1084,15 @@ mod tests {
10651084
async fn test_connector_creation_with_router() {
10661085
let router = RouterBuilder::new().build();
10671086
let connector =
1068-
KnxConnectorImpl::build_internal("knx://192.168.1.19:3671", router, None).await;
1087+
KnxConnectorImpl::build_internal("knx://192.168.1.19:3671", router, None, 32).await;
10691088
assert!(connector.is_ok());
10701089
}
10711090

10721091
#[tokio::test]
10731092
async fn test_connector_with_port() {
10741093
let router = RouterBuilder::new().build();
10751094
let connector =
1076-
KnxConnectorImpl::build_internal("knx://gateway.local:3672", router, None).await;
1095+
KnxConnectorImpl::build_internal("knx://gateway.local:3672", router, None, 32).await;
10771096
assert!(connector.is_ok());
10781097
}
10791098

0 commit comments

Comments
 (0)