Skip to content

Commit 269abdc

Browse files
committed
fix: queue retained MQTT publishes during disconnect, replay on reconnect
A user reported that on cold start, a subset of HA-discovered entities sit at 'unavailable' indefinitely. Investigation confirmed cgateweb's own startup path is the dominant culprit: cgateWebBridge.start(): line 287 _updateBridgeReadiness('startup') -> publishNow line 299 haBridgeDiagnostics.publishNow('startup') line 301 _updateBridgeReadiness('startup-complete') -> publishNow These three calls fire ~17 retained publishes each before MqttManager has actually connected (the connect happens inside connectionManager .start() which only awaits start, not readiness). With config-once dedup that's ~35 publishes — matching the reporter's "38 dropped" roll-up almost exactly. Plus any state events that race during the MQTT settle window. MqttManager.publish() returned false and incremented a counter when disconnected, then logged "N publish(es) dropped while disconnected" on reconnect — but the messages themselves were lost forever, so HA had nothing to bind retained-state subscriptions to. Add a bounded retain-aware queue: - Map<topic, {payload, options}> — newest-wins per topic so a stale level=0 is correctly overwritten by a fresh level=128 if both queue during the same disconnect window. - Bounded by mqttPendingPublishMaxEntries (default 1000); when full, oldest entry is evicted and an evict count is warned on flush. - Non-retained publishes still drop (one-shot events whose meaning would be invalidated by replay). - Flush on (re)connect with a single info-line replay summary. - Errors mid-flush are logged per-topic but don't halt the rest. 8 new tests cover queueing, newest-wins, eviction, flush, mid-flush error tolerance, and non-retain pass-through. 1217/1217 passing. Bumps to 1.8.7.
1 parent 0ec88a0 commit 269abdc

5 files changed

Lines changed: 178 additions & 8 deletions

File tree

homeassistant-addon/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ All notable changes to the C-Gate Web Bridge Home Assistant add-on will be docum
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [1.8.7] - 2026-05-05
9+
10+
### Fixed
11+
- **Startup-race silent publish drops**: HA Discovery configs and initial state values published before the MQTT broker was fully connected were silently dropped — `MqttManager.publish()` incremented a counter but never replayed the lost messages. Affected entities sat at `unavailable` in Home Assistant indefinitely (until C-Gate happened to emit a fresh event for that group while MQTT was up). `cgateweb`'s own startup path is the largest culprit: `cgateWebBridge.start()` calls `_updateBridgeReadiness('startup')` and `haBridgeDiagnostics.publishNow('startup')` before the broker connects, so ~38 retained publishes per restart go to /dev/null.
12+
- `MqttManager` now keeps a bounded retain-aware queue of publishes attempted while disconnected. Map semantics give us newest-wins-per-topic so a stale `level=0` is correctly overwritten by a fresh `level=128` if both queue during the same disconnect window. The queue is bounded (default 1000 entries; configurable via `mqttPendingPublishMaxEntries`) and oldest entries are evicted with a warning if the broker stays unreachable. On (re)connect, the queue is flushed and the count is logged. Non-retained publishes (one-shot events whose meaning would be invalidated by replay) are still dropped — only retained state is queued.
13+
814
## [1.8.6] - 2026-05-05
915

1016
### Added

homeassistant-addon/config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: "C-Gate Web Bridge"
2-
version: "1.8.6"
2+
version: "1.8.7"
33
slug: cgateweb
44
description: "Bridge between Clipsal C-Bus systems and MQTT/Home Assistant"
55
url: "https://github.com/dougrathbone/cgateweb"

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "cgateweb",
3-
"version": "1.8.6",
3+
"version": "1.8.7",
44
"description": "Node.js bridge connecting Clipsal C-Bus automation systems to MQTT for Home Assistant integration",
55
"keywords": [
66
"cbus",

src/mqttManager.js

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,21 @@ class MqttManager extends EventEmitter {
4545
this._lastStatusPayload = null;
4646
this._droppedPublishCount = 0;
4747
this._droppedPublishWarned = false;
48+
49+
// Pre-connect / disconnect publish queue. Retained-state publishes
50+
// attempted while the broker is unreachable are queued here and
51+
// replayed on (re)connect, so HA Discovery configs and initial state
52+
// values aren't silently lost during the startup race. Map semantics
53+
// give us newest-wins-per-topic — a stale level=0 is overwritten by a
54+
// fresh level=128 if both queue while disconnected. Bounded by
55+
// mqttPendingPublishMaxEntries (default 1000); when full, the oldest
56+
// entry is evicted.
57+
this._pendingPublishQueue = new Map();
58+
this._pendingPublishMaxEntries = Math.max(
59+
10, Number(settings && settings.mqttPendingPublishMaxEntries) || 1000
60+
);
61+
this._pendingPublishEvicted = 0;
62+
4863
this.logger = createLogger({ component: 'MqttManager' });
4964
this.errorHandler = createErrorHandler('MqttManager');
5065
}
@@ -118,8 +133,11 @@ class MqttManager extends EventEmitter {
118133
publish(topic, payload, options = {}) {
119134
if (!this.client || !this.connected) {
120135
this._droppedPublishCount++;
136+
if (options && options.retain) {
137+
this._enqueueRetainedPublish(topic, payload, options);
138+
}
121139
if (!this._droppedPublishWarned) {
122-
this.logger.warn('Cannot publish to MQTT: not connected (further drops suppressed until reconnect)');
140+
this.logger.warn('MQTT not connected; queueing retained publishes for replay on reconnect (further drops suppressed)');
123141
this._droppedPublishWarned = true;
124142
}
125143
return false;
@@ -134,6 +152,37 @@ class MqttManager extends EventEmitter {
134152
}
135153
}
136154

155+
_enqueueRetainedPublish(topic, payload, options) {
156+
// Newest-wins per topic — a fresh value supersedes a stale one queued
157+
// earlier in the same disconnect window.
158+
this._pendingPublishQueue.delete(topic);
159+
this._pendingPublishQueue.set(topic, { payload, options });
160+
161+
// Bounded queue: evict the oldest entry when over the cap. Map
162+
// iteration is insertion order so the first key is the oldest.
163+
if (this._pendingPublishQueue.size > this._pendingPublishMaxEntries) {
164+
const oldest = this._pendingPublishQueue.keys().next().value;
165+
this._pendingPublishQueue.delete(oldest);
166+
this._pendingPublishEvicted += 1;
167+
}
168+
}
169+
170+
_flushPendingPublishes() {
171+
if (this._pendingPublishQueue.size === 0) return;
172+
const entries = [...this._pendingPublishQueue.entries()];
173+
this._pendingPublishQueue.clear();
174+
let succeeded = 0;
175+
for (const [topic, { payload, options }] of entries) {
176+
try {
177+
this.client.publish(topic, payload, options);
178+
succeeded += 1;
179+
} catch (error) {
180+
this.logger.error(`Error replaying queued MQTT publish for ${topic}:`, { error });
181+
}
182+
}
183+
this.logger.info(`MQTT reconnected; replayed ${succeeded} queued retained publish(es)`);
184+
}
185+
137186
subscribe(topic, callback) {
138187
if (!this.client || !this.connected) {
139188
this.logger.warn(`Cannot subscribe to MQTT: not connected`);
@@ -213,13 +262,23 @@ class MqttManager extends EventEmitter {
213262
this.connected = true;
214263
this.logger.info(`CONNECTED TO MQTT BROKER: ${this.settings.mqtt}`);
215264

265+
if (this._pendingPublishEvicted > 0) {
266+
this.logger.warn(
267+
`MQTT publish queue exceeded max entries while disconnected; ${this._pendingPublishEvicted} oldest retained publish(es) evicted before flush`
268+
);
269+
this._pendingPublishEvicted = 0;
270+
}
216271
if (this._droppedPublishCount > 0) {
217-
this.logger.info(`MQTT reconnected; ${this._droppedPublishCount} publish(es) were dropped while disconnected`);
272+
const queued = this._pendingPublishQueue.size;
273+
this.logger.info(
274+
`MQTT reconnected; ${this._droppedPublishCount} publish(es) attempted while disconnected, ${queued} queued for replay`
275+
);
218276
this._droppedPublishCount = 0;
219277
}
220278
this._droppedPublishWarned = false;
221279

222280
this._publishStatus(this._bridgeReady ? MQTT_PAYLOAD_STATUS_ONLINE : MQTT_PAYLOAD_STATUS_OFFLINE);
281+
this._flushPendingPublishes();
223282

224283
// Subscribe to command topics
225284
this.subscribe(`${MQTT_TOPIC_PREFIX_WRITE}/#`, (err) => {

tests/mqttManager.test.js

Lines changed: 109 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ describe('MqttManager', () => {
187187
mqttManager.publish('test/topic', 'message');
188188

189189
expect(mockClient.publish).not.toHaveBeenCalled();
190-
expect(loggerSpy).toHaveBeenCalledWith(expect.stringContaining('Cannot publish to MQTT: not connected'));
190+
expect(loggerSpy).toHaveBeenCalledWith(expect.stringContaining('MQTT not connected'));
191191
});
192192

193193
it('should warn only once when publishing repeatedly while disconnected', () => {
@@ -213,7 +213,7 @@ describe('MqttManager', () => {
213213
mockClient.connected = true;
214214
mqttManager.client.emit('connect');
215215

216-
expect(infoSpy).toHaveBeenCalledWith(expect.stringContaining('17 publish(es) were dropped while disconnected'));
216+
expect(infoSpy).toHaveBeenCalledWith(expect.stringContaining('17 publish(es) attempted while disconnected'));
217217
expect(mqttManager._droppedPublishCount).toBe(0);
218218
expect(mqttManager._droppedPublishWarned).toBe(false);
219219
});
@@ -234,14 +234,119 @@ describe('MqttManager', () => {
234234
mockClient.publish.mockImplementation(() => {
235235
throw new Error('Publish failed');
236236
});
237-
237+
238238
mqttManager.publish('test/topic', 'message');
239-
239+
240240
expect(loggerSpy).toHaveBeenCalledWith(
241241
expect.stringContaining('Error publishing to MQTT'),
242242
expect.objectContaining({ error: expect.any(Error) })
243243
);
244244
});
245+
246+
describe('retain-aware pre-connect publish queue', () => {
247+
it('queues retained publishes while disconnected', () => {
248+
mqttManager.connected = false;
249+
mqttManager.publish('cbus/read/254/56/1/state', 'ON', { retain: true });
250+
expect(mqttManager._pendingPublishQueue.size).toBe(1);
251+
expect(mqttManager._pendingPublishQueue.get('cbus/read/254/56/1/state')).toEqual({
252+
payload: 'ON',
253+
options: { retain: true }
254+
});
255+
});
256+
257+
it('does not queue non-retained publishes (one-shot events)', () => {
258+
mqttManager.connected = false;
259+
mqttManager.publish('cbus/read/254/56/1/event', 'press', { retain: false });
260+
expect(mqttManager._pendingPublishQueue.size).toBe(0);
261+
});
262+
263+
it('newest-wins per topic when the same topic is published multiple times', () => {
264+
mqttManager.connected = false;
265+
mqttManager.publish('cbus/read/254/56/1/level', '0', { retain: true });
266+
mqttManager.publish('cbus/read/254/56/1/level', '128', { retain: true });
267+
mqttManager.publish('cbus/read/254/56/1/level', '255', { retain: true });
268+
expect(mqttManager._pendingPublishQueue.size).toBe(1);
269+
expect(mqttManager._pendingPublishQueue.get('cbus/read/254/56/1/level').payload).toBe('255');
270+
});
271+
272+
it('flushes the queue on reconnect', () => {
273+
mqttManager.connected = false;
274+
mqttManager.publish('cbus/read/254/56/1/state', 'ON', { retain: true });
275+
mqttManager.publish('cbus/read/254/56/2/state', 'OFF', { retain: true });
276+
expect(mqttManager._pendingPublishQueue.size).toBe(2);
277+
278+
mockClient.connected = true;
279+
mqttManager.client.emit('connect');
280+
281+
expect(mqttManager._pendingPublishQueue.size).toBe(0);
282+
expect(mockClient.publish).toHaveBeenCalledWith(
283+
'cbus/read/254/56/1/state', 'ON', { retain: true }
284+
);
285+
expect(mockClient.publish).toHaveBeenCalledWith(
286+
'cbus/read/254/56/2/state', 'OFF', { retain: true }
287+
);
288+
});
289+
290+
it('logs the replay count on reconnect', () => {
291+
mqttManager.connected = false;
292+
mqttManager.publish('cbus/read/254/56/1/state', 'ON', { retain: true });
293+
mqttManager.publish('cbus/read/254/56/2/state', 'OFF', { retain: true });
294+
295+
const infoSpy = jest.spyOn(mqttManager.logger, 'info');
296+
mockClient.connected = true;
297+
mqttManager.client.emit('connect');
298+
299+
expect(infoSpy).toHaveBeenCalledWith(expect.stringContaining('replayed 2 queued retained publish(es)'));
300+
});
301+
302+
it('evicts the oldest entry when the queue exceeds its cap', () => {
303+
mqttManager._pendingPublishMaxEntries = 3;
304+
mqttManager.connected = false;
305+
mqttManager.publish('a', 'first', { retain: true });
306+
mqttManager.publish('b', 'second', { retain: true });
307+
mqttManager.publish('c', 'third', { retain: true });
308+
mqttManager.publish('d', 'fourth', { retain: true });
309+
310+
expect(mqttManager._pendingPublishQueue.size).toBe(3);
311+
expect(mqttManager._pendingPublishQueue.has('a')).toBe(false);
312+
expect(mqttManager._pendingPublishQueue.has('d')).toBe(true);
313+
expect(mqttManager._pendingPublishEvicted).toBe(1);
314+
});
315+
316+
it('warns about evictions on reconnect', () => {
317+
mqttManager._pendingPublishMaxEntries = 2;
318+
mqttManager.connected = false;
319+
mqttManager.publish('a', 'x', { retain: true });
320+
mqttManager.publish('b', 'y', { retain: true });
321+
mqttManager.publish('c', 'z', { retain: true });
322+
323+
const warnSpy = jest.spyOn(mqttManager.logger, 'warn');
324+
mockClient.connected = true;
325+
mqttManager.client.emit('connect');
326+
327+
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining('1 oldest retained publish(es) evicted'));
328+
expect(mqttManager._pendingPublishEvicted).toBe(0);
329+
});
330+
331+
it('continues replaying remaining publishes when one fails mid-flush', () => {
332+
mqttManager.connected = false;
333+
mqttManager.publish('topic1', 'a', { retain: true });
334+
mqttManager.publish('topic2', 'b', { retain: true });
335+
mqttManager.publish('topic3', 'c', { retain: true });
336+
337+
let callCount = 0;
338+
mockClient.publish.mockImplementation((topic) => {
339+
callCount++;
340+
if (topic === 'topic2') throw new Error('Mid-flush failure');
341+
});
342+
343+
mockClient.connected = true;
344+
mqttManager.client.emit('connect');
345+
346+
expect(callCount).toBeGreaterThanOrEqual(3);
347+
expect(mqttManager._pendingPublishQueue.size).toBe(0);
348+
});
349+
});
245350
});
246351

247352
describe('subscribe', () => {

0 commit comments

Comments
 (0)