Skip to content

[2.0] Refactor MQTT session management to use store-based persistence#1806

Open
jfallows wants to merge 39 commits into
developfrom
claude/mqtt-session-kafka-removal-rGtCy
Open

[2.0] Refactor MQTT session management to use store-based persistence#1806
jfallows wants to merge 39 commits into
developfrom
claude/mqtt-session-kafka-removal-rGtCy

Conversation

@jfallows
Copy link
Copy Markdown
Contributor

Description

This PR refactors MQTT session management to use a store-based persistence model instead of Kafka consumer group coordination. The changes introduce a new session storage mechanism that allows MQTT brokers to persist and retrieve session state independently, enabling better session takeover and redirect capabilities.

Key Changes

Core Session Management:

  • Introduced MqttKafkaSessionOffsetsHelper to manage session offset persistence in a dedicated store
  • Added MqttKafkaSessionOffsetsFW type for serializing session offset metadata
  • Removed Kafka consumer group-based session coordination (KafkaGroup, KafkaGroupBeginExFW, KafkaGroupFlushExFW)
  • Updated MqttKafkaSessionFactory to use the new store-based approach instead of group coordination

MQTT Server Enhancements:

  • Added SERVER_BUSY reason code support for connection rejection
  • Introduced InstanceId class for unique broker instance identification
  • Added Closeable interface support for proper resource cleanup
  • Enhanced session takeover and redirect handling with timestamp-based tracking

Configuration & Storage:

  • Added memory store configuration support to MQTT options
  • Updated MqttConfiguration to support configurable session stores
  • Modified MqttBindingConfig to include store references
  • Updated example configuration to include memory store definition

Test Updates:

  • Added MqttKafkaSessionOffsetsHelperTest for session offset persistence logic
  • Removed obsolete Kafka group-based session test scenarios
  • Updated MQTT-Kafka integration tests to work with new session model
  • Added new MQTT session redirect and takeover test scenarios

Specification Changes:

  • Updated IDL to include MqttKafkaSessionOffsets structure
  • Removed test scripts for deprecated Kafka group coordination flows
  • Simplified session management test scripts by removing group-based coordination
  • Added new test scenarios for store-based session management

Motivation

The previous Kafka consumer group-based approach had limitations for session management across multiple broker instances. The new store-based model provides:

  • Better isolation between session state and message publishing
  • Simpler session takeover and redirect logic
  • Support for external session stores (memory, persistent, etc.)
  • Cleaner separation of concerns between session management and message flow

Testing

Existing unit and integration tests have been updated to validate the new session persistence model. The test suite covers session creation, takeover, redirect, and expiry scenarios with the store-based approach.

https://claude.ai/code/session_01Gaf31VM5LZAFmUvtuBmqfz

claude added 30 commits May 26, 2026 22:33
)

Add an optional, currently-unused `store` field to the mqtt-kafka binding
options so configurations can prepare for StoreHandler-based session
ownership coordination. When `store` is absent, log a one-time deprecation
warning per binding instance announcing the upcoming requirement. Existing
highlander-group ownership behavior is unchanged.

https://claude.ai/code/session_0186rZE8mW1U947XTN4kzZ1x
Add a failing schema test asserting that an mqtt-kafka proxy configuration
without the 'store' option is rejected, ahead of making 'store' required as
part of migrating session ownership to the Store SPI. The companion
proxy.missing.store.yaml omits 'store'; validation currently accepts it
(store remains optional per #1797), so the test is red until the schema
marks 'store' required.

https://claude.ai/code/session_01MDjS4CuoxTHjPGS2E1wGJR
Introduce internal MqttKafkaConfiguration Duration properties
session.lease and session.renew, sizing the StoreHandler-based session
ownership lease TTL and renewal interval. Defaults (30s lease, 10s renew)
favor fast cooperative takeover with a bounded TTL fallback. These are
runtime tunables, not binding options, and will be consumed by the
Store-based ownership coordination in MqttKafkaSessionFactory.

https://claude.ai/code/session_01MDjS4CuoxTHjPGS2E1wGJR
…1798)

Mark the 'store' option required in the mqtt-kafka schema and declare a
store on every binding configuration, completing the deprecation begun in
#1797. Spec/test configs reference a 'test' store; the runnable example
uses a 'memory' store. SchemaTest now also registers the engine test-store
schema patch so 'type: test' validates, and shouldRejectProxyWithoutStore
passes now that the option is required.

https://claude.ai/code/session_01MDjS4CuoxTHjPGS2E1wGJR
Resolve the configured store reference to a StoreHandler when attaching an
mqtt-kafka binding, exposing it on MqttKafkaBindingConfig for the session
ownership coordination that will replace the Kafka consumer-group protocol.
Mirrors the binding.resolveId + supplyStore pattern used by binding-mcp.

https://claude.ai/code/session_01MDjS4CuoxTHjPGS2E1wGJR
Define MqttKafkaSessionOffsets / MqttKafkaSessionOffsetMetadata flyweights
to carry per-(topic,partition) consumed offset, idempotent producer state,
and in-flight QoS packet IDs as a single compacted mqtt-sessions record,
replacing per-clientId Kafka consumer-group offset commit metadata.
…1799)

KafkaSessionOffsetsHelper encodes/decodes the per-session QoS offset state
(per-(topic,partition) consumed offset, idempotent producer state, in-flight
packet IDs) to/from the MqttKafkaSessionOffsets flyweight, with a round-trip
unit test. Adds partitionId/producerSequence carriers to KafkaOffsetMetadata.
)

Add mqtt_kafka:sessionOffsets() / matchSessionOffsets() functions so .rpt
scripts can construct and assert the MqttKafkaSessionOffsets blob as a
mqtt-sessions record value, with round-trip and match/mismatch unit tests.
#1799)

Replace the per-clientId consumer-group offsetFetch/offsetCommit Kafka
choreography with a client#offsets record on the compacted mqtt-sessions
topic: a single merged FETCH reads the prior offsets/producer/in-flight
state, and each commit point becomes a merged PRODUCE of a full
MqttKafkaSessionOffsets snapshot. initProducerId + meta retained; #migrate
and highlander group left for a later removal pass.

Note: k3po ITs cannot run in the current execution environment (control
agent fails to start), so this scenario is validated by CI, not locally.
)

Replace the per-clientId Kafka consumer-group offsetFetch/offsetCommit
choreography with a client#offsets record on the compacted mqtt-sessions
topic. Session resume reads the record via a merged FETCH_ONLY stream and
decodes the MqttKafkaSessionOffsets blob (offsets, idempotent producer
state, in-flight QoS2 packet IDs); each commit point produces a full
snapshot via a merged PRODUCE_ONLY stream keyed clientId#offsets. meta,
initProducerId, group and #migrate are unchanged.

Verified: MqttKafkaPublishProxyIT#shouldSendMessageQos2 passes against the
engine; KafkaIT#shouldPublishQoS2Message and the helper unit test stay green.
Sibling QoS2 scenarios remain on the old choreography pending spec rewrite.
…o #offsets (#1799)

Rewrite the publish.qos2.retained and publish.mixture.qos kafka scripts to
the client#offsets record model. Verified against the engine:
MqttKafkaPublishProxyIT#shouldSendMessageQos2Retained and
#shouldSendMessageMixtureQos both pass.
…1799)

During session resume the offset-fetch branch created the session stream
immediately AND again after the seed #offsets produce was acked, so the
duplicate stream left the PUBREL->doCommitOffsetComplete->produce completion
path unreachable (no PUBCOMP). Drop the premature doCreateSessionStream so
recovery matches the fresh-session path (session stream created once, after
the seed snapshot is acked). Migrates publish.qos2.recovery spec to #offsets.

Verified: MqttKafkaPublishProxyIT#shouldSendMessageQos2DuringRecovery passes,
no regression across the qos2/retained/mixture scenarios.
…ort specs to #offsets (#1799)

Rewrite publish.qos2.init.producer.abort and publish.qos2.offset.commit.abort
phase1/phase2 kafka scripts to the client#offsets model (FETCH_ONLY preamble
and PRODUCE_ONLY commit streams). Verified against the engine: the three
corresponding MqttKafkaPublishProxyIT abort methods pass (meta.abort already
passed unchanged).
…ct (#1799)

When the merged FETCH_ONLY client#offsets stream aborts before delivering
the offsets record, the QoS2 mqtt reply is not yet open (deferred for
publishQosMax==2), so doMqttAbort on the unopened reply was unobservable and
the client hung. Also reset the mqtt initial in KafkaOffsetFetchStream
onKafkaAbort, mirroring how meta.abort already surfaces connect-abort.
Migrates publish.qos2.offset.fetch.abort spec to #offsets.

Verified: full MqttKafkaPublishProxyIT passes (36 tests, 0 failures).
…ry (#1799)

Rename the mqtt-sessions record key postfixes #will-signal -> #will and
#expiry-signal -> #expiry across the session factory constants and all 65
affected spec scenarios. The type header values (will-signal/expiry-signal)
and the will-message key prefix (#will-) are unchanged; clientId extraction
strips the constant's length so it stays correct. Clean rename, no legacy
read-both fallback.

Verified against the engine: MqttKafkaSessionProxyIT (33), MqttKafkaPublishProxyIT
(36) and MqttKafkaSubscribeProxyIT (54) all pass.
…1798)

Thread the configured StoreHandler into MqttSessionProxy and coordinate
single-owner session ownership through the store lease primitives. On
session begin the proxy locks clientId#owner for the session.lease TTL;
on success it renews the lease on the session.renew interval via a new
SIGNAL_RENEW_SESSION_OWNERSHIP timer, and releases the lock on end,
abort, and reset.

This is the additive foundation for cooperative takeover: ownership is
recorded in the store alongside the existing highlander group, which
remains until the cooperative takeover and redirect paths replace it.
No wire frames change, so existing session behavior is unaffected.
Build the cooperative-takeover protocol on top of the store-backed
ownership lease added previously.

On session begin the proxy reads the current owner record for the
clientId from the store. When the owner is a different replica with a
strong (serverRef-advertised) identity, the client is redirected to it
via an MQTT reset carrying the server reference; otherwise the proxy
publishes its own ownership record and claims the lease cooperatively.

The owning proxy watches its ownership key; when a different replica
publishes a competing record it stops renewal, releases the lock, and
closes its session so the challenger can take over. If the incumbent is
unresponsive and never yields, the contending proxy steals the lease
once it expires via a bounded retry. Identity strength is derived from
the configured serverRef and published in the ownership record.

This runs alongside the existing highlander group, which remains as the
deprecated fallback until it is removed. Existing publish, subscribe and
session integration tests pass unchanged.
… hostname (#1800)

Source the replica's externally-reachable identity from the engine
'service.hostname' configuration (zilla.engine.service.hostname) rather
than only the per-binding 'serverRef' option, reusing the existing engine
configuration surface already consumed by binding-mcp.

The binding-level 'serverRef' continues to work and takes precedence when
set, emitting a deprecation warning that points to the engine service
hostname. When 'serverRef' is absent, the engine service hostname supplies
the strong-identity address published in the store ownership record; when
neither is configured the replica identity remains weak.
…isenberg-NjXPY

# Conflicts:
#	runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java
#	specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/config/proxy.store.yaml
#	specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/config/SchemaTest.java
Restore 100% instruction coverage on the binding-mqtt-kafka.spec module
by exercising every mismatch branch of the MqttKafkaSessionOffsetsMatcher:
per-field mismatches (partition, consumed offset, producer id/epoch/sequence,
packet ids), version mismatch, entry-count mismatch, the no-constraint
(null) result, version-only match, and the empty-buffer path.
…1798)

Delete the session.group.* scenarios and their IT methods. These exercise
the Kafka consumer-group coordinator (describe-config/session-timeout/
not-authorized validation and server-sent reset) that is being removed in
favor of store-based cooperative session ownership; three were already
@ignore'd. Drops the now-unused @ignore import.
…#1798)

mqtt-kafka now requires a store for cooperative session ownership, which
broke the asyncapi proxy whose composite generates an mqtt-kafka binding
with no store. Generate a memory store in the composite namespace and
reference it from the generated mqtt-kafka binding; add store-memory as a
provided dependency so the type resolves in tests (the dist already
bundles it).
…ator

The OWNERSHIP_FIELD_SEPARATOR char literal embedded a raw NUL byte, which
caused git, grep, and ripgrep to treat MqttKafkaSessionFactory.java as a
binary file. Use the equivalent unicode escape instead so the source stays
text and diffs and searches work.
…ngle identity token

Store one identity token per clientId#owner record instead of a
separator-delimited tuple. A strong (externally-addressable) owner stores its
server reference verbatim; a weak owner stores its opaque replica identity
behind an ETag-style "W/" prefix. This drops the redundant second identity
field and the unused acquisition timestamp, and removes the field separator
entirely so the record is a plain string that test stores can seed from YAML.
The session owner token / redirect server reference is a bare hostname; the
listener port is implicit per service and shared across replicas, so the
service.hostname property carries no port.
…rence

The redirect target is the owning replica's hostname; the listener port is
implicit per service, so the redirect server reference carries no port.
…nership

Remove the pre-Store session-ownership machinery now that Store-based
cooperative ownership is in place:

- Delete the highlander Kafka consumer-group election (KafkaGroupStream) and
  the #migrate compacted-topic takeover signal (KafkaSessionSignalStream,
  sendMigrateSignal, MIGRATE_KEY_POSTFIX, clientIdMigrate).
- Stop creating the mqtt-clients consumer group and stop carrying the redirect
  hint on KafkaResetEx.consumerId across the session, publish, and subscribe
  streams; redirect now flows from the Store ownership record.
- Resolve Store ownership before opening any Kafka stream: onOwnershipLocked
  drives session establishment (qos<2 replies and opens the session-state
  stream; qos2 fetches metadata/offsets first), and the will-fetch flush feeds
  into the same path. A strong remote owner is redirected with no Kafka stream
  opened.
- Drop the "highlander" protocol handling from KafkaClientGroupFactory.

KafkaResetEx.consumerId remains in the Kafka IDL (still used by binding-kafka).
…n.subscribe kafka script

Strip the #migrate produce stream, the highlander group stream, and the
mqtt-clients groupId + #migrate merged-stream filter from the session.subscribe
kafka scripts. The proxy now opens a single session-state stream keyed on the
clientId once it owns the session via the store. Verified against the engine:
MqttKafkaSessionProxyIT.shouldSubscribeSaveSubscriptionsInSession passes.
Seed the test store with a strong remote owner (client-1#owner =
mqtt-2.example.com) so the proxy resolves a different strong-identity owner on
connect and redirects the client, exercising the store-based redirect path
without any Kafka session stream.
…subscribe specs

Remove the #migrate produce streams, highlander group streams, mqtt-clients
groupId, #migrate merged-stream filters, and consumerId redirect hints from the
kafka-side spec scripts across all session/publish/subscribe scenarios, matching
the removed ownership machinery. Verified against the engine: publish (36/36),
subscribe (54/54), and the session scenarios (except the cross-replica ones and
session.exists.clean.start, handled separately) pass; peer-to-peer KafkaIT and
MqttIT are green.

Also drop session.connect.override.{max,min}.session.expiry: the server-side
session-expiry override was a side effect of the Kafka consumer-group
session.timeout negotiation, which no longer exists after removing the group.
claude added 9 commits May 27, 2026 23:23
…-timeout under analysis)

Redesign session.redirect for the store-based ownership model:
- Factory: on a strong remote owner, doRedirect resets the mqtt session stream
  immediately (doMqttReset) without opening any kafka stream (ownership-first).
- Add kafka != null guards to the base KafkaSessionStream do* frame-writers so a
  never-begun session stream (the redirect path) is not written to.
- Specs: delete the kafka-side session.redirect scripts (no kafka activity on
  redirect); mqtt-side client uses connect aborted, server uses accept/rejected.
- ITs: shouldRedirect runs mqtt-side only against proxy.redirect.yaml (store
  seeded with client-1#owner = mqtt-2.example.com); peer MqttIT#shouldRedirect
  passes.

Known issue for analysis: the runtime MqttKafkaSessionProxyIT#shouldRedirect
hits an engine-side close timeout with doMqttReset alone — the proxy's reply
direction is not released. doMqttAbort on the reply clears it but is being
reviewed; committed without it for reproduction.
newStream returns a non-null consumer, so the engine fully establishes the
bidirectional mqtt session stream before ownership is resolved. Because the
redirect decision is deferred behind the async store lookup, the proxy has
accepted the stream and owes a reply terminal; resetting only the initial
leaves the reply undischarged and the engine synthetically resets it on close.
Abort the reply alongside the initial RESET so the stream is fully torn down.

MqttKafkaSessionProxyIT#shouldRedirect and peer MqttIT#shouldRedirect pass.
…on takeover

The ownership challenge previously surrendered only to a different identity
(cross-replica), so two connections for the same clientId on one replica could
not take over from each other — that path had relied on the removed highlander
group. Carry a globally-unique per-connection nonce (replicaId + initialId) in
the ownership record and surrender in onOwnershipChallenged whenever the
challenger's nonce differs from ours. A newer connection's store.put fires the
incumbent's watch, which then unlocks and resets, letting the newcomer acquire
the lock. Identity is still used for the strong-remote redirect decision.
… re-model

The three takeover scenarios (session.client.takeover, session.exists.clean.start,
session.will.message.takeover.deliver.will) relied on the removed highlander
group. Same-replica takeover now works via the per-connection ownership nonce,
but the scripts/ITs will be re-modeled when the ownership logic moves to the
mqtt server binding. Ignore them with a TODO until then so the removal lands
green.
Merge develop (#1804 et al) into the removal branch. develop's #1804 moved the
'store' option and the session-ownership deprecation warning from mqtt-kafka
to the mqtt server binding, so this branch's mqtt-kafka-side store-based
ownership cannot stay where it is. Strip every ownership artifact out of
mqtt-kafka so the merge compiles; the ownership logic will be added to
binding-mqtt in a follow-up commit.

Removed from binding-mqtt-kafka:
- MqttKafkaBindingConfig.store + supplyStore plumbing through MqttKafkaProxyFactory.
- MqttKafkaSessionFactory: OwnershipRecord, all on/do*Ownership* handlers,
  doRedirect, ownerIdentity/Nonce/Record helpers, OWNER_KEY_POSTFIX,
  SIGNAL_RENEW_SESSION_OWNERSHIP, SIGNAL_STEAL_SESSION_OWNERSHIP,
  REDIRECT_AVAILABLE_MASK, hasRedirectCapability, and the MqttSessionProxy
  store/ownerKey/ownerToken/owns/claimed/ownerWatch/renewAt/stealAt/redirect
  fields.
- MqttKafkaConfiguration: SESSION_LEASE/SESSION_RENEW property defs.
- The redirect spec scenario (proxy.redirect.yaml + session.redirect/{client,server}.rpt),
  shouldRedirect runtime + peer ITs.
- Conflict-resolution cleanup: drop 'store' from the mqtt-kafka schema's
  required list and strip 'store: test0' (plus the stores: block) from the
  config-fixture yamls.

mqtt-kafka now assumes ownership is achieved when it receives a session BEGIN
and proceeds straight to doEstablishSession in onMqttBegin. The kafka != null
guards on KafkaSessionStream stay; highlander/migrate removal, will-aware
doEstablishSession, and the bulk spec strip are preserved.
…binding

Move the store-based MQTT session-ownership state machine into the mqtt server
binding (where it conceptually belongs: ownership is a generic MQTT concern,
not a Kafka-mapping detail). mqtt-kafka now bootstraps the Kafka session as
soon as it receives the application BEGIN; binding-mqtt only forwards that
BEGIN after ownership is achieved.

binding-mqtt:
- MqttBindingConfig.store: resolved from options.store via supplyStore, plumbed
  through MqttBinding/MqttBindingContext.
- MqttConfiguration: INSTANCE_ID, SESSION_LEASE (PT30S), SESSION_RENEW (PT10S)
  plus a serviceHostname() accessor for the engine's service hostname.
- New InstanceId helper (mirrors mqtt-kafka's).
- MqttServerFactory: per-connection ownership state on MqttServer
  (ownerKey/Token/Watch, renewAt, stealAt, owns/claimed/ownershipResolved,
  pendingSession* for the deferred-establish), OwnershipRecord with the
  globally-unique per-connection nonce (replicaId + initialId), signal ids
  SIGNAL_RENEW_SESSION_OWNERSHIP (4) + SIGNAL_STEAL_SESSION_OWNERSHIP (5)
  wired into onNetworkSignal. Hook in onDecodeConnectPayload: when store is
  configured, defer session establishment to the lock callback by parking
  decoder = decodeAwaitOwnership. On strong remote owner doEncodeConnack
  SERVER_MOVED + serverReference and never open the downstream session; on
  claim+lock, doEstablishSession (which creates MqttSessionStream and sends
  BEGIN) + register store.watch for cooperative challenge. Same-replica
  takeover via different-nonce challenge. Re-entrant guards on the store
  callbacks for connection-already-closed paths. doReleaseOwnership called
  from cleanupNetwork and closeStreams.

The legacy session.server.redirect.before.connack path (mqtt-kafka can still
send a session RESET with serverRef post-BEGIN) is kept as a complementary
late-redirect mechanism.

specs/binding-mqtt.spec:
- server.redirect.yaml seeds the engine test store with
  client-1#owner = mqtt-2.example.com:1883.
- streams/network/v5/session.redirect/{client,server}.rpt encode the
  CONNACK SERVER_MOVED + Server Reference response.
- v5/SessionIT peer test and the runtime v5/SessionIT both cover shouldRedirect.
…nership

Wire up store-based MQTT session takeover end-to-end at the binding-mqtt layer
and retire the obsolete mqtt-kafka tests.

binding-mqtt:
- MqttServerFactory.onOwnershipChallenged now routes through the existing
  onDecodeError(SESSION_TAKEN_OVER) teardown (DISCONNECT 0x8E to the displaced
  client + END to publishes/subscribes + END to session + doReleaseOwnership),
  with a pre-emptive session.cleanupAbort when a will is present so the
  session ends as ABORT (per the will-on-abort path) and the will fires.
- OwnershipRecord FIELD_SEPARATOR: use the � escape form in source for
  clarity.

binding-mqtt specs:
- New shouldDeliverWillMessageOnSessionTakeover at v5 network + application
  layers; shouldTakeOverSession / shouldRemoveSessionAtCleanStart already
  covered v5; add a runtime IT method for each, on a tight-lease store config.
- Correct session.redirect's reason-code byte to 0x9d (MqttReasonCodes.
  SERVER_MOVED); was 0x87 (NOT_AUTHORIZED) in a pre-existing script bug, so
  the previously-passing shouldRedirect was matching the wrong byte.

mqtt-kafka cleanup:
- Delete the three @ignored takeover ITs in MqttKafkaSessionProxyIT, the
  KafkaIT/MqttIT peer methods, and the corresponding kafka/mqtt-side
  spec scenario dirs.
- Drop the dead KafkaGroup inner class and its uses in MqttKafkaPublishFactory
  / MqttKafkaPublishMetadata (left over from the kafka-side ownership strip;
  removing closes the mqtt-kafka jacoco class-coverage gate).

Verified: ./mvnw clean install across runtime/binding-mqtt, runtime/binding-mqtt-kafka,
specs/binding-mqtt.spec, specs/binding-mqtt-kafka.spec — BUILD SUCCESS with
full ITs and jacoco; coverage checks met on both bindings.
…over

The store-based ownership lock previously left a contended second connection
waiting for the broker's sessionLease to expire when the prior holder was
unresponsive (no watch event fires). Embed the live lock token in the
OwnershipRecord so a contender can pre-empt the unresponsive holder via
store.unlock(ownerKey, observedToken, ...), bounded by sessionRenew instead
of sessionLease.

- OwnershipRecord gains a fourth field, `token`. decode handles 0/1/2
  trailing separators so externally-seeded records (identity only) and
  legacy records (identity + nonce, no token) still parse cleanly.
- Reorder the claim flow: lock-then-put. ownershipRecord(token) is now only
  called from the lock-success path, so every record this code publishes
  carries the live token.
- Extract completeOwnership(traceId, token) shared by onOwnershipLocked
  (initial claim) and the new onStealComplete (post-steal); it writes the
  token-carrying record, installs the watch, schedules renew, runs
  doEstablishSession, and unsticks the decoder.
- onStealOwnership now: store.get → if observedToken present, store.unlock →
  retry store.lock → onStealComplete. The unlock-with-token only succeeds
  if that token is still the live holder; stale tokens are no-ops, leaving
  the cooperative path intact for responsive holders that surrender via the
  watch first.
- onStealComplete: success → completeOwnership; failure → CONNACK
  SERVER_BUSY (0x89) + network end (mirrors the doRedirect teardown
  pattern). No session cleanup needed because doEstablishSession only runs
  on success.
- onOwnershipRenewed: when renew returns null (token invalidated by a
  force-unlock), the displaced holder now surrenders via the same path as
  the watch-driven challenge — extracted as doSurrenderOwnership(traceId)
  (cancel renew, unlock if held, conditional session.cleanupAbort when a
  will is set, onDecodeError SESSION_TAKEN_OVER). onOwnershipChallenged
  also delegates to this helper, removing the duplication.

Tests: shouldTakeOverSession / shouldDeliverWillMessageOnSessionTakeover /
shouldRemoveSessionAtCleanStart / shouldRedirect remain green. The
SERVER_BUSY rejection branch and the renew-failure surrender are not
covered by an IT (would require the test store to surface a "renew refused"
outcome and a contender holding through both lock attempts); jacoco gates
on both bindings still pass.
@jfallows jfallows changed the title Refactor MQTT session management to use store-based persistence [2.0] Refactor MQTT session management to use store-based persistence May 29, 2026
@jfallows jfallows self-assigned this May 29, 2026
@jfallows jfallows requested a review from akrambek May 29, 2026 06:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants