Skip to content

feat(Push): add Appwrite Push (MQTT 5) adapter#122

Open
abnegate wants to merge 3 commits into
mainfrom
feat-appwrite-push
Open

feat(Push): add Appwrite Push (MQTT 5) adapter#122
abnegate wants to merge 3 commits into
mainfrom
feat-appwrite-push

Conversation

@abnegate
Copy link
Copy Markdown
Member

Summary

Adds Appwrite Push — a self-hosted, low-power alternative to FCM/APNS that publishes notifications over MQTT 5 to a per-device topic. Designed so device clients can hold a single persistent TLS connection with a long keep-alive interval (30 min default) — the same model that lets FCM be low-power on Android.

Pairs with the upstream Appwrite server-side broker + REST endpoints (separate PR in `appwrite/appwrite`). This library change just adds the publisher adapter so the existing messaging worker can dispatch to the new provider via the same `PushAdapter` interface as FCM/APNS.

  • `Helpers/MQTT` — minimal MQTT 5 control-packet codec (CONNECT/CONNACK/PUBLISH/PUBACK/SUBSCRIBE/SUBACK/PING/DISCONNECT). Pure PHP, no extra dependency.
  • `Adapter/Push/Appwrite` — the publisher. Opens a TCP/TLS socket per send batch, authenticates with a short-lived HMAC-signed JWT, sends one QoS 1 `PUBLISH` per device to `appwrite/push/{deviceToken}` with content-type + message-expiry properties.
  • Broker reason codes are mapped back to the standard `Expired device token` signal so the consuming server's target-invalidation path keeps working unchanged.

Test plan

  • `vendor/bin/phpunit --filter "MQTTTest|AppwriteTest"` — 12 tests, 69 assertions, all green
  • `vendor/bin/phpstan analyse --level=6 src tests` — no errors
  • `vendor/bin/pint --preset psr12 --test` — pass
  • Adapter integration test spawns a fake broker via `proc_open`, verifies real bytes on the wire (CONNECT shape, per-device PUBLISH topic + payload, PUBACK handling) and reason-code → error-message mapping (0x10 "no subscribers" maps to expired-token)

Notes for reviewers

  • I deliberately wrote the MQTT codec rather than pulling in `php-mqtt/client` — keeps the dependency surface flat and lets us tune for our exact use case (publisher only, QoS 1, no shared subs, no retain).
  • The codec is general; only the Appwrite adapter uses it for now, but a subscribe-side helper or other-broker adapters could reuse it.

🤖 Generated with Claude Code

Adds a self-hosted, low-power alternative to FCM/APNS. Publishes
notifications over MQTT 5 to a per-device topic, allowing a single
persistent TLS connection on the device with a long keep-alive interval
(30 minutes by default) — the same model that lets FCM be low-power on
Android.

- Helpers/MQTT: minimal MQTT 5 codec (control packet encode/decode) so
  the adapter does not need an external MQTT client dependency.
- Adapter/Push/Appwrite: publisher adapter. Connects over TCP/TLS,
  authenticates with a short-lived HMAC-signed JWT, publishes one QoS 1
  PUBLISH per device with content-type and message-expiry properties,
  maps broker reason codes back to the standard expired-token signal so
  Appwrite's target invalidation works the same way as for FCM/APNS.

Tests: 10 codec round-trip cases, 2 adapter integration cases driven by
a fake broker spawned via proc_open. PHPStan level 6 clean, Pint PSR-12
clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 23, 2026 03:29
@greptile-apps
Copy link
Copy Markdown

greptile-apps Bot commented May 23, 2026

Greptile Summary

This PR adds an Appwrite Push adapter that publishes MQTT 5 notifications to a self-hosted broker over TCP/TLS, plus a hand-rolled minimal MQTT 5 codec and integration tests using a Swoole fake broker.

  • src/Utopia/Messaging/Helpers/MQTT.php — new pure-PHP MQTT 5 codec covering CONNECT/CONNACK/PUBLISH/PUBACK/SUBSCRIBE/PING/DISCONNECT with variable-length integer encoding, property encoding/decoding, and a streaming decodePacket that can handle coalesced TCP reads.
  • src/Utopia/Messaging/Adapter/Push/Appwrite.php — new PushAdapter that opens one TCP/TLS socket per send() call, authenticates with an HMAC-signed JWT, and pipelines up to receiveMaximum QoS-1 PUBLISHes before draining PUBACKs; maps MQTT reason codes back to the standard expired-token signal.
  • Tests — unit tests for the codec and integration tests against a proc_open-spawned Swoole broker that verify wire-level packet shapes, pipelined delivery, and reason-code-to-error mapping.

Confidence Score: 4/5

Safe to merge after fixing the read-buffer reset in connect(); the bug is only reachable when a connection error occurs mid-batch, but it would make the adapter permanently broken for that instance's remaining lifetime.

The adapter's $this->readBuffer is never cleared when connect() opens a new socket. A read timeout or broker disconnect during pipelinedPublish leaves partial bytes in the buffer; the next call's handshake() parses those stale bytes instead of the new CONNACK, causing every subsequent send to fail with "Broker did not respond with CONNACK". The fix is a one-line reset in connect() and is low-risk.

src/Utopia/Messaging/Adapter/Push/Appwrite.php — specifically the connect() method and the readBuffer / receiveMaximum instance state that persists across calls.

Important Files Changed

Filename Overview
src/Utopia/Messaging/Adapter/Push/Appwrite.php New adapter for MQTT 5 push notifications; pipelined PUBLISH/PUBACK loop is well-structured, but $this->readBuffer is not reset on reconnect, causing stale bytes from a failed connection to corrupt the CONNACK handshake on the next process() call.
src/Utopia/Messaging/Helpers/MQTT.php Minimal MQTT 5 codec covering CONNECT/CONNACK/PUBLISH/PUBACK/SUBSCRIBE/PING/DISCONNECT; codec logic is correct for the current adapter's use cases.
tests/Messaging/Adapter/Push/AppwriteTest.php Integration tests verify correct PUBLISH topics, pipelined delivery to 64 devices, and reason-code → expired-token mapping.
tests/Messaging/Adapter/Push/FakeBroker.php Swoole-based fake broker for integration tests; handles CONNECT/PUBLISH/DISCONNECT/PINGREQ correctly and writes capture JSON for assertions.
tests/Messaging/Helpers/MQTTTest.php Unit tests for the MQTT codec covering round-trip encode/parse for all packet types, partial-buffer handling, multi-packet concatenation, and validation edge cases.
Dockerfile Adds Swoole PHP extension (required by FakeBroker) to the CI image; build-deps are correctly cleaned up after installation.

Reviews (3): Last reviewed commit: "fix(Dockerfile): install libbrotli for s..." | Re-trigger Greptile

Comment thread src/Utopia/Messaging/Adapter/Push/Appwrite.php Outdated
Comment thread src/Utopia/Messaging/Adapter/Push/Appwrite.php Outdated
Comment thread src/Utopia/Messaging/Helpers/MQTT.php
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new Appwrite Push provider to the messaging library by implementing an MQTT 5 publisher adapter (plus a minimal MQTT 5 packet codec) so the existing worker can dispatch push notifications through the existing PushAdapter abstraction.

Changes:

  • Added Utopia\Messaging\Helpers\MQTT implementing minimal MQTT 5 encode/decode for the packet types needed by the adapter/tests.
  • Added Utopia\Messaging\Adapter\Push\Appwrite that connects to an MQTT broker, authenticates via HMAC JWT, and publishes per-device QoS 1 notifications.
  • Added integration/unit tests, including a spawned fake broker process to validate real bytes-on-the-wire behavior and reason-code mapping.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
src/Utopia/Messaging/Helpers/MQTT.php Minimal MQTT 5 codec used by the adapter and fake broker in tests.
src/Utopia/Messaging/Adapter/Push/Appwrite.php New Push adapter that connects to an MQTT broker and publishes per-device notifications.
tests/Messaging/Helpers/MQTTTest.php Unit tests covering CONNECT/PUBLISH/CONNACK/PUBACK encode+parse and decode buffering behavior.
tests/Messaging/Adapter/Push/FakeBroker.php Test-only fake broker process that decodes incoming MQTT packets and emits CONNACK/PUBACK.
tests/Messaging/Adapter/Push/AppwriteTest.php Integration tests that start the fake broker and assert topics/payloads and reason-code mapping.
Comments suppressed due to low confidence (2)

tests/Messaging/Adapter/Push/FakeBroker.php:39

  • The loop comment says it “exits only via SIGTERM handler above”, but the code also exits via the DISCONNECT case (break 3). Updating the comment would avoid misleading future maintainers when debugging test hangs/shutdown behavior.
// @phpstan-ignore-next-line
while (true) { // Exits only via SIGTERM handler above.
    $client = @\stream_socket_accept($server, 5);

tests/Messaging/Adapter/Push/AppwriteTest.php:150

  • stopBroker() uses the SIGTERM constant when calling proc_terminate(). SIGTERM is provided by ext-pcntl, which isn’t required by this project; without it, this will throw (undefined constant) and the test will fail to run. Consider calling proc_terminate($process) without a signal argument, or guarding signal usage behind defined('SIGTERM').
        if (\is_resource($broker['process'])) {
            \proc_terminate($broker['process'], SIGTERM);
            $deadline = \microtime(true) + 1;

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +333 to +337

$packet = MQTT::decodePacket($buffer);
if ($packet !== null) {
return $packet;
}
Comment on lines +99 to +103
$parsed = MQTT::parsePuback($ack['payload']);
if ($parsed['reasonCode'] !== MQTT::REASON_SUCCESS) {
$error = $this->errorForReasonCode($parsed['reasonCode']);
$response->addResult($token, $error);
continue;
Comment on lines +21 to +25
\pcntl_async_signals(true);
foreach ([SIGTERM, SIGINT] as $signal) {
\pcntl_signal($signal, function () use (&$captured, $capturePath) {
\file_put_contents($capturePath, \json_encode($captured));
exit(0);
Comment on lines +111 to +114
[
0 => ['pipe', 'r'],
1 => ['file', '/dev/null', 'a'],
2 => ['file', '/dev/null', 'a'],
Comment thread src/Utopia/Messaging/Helpers/MQTT.php Outdated
Comment on lines +166 to +170
$flags = 0;
if ($dup) {
$flags |= 0x08;
}
$flags |= ($qos & 0x03) << 1;
Comment on lines +92 to +96
$this->write($socket, $packet);
$ack = $this->readPacket($socket);
if ($ack['type'] !== MQTT::PACKET_PUBACK) {
$response->addResult($token, 'Broker did not acknowledge PUBLISH');
continue;
abnegate and others added 2 commits May 23, 2026 15:40
- Pipeline PUBLISHes up to the broker-advertised Receive Maximum (default
  256) and match PUBACKs by packet id. Drops effective send time from
  N×RTT to ~RTT for large fan-outs.
- Verify PUBACK packet id matches the in-flight publish so an
  out-of-order or duplicate ack cannot attribute success to the wrong
  device token (Greptile P2, Copilot).
- Surface json_encode failures as a RuntimeException instead of
  silently sending an empty payload to the broker (Greptile P1).
- Persistent read buffer on the adapter so coalesced TCP reads do not
  drop trailing MQTT packets between readPacket() calls (Copilot).
- MQTT::encodeConnect throws when a password is supplied without a
  username (MQTT 5 §3.1.2.9, Greptile P2).
- MQTT::encodePublish validates QoS is 0/1/2 instead of silently
  masking the bits (Copilot).
- FakeBroker fixture rewritten on Swoole — drops the pcntl dependency
  that wasn't installed in the alpine test image, and exercises the same
  async runtime Appwrite uses in production. Dockerfile installs ext-
  swoole via PECL for the tests image.
- New tests: pipelined send to 64 devices, password-without-username
  rejection, invalid-QoS rejection.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Swoole 6.x enables brotli compression by default and requires
libbrotli-dev at build time + brotli-libs at runtime. Without them the
configure step fails with "Package 'libbrotlienc' not found".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment on lines +281 to +296
$socket = @\stream_socket_client(
$url,
$errno,
$errstr,
self::CONNECT_TIMEOUT,
STREAM_CLIENT_CONNECT,
$context,
);

if (!$socket) {
throw new \RuntimeException("Unable to connect to Appwrite Push broker at {$url}: {$errstr} (errno {$errno})");
}

\stream_set_timeout($socket, self::READ_TIMEOUT);

return $socket;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 $this->readBuffer is never cleared when a new socket connection is opened. If a process() call exits abnormally — for example, a read timeout or broker disconnect causes readPacket to throw, which is caught by pipelinedPublish and triggers an early return — the buffer retains partial or complete MQTT packet bytes from the closed connection. The next process() call opens a fresh socket, but handshake() immediately calls readPacket(), which checks $this->readBuffer first via decodePacket. If the stale bytes form a decodable packet (e.g., the tail end of a prior PUBACK), readPacket returns that packet instead of waiting for the new CONNACK, and handshake throws "Broker did not respond with CONNACK" on every subsequent send — even to a healthy broker.

Suggested change
$socket = @\stream_socket_client(
$url,
$errno,
$errstr,
self::CONNECT_TIMEOUT,
STREAM_CLIENT_CONNECT,
$context,
);
if (!$socket) {
throw new \RuntimeException("Unable to connect to Appwrite Push broker at {$url}: {$errstr} (errno {$errno})");
}
\stream_set_timeout($socket, self::READ_TIMEOUT);
return $socket;
$socket = @\stream_socket_client(
$url,
$errno,
$errstr,
self::CONNECT_TIMEOUT,
STREAM_CLIENT_CONNECT,
$context,
);
if (!$socket) {
throw new \RuntimeException("Unable to connect to Appwrite Push broker at {$url}: {$errstr} (errno {$errno})");
}
\stream_set_timeout($socket, self::READ_TIMEOUT);
$this->readBuffer = '';
return $socket;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants