[2.0] Refactor MQTT session management to use store-based persistence#1806
Open
jfallows wants to merge 39 commits into
Open
[2.0] Refactor MQTT session management to use store-based persistence#1806jfallows wants to merge 39 commits into
jfallows wants to merge 39 commits into
Conversation
) Add an optional, currently-unused `store` field to the mqtt-kafka binding options so configurations can prepare for StoreHandler-based session ownership coordination. When `store` is absent, log a one-time deprecation warning per binding instance announcing the upcoming requirement. Existing highlander-group ownership behavior is unchanged. https://claude.ai/code/session_0186rZE8mW1U947XTN4kzZ1x
Add a failing schema test asserting that an mqtt-kafka proxy configuration without the 'store' option is rejected, ahead of making 'store' required as part of migrating session ownership to the Store SPI. The companion proxy.missing.store.yaml omits 'store'; validation currently accepts it (store remains optional per #1797), so the test is red until the schema marks 'store' required. https://claude.ai/code/session_01MDjS4CuoxTHjPGS2E1wGJR
Introduce internal MqttKafkaConfiguration Duration properties session.lease and session.renew, sizing the StoreHandler-based session ownership lease TTL and renewal interval. Defaults (30s lease, 10s renew) favor fast cooperative takeover with a bounded TTL fallback. These are runtime tunables, not binding options, and will be consumed by the Store-based ownership coordination in MqttKafkaSessionFactory. https://claude.ai/code/session_01MDjS4CuoxTHjPGS2E1wGJR
…1798) Mark the 'store' option required in the mqtt-kafka schema and declare a store on every binding configuration, completing the deprecation begun in #1797. Spec/test configs reference a 'test' store; the runnable example uses a 'memory' store. SchemaTest now also registers the engine test-store schema patch so 'type: test' validates, and shouldRejectProxyWithoutStore passes now that the option is required. https://claude.ai/code/session_01MDjS4CuoxTHjPGS2E1wGJR
Resolve the configured store reference to a StoreHandler when attaching an mqtt-kafka binding, exposing it on MqttKafkaBindingConfig for the session ownership coordination that will replace the Kafka consumer-group protocol. Mirrors the binding.resolveId + supplyStore pattern used by binding-mcp. https://claude.ai/code/session_01MDjS4CuoxTHjPGS2E1wGJR
Define MqttKafkaSessionOffsets / MqttKafkaSessionOffsetMetadata flyweights to carry per-(topic,partition) consumed offset, idempotent producer state, and in-flight QoS packet IDs as a single compacted mqtt-sessions record, replacing per-clientId Kafka consumer-group offset commit metadata.
…1799) KafkaSessionOffsetsHelper encodes/decodes the per-session QoS offset state (per-(topic,partition) consumed offset, idempotent producer state, in-flight packet IDs) to/from the MqttKafkaSessionOffsets flyweight, with a round-trip unit test. Adds partitionId/producerSequence carriers to KafkaOffsetMetadata.
#1799) Replace the per-clientId consumer-group offsetFetch/offsetCommit Kafka choreography with a client#offsets record on the compacted mqtt-sessions topic: a single merged FETCH reads the prior offsets/producer/in-flight state, and each commit point becomes a merged PRODUCE of a full MqttKafkaSessionOffsets snapshot. initProducerId + meta retained; #migrate and highlander group left for a later removal pass. Note: k3po ITs cannot run in the current execution environment (control agent fails to start), so this scenario is validated by CI, not locally.
) Replace the per-clientId Kafka consumer-group offsetFetch/offsetCommit choreography with a client#offsets record on the compacted mqtt-sessions topic. Session resume reads the record via a merged FETCH_ONLY stream and decodes the MqttKafkaSessionOffsets blob (offsets, idempotent producer state, in-flight QoS2 packet IDs); each commit point produces a full snapshot via a merged PRODUCE_ONLY stream keyed clientId#offsets. meta, initProducerId, group and #migrate are unchanged. Verified: MqttKafkaPublishProxyIT#shouldSendMessageQos2 passes against the engine; KafkaIT#shouldPublishQoS2Message and the helper unit test stay green. Sibling QoS2 scenarios remain on the old choreography pending spec rewrite.
…o #offsets (#1799) Rewrite the publish.qos2.retained and publish.mixture.qos kafka scripts to the client#offsets record model. Verified against the engine: MqttKafkaPublishProxyIT#shouldSendMessageQos2Retained and #shouldSendMessageMixtureQos both pass.
…1799) During session resume the offset-fetch branch created the session stream immediately AND again after the seed #offsets produce was acked, so the duplicate stream left the PUBREL->doCommitOffsetComplete->produce completion path unreachable (no PUBCOMP). Drop the premature doCreateSessionStream so recovery matches the fresh-session path (session stream created once, after the seed snapshot is acked). Migrates publish.qos2.recovery spec to #offsets. Verified: MqttKafkaPublishProxyIT#shouldSendMessageQos2DuringRecovery passes, no regression across the qos2/retained/mixture scenarios.
…ort specs to #offsets (#1799) Rewrite publish.qos2.init.producer.abort and publish.qos2.offset.commit.abort phase1/phase2 kafka scripts to the client#offsets model (FETCH_ONLY preamble and PRODUCE_ONLY commit streams). Verified against the engine: the three corresponding MqttKafkaPublishProxyIT abort methods pass (meta.abort already passed unchanged).
…ct (#1799) When the merged FETCH_ONLY client#offsets stream aborts before delivering the offsets record, the QoS2 mqtt reply is not yet open (deferred for publishQosMax==2), so doMqttAbort on the unopened reply was unobservable and the client hung. Also reset the mqtt initial in KafkaOffsetFetchStream onKafkaAbort, mirroring how meta.abort already surfaces connect-abort. Migrates publish.qos2.offset.fetch.abort spec to #offsets. Verified: full MqttKafkaPublishProxyIT passes (36 tests, 0 failures).
…ry (#1799) Rename the mqtt-sessions record key postfixes #will-signal -> #will and #expiry-signal -> #expiry across the session factory constants and all 65 affected spec scenarios. The type header values (will-signal/expiry-signal) and the will-message key prefix (#will-) are unchanged; clientId extraction strips the constant's length so it stays correct. Clean rename, no legacy read-both fallback. Verified against the engine: MqttKafkaSessionProxyIT (33), MqttKafkaPublishProxyIT (36) and MqttKafkaSubscribeProxyIT (54) all pass.
…1798) Thread the configured StoreHandler into MqttSessionProxy and coordinate single-owner session ownership through the store lease primitives. On session begin the proxy locks clientId#owner for the session.lease TTL; on success it renews the lease on the session.renew interval via a new SIGNAL_RENEW_SESSION_OWNERSHIP timer, and releases the lock on end, abort, and reset. This is the additive foundation for cooperative takeover: ownership is recorded in the store alongside the existing highlander group, which remains until the cooperative takeover and redirect paths replace it. No wire frames change, so existing session behavior is unaffected.
Build the cooperative-takeover protocol on top of the store-backed ownership lease added previously. On session begin the proxy reads the current owner record for the clientId from the store. When the owner is a different replica with a strong (serverRef-advertised) identity, the client is redirected to it via an MQTT reset carrying the server reference; otherwise the proxy publishes its own ownership record and claims the lease cooperatively. The owning proxy watches its ownership key; when a different replica publishes a competing record it stops renewal, releases the lock, and closes its session so the challenger can take over. If the incumbent is unresponsive and never yields, the contending proxy steals the lease once it expires via a bounded retry. Identity strength is derived from the configured serverRef and published in the ownership record. This runs alongside the existing highlander group, which remains as the deprecated fallback until it is removed. Existing publish, subscribe and session integration tests pass unchanged.
… hostname (#1800) Source the replica's externally-reachable identity from the engine 'service.hostname' configuration (zilla.engine.service.hostname) rather than only the per-binding 'serverRef' option, reusing the existing engine configuration surface already consumed by binding-mcp. The binding-level 'serverRef' continues to work and takes precedence when set, emitting a deprecation warning that points to the engine service hostname. When 'serverRef' is absent, the engine service hostname supplies the strong-identity address published in the store ownership record; when neither is configured the replica identity remains weak.
…isenberg-NjXPY # Conflicts: # runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java # specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/config/proxy.store.yaml # specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/config/SchemaTest.java
Restore 100% instruction coverage on the binding-mqtt-kafka.spec module by exercising every mismatch branch of the MqttKafkaSessionOffsetsMatcher: per-field mismatches (partition, consumed offset, producer id/epoch/sequence, packet ids), version mismatch, entry-count mismatch, the no-constraint (null) result, version-only match, and the empty-buffer path.
…1798) Delete the session.group.* scenarios and their IT methods. These exercise the Kafka consumer-group coordinator (describe-config/session-timeout/ not-authorized validation and server-sent reset) that is being removed in favor of store-based cooperative session ownership; three were already @ignore'd. Drops the now-unused @ignore import.
…#1798) mqtt-kafka now requires a store for cooperative session ownership, which broke the asyncapi proxy whose composite generates an mqtt-kafka binding with no store. Generate a memory store in the composite namespace and reference it from the generated mqtt-kafka binding; add store-memory as a provided dependency so the type resolves in tests (the dist already bundles it).
…ator The OWNERSHIP_FIELD_SEPARATOR char literal embedded a raw NUL byte, which caused git, grep, and ripgrep to treat MqttKafkaSessionFactory.java as a binary file. Use the equivalent unicode escape instead so the source stays text and diffs and searches work.
…ngle identity token Store one identity token per clientId#owner record instead of a separator-delimited tuple. A strong (externally-addressable) owner stores its server reference verbatim; a weak owner stores its opaque replica identity behind an ETag-style "W/" prefix. This drops the redundant second identity field and the unused acquisition timestamp, and removes the field separator entirely so the record is a plain string that test stores can seed from YAML.
The session owner token / redirect server reference is a bare hostname; the listener port is implicit per service and shared across replicas, so the service.hostname property carries no port.
…rence The redirect target is the owning replica's hostname; the listener port is implicit per service, so the redirect server reference carries no port.
…nership Remove the pre-Store session-ownership machinery now that Store-based cooperative ownership is in place: - Delete the highlander Kafka consumer-group election (KafkaGroupStream) and the #migrate compacted-topic takeover signal (KafkaSessionSignalStream, sendMigrateSignal, MIGRATE_KEY_POSTFIX, clientIdMigrate). - Stop creating the mqtt-clients consumer group and stop carrying the redirect hint on KafkaResetEx.consumerId across the session, publish, and subscribe streams; redirect now flows from the Store ownership record. - Resolve Store ownership before opening any Kafka stream: onOwnershipLocked drives session establishment (qos<2 replies and opens the session-state stream; qos2 fetches metadata/offsets first), and the will-fetch flush feeds into the same path. A strong remote owner is redirected with no Kafka stream opened. - Drop the "highlander" protocol handling from KafkaClientGroupFactory. KafkaResetEx.consumerId remains in the Kafka IDL (still used by binding-kafka).
…n.subscribe kafka script Strip the #migrate produce stream, the highlander group stream, and the mqtt-clients groupId + #migrate merged-stream filter from the session.subscribe kafka scripts. The proxy now opens a single session-state stream keyed on the clientId once it owns the session via the store. Verified against the engine: MqttKafkaSessionProxyIT.shouldSubscribeSaveSubscriptionsInSession passes.
Seed the test store with a strong remote owner (client-1#owner = mqtt-2.example.com) so the proxy resolves a different strong-identity owner on connect and redirects the client, exercising the store-based redirect path without any Kafka session stream.
…subscribe specs
Remove the #migrate produce streams, highlander group streams, mqtt-clients
groupId, #migrate merged-stream filters, and consumerId redirect hints from the
kafka-side spec scripts across all session/publish/subscribe scenarios, matching
the removed ownership machinery. Verified against the engine: publish (36/36),
subscribe (54/54), and the session scenarios (except the cross-replica ones and
session.exists.clean.start, handled separately) pass; peer-to-peer KafkaIT and
MqttIT are green.
Also drop session.connect.override.{max,min}.session.expiry: the server-side
session-expiry override was a side effect of the Kafka consumer-group
session.timeout negotiation, which no longer exists after removing the group.
…-timeout under analysis) Redesign session.redirect for the store-based ownership model: - Factory: on a strong remote owner, doRedirect resets the mqtt session stream immediately (doMqttReset) without opening any kafka stream (ownership-first). - Add kafka != null guards to the base KafkaSessionStream do* frame-writers so a never-begun session stream (the redirect path) is not written to. - Specs: delete the kafka-side session.redirect scripts (no kafka activity on redirect); mqtt-side client uses connect aborted, server uses accept/rejected. - ITs: shouldRedirect runs mqtt-side only against proxy.redirect.yaml (store seeded with client-1#owner = mqtt-2.example.com); peer MqttIT#shouldRedirect passes. Known issue for analysis: the runtime MqttKafkaSessionProxyIT#shouldRedirect hits an engine-side close timeout with doMqttReset alone — the proxy's reply direction is not released. doMqttAbort on the reply clears it but is being reviewed; committed without it for reproduction.
newStream returns a non-null consumer, so the engine fully establishes the bidirectional mqtt session stream before ownership is resolved. Because the redirect decision is deferred behind the async store lookup, the proxy has accepted the stream and owes a reply terminal; resetting only the initial leaves the reply undischarged and the engine synthetically resets it on close. Abort the reply alongside the initial RESET so the stream is fully torn down. MqttKafkaSessionProxyIT#shouldRedirect and peer MqttIT#shouldRedirect pass.
…on takeover The ownership challenge previously surrendered only to a different identity (cross-replica), so two connections for the same clientId on one replica could not take over from each other — that path had relied on the removed highlander group. Carry a globally-unique per-connection nonce (replicaId + initialId) in the ownership record and surrender in onOwnershipChallenged whenever the challenger's nonce differs from ours. A newer connection's store.put fires the incumbent's watch, which then unlocks and resets, letting the newcomer acquire the lock. Identity is still used for the strong-remote redirect decision.
… re-model The three takeover scenarios (session.client.takeover, session.exists.clean.start, session.will.message.takeover.deliver.will) relied on the removed highlander group. Same-replica takeover now works via the per-connection ownership nonce, but the scripts/ITs will be re-modeled when the ownership logic moves to the mqtt server binding. Ignore them with a TODO until then so the removal lands green.
Merge develop (#1804 et al) into the removal branch. develop's #1804 moved the 'store' option and the session-ownership deprecation warning from mqtt-kafka to the mqtt server binding, so this branch's mqtt-kafka-side store-based ownership cannot stay where it is. Strip every ownership artifact out of mqtt-kafka so the merge compiles; the ownership logic will be added to binding-mqtt in a follow-up commit. Removed from binding-mqtt-kafka: - MqttKafkaBindingConfig.store + supplyStore plumbing through MqttKafkaProxyFactory. - MqttKafkaSessionFactory: OwnershipRecord, all on/do*Ownership* handlers, doRedirect, ownerIdentity/Nonce/Record helpers, OWNER_KEY_POSTFIX, SIGNAL_RENEW_SESSION_OWNERSHIP, SIGNAL_STEAL_SESSION_OWNERSHIP, REDIRECT_AVAILABLE_MASK, hasRedirectCapability, and the MqttSessionProxy store/ownerKey/ownerToken/owns/claimed/ownerWatch/renewAt/stealAt/redirect fields. - MqttKafkaConfiguration: SESSION_LEASE/SESSION_RENEW property defs. - The redirect spec scenario (proxy.redirect.yaml + session.redirect/{client,server}.rpt), shouldRedirect runtime + peer ITs. - Conflict-resolution cleanup: drop 'store' from the mqtt-kafka schema's required list and strip 'store: test0' (plus the stores: block) from the config-fixture yamls. mqtt-kafka now assumes ownership is achieved when it receives a session BEGIN and proceeds straight to doEstablishSession in onMqttBegin. The kafka != null guards on KafkaSessionStream stay; highlander/migrate removal, will-aware doEstablishSession, and the bulk spec strip are preserved.
…binding
Move the store-based MQTT session-ownership state machine into the mqtt server
binding (where it conceptually belongs: ownership is a generic MQTT concern,
not a Kafka-mapping detail). mqtt-kafka now bootstraps the Kafka session as
soon as it receives the application BEGIN; binding-mqtt only forwards that
BEGIN after ownership is achieved.
binding-mqtt:
- MqttBindingConfig.store: resolved from options.store via supplyStore, plumbed
through MqttBinding/MqttBindingContext.
- MqttConfiguration: INSTANCE_ID, SESSION_LEASE (PT30S), SESSION_RENEW (PT10S)
plus a serviceHostname() accessor for the engine's service hostname.
- New InstanceId helper (mirrors mqtt-kafka's).
- MqttServerFactory: per-connection ownership state on MqttServer
(ownerKey/Token/Watch, renewAt, stealAt, owns/claimed/ownershipResolved,
pendingSession* for the deferred-establish), OwnershipRecord with the
globally-unique per-connection nonce (replicaId + initialId), signal ids
SIGNAL_RENEW_SESSION_OWNERSHIP (4) + SIGNAL_STEAL_SESSION_OWNERSHIP (5)
wired into onNetworkSignal. Hook in onDecodeConnectPayload: when store is
configured, defer session establishment to the lock callback by parking
decoder = decodeAwaitOwnership. On strong remote owner doEncodeConnack
SERVER_MOVED + serverReference and never open the downstream session; on
claim+lock, doEstablishSession (which creates MqttSessionStream and sends
BEGIN) + register store.watch for cooperative challenge. Same-replica
takeover via different-nonce challenge. Re-entrant guards on the store
callbacks for connection-already-closed paths. doReleaseOwnership called
from cleanupNetwork and closeStreams.
The legacy session.server.redirect.before.connack path (mqtt-kafka can still
send a session RESET with serverRef post-BEGIN) is kept as a complementary
late-redirect mechanism.
specs/binding-mqtt.spec:
- server.redirect.yaml seeds the engine test store with
client-1#owner = mqtt-2.example.com:1883.
- streams/network/v5/session.redirect/{client,server}.rpt encode the
CONNACK SERVER_MOVED + Server Reference response.
- v5/SessionIT peer test and the runtime v5/SessionIT both cover shouldRedirect.
…nership Wire up store-based MQTT session takeover end-to-end at the binding-mqtt layer and retire the obsolete mqtt-kafka tests. binding-mqtt: - MqttServerFactory.onOwnershipChallenged now routes through the existing onDecodeError(SESSION_TAKEN_OVER) teardown (DISCONNECT 0x8E to the displaced client + END to publishes/subscribes + END to session + doReleaseOwnership), with a pre-emptive session.cleanupAbort when a will is present so the session ends as ABORT (per the will-on-abort path) and the will fires. - OwnershipRecord FIELD_SEPARATOR: use the � escape form in source for clarity. binding-mqtt specs: - New shouldDeliverWillMessageOnSessionTakeover at v5 network + application layers; shouldTakeOverSession / shouldRemoveSessionAtCleanStart already covered v5; add a runtime IT method for each, on a tight-lease store config. - Correct session.redirect's reason-code byte to 0x9d (MqttReasonCodes. SERVER_MOVED); was 0x87 (NOT_AUTHORIZED) in a pre-existing script bug, so the previously-passing shouldRedirect was matching the wrong byte. mqtt-kafka cleanup: - Delete the three @ignored takeover ITs in MqttKafkaSessionProxyIT, the KafkaIT/MqttIT peer methods, and the corresponding kafka/mqtt-side spec scenario dirs. - Drop the dead KafkaGroup inner class and its uses in MqttKafkaPublishFactory / MqttKafkaPublishMetadata (left over from the kafka-side ownership strip; removing closes the mqtt-kafka jacoco class-coverage gate). Verified: ./mvnw clean install across runtime/binding-mqtt, runtime/binding-mqtt-kafka, specs/binding-mqtt.spec, specs/binding-mqtt-kafka.spec — BUILD SUCCESS with full ITs and jacoco; coverage checks met on both bindings.
…over The store-based ownership lock previously left a contended second connection waiting for the broker's sessionLease to expire when the prior holder was unresponsive (no watch event fires). Embed the live lock token in the OwnershipRecord so a contender can pre-empt the unresponsive holder via store.unlock(ownerKey, observedToken, ...), bounded by sessionRenew instead of sessionLease. - OwnershipRecord gains a fourth field, `token`. decode handles 0/1/2 trailing separators so externally-seeded records (identity only) and legacy records (identity + nonce, no token) still parse cleanly. - Reorder the claim flow: lock-then-put. ownershipRecord(token) is now only called from the lock-success path, so every record this code publishes carries the live token. - Extract completeOwnership(traceId, token) shared by onOwnershipLocked (initial claim) and the new onStealComplete (post-steal); it writes the token-carrying record, installs the watch, schedules renew, runs doEstablishSession, and unsticks the decoder. - onStealOwnership now: store.get → if observedToken present, store.unlock → retry store.lock → onStealComplete. The unlock-with-token only succeeds if that token is still the live holder; stale tokens are no-ops, leaving the cooperative path intact for responsive holders that surrender via the watch first. - onStealComplete: success → completeOwnership; failure → CONNACK SERVER_BUSY (0x89) + network end (mirrors the doRedirect teardown pattern). No session cleanup needed because doEstablishSession only runs on success. - onOwnershipRenewed: when renew returns null (token invalidated by a force-unlock), the displaced holder now surrenders via the same path as the watch-driven challenge — extracted as doSurrenderOwnership(traceId) (cancel renew, unlock if held, conditional session.cleanupAbort when a will is set, onDecodeError SESSION_TAKEN_OVER). onOwnershipChallenged also delegates to this helper, removing the duplication. Tests: shouldTakeOverSession / shouldDeliverWillMessageOnSessionTakeover / shouldRemoveSessionAtCleanStart / shouldRedirect remain green. The SERVER_BUSY rejection branch and the renew-failure surrender are not covered by an IT (would require the test store to surface a "renew refused" outcome and a contender holding through both lock attempts); jacoco gates on both bindings still pass.
…n-kafka-removal-rGtCy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
This PR refactors MQTT session management to use a store-based persistence model instead of Kafka consumer group coordination. The changes introduce a new session storage mechanism that allows MQTT brokers to persist and retrieve session state independently, enabling better session takeover and redirect capabilities.
Key Changes
Core Session Management:
MqttKafkaSessionOffsetsHelperto manage session offset persistence in a dedicated storeMqttKafkaSessionOffsetsFWtype for serializing session offset metadataKafkaGroup,KafkaGroupBeginExFW,KafkaGroupFlushExFW)MqttKafkaSessionFactoryto use the new store-based approach instead of group coordinationMQTT Server Enhancements:
SERVER_BUSYreason code support for connection rejectionInstanceIdclass for unique broker instance identificationCloseableinterface support for proper resource cleanupConfiguration & Storage:
MqttConfigurationto support configurable session storesMqttBindingConfigto include store referencesTest Updates:
MqttKafkaSessionOffsetsHelperTestfor session offset persistence logicSpecification Changes:
MqttKafkaSessionOffsetsstructureMotivation
The previous Kafka consumer group-based approach had limitations for session management across multiple broker instances. The new store-based model provides:
Testing
Existing unit and integration tests have been updated to validate the new session persistence model. The test suite covers session creation, takeover, redirect, and expiry scenarios with the store-based approach.
https://claude.ai/code/session_01Gaf31VM5LZAFmUvtuBmqfz