feat(Push): add Appwrite Push (MQTT 5) adapter#122
Conversation
Adds a self-hosted, low-power alternative to FCM/APNS. Publishes notifications over MQTT 5 to a per-device topic, allowing a single persistent TLS connection on the device with a long keep-alive interval (30 minutes by default) — the same model that lets FCM be low-power on Android. - Helpers/MQTT: minimal MQTT 5 codec (control packet encode/decode) so the adapter does not need an external MQTT client dependency. - Adapter/Push/Appwrite: publisher adapter. Connects over TCP/TLS, authenticates with a short-lived HMAC-signed JWT, publishes one QoS 1 PUBLISH per device with content-type and message-expiry properties, maps broker reason codes back to the standard expired-token signal so Appwrite's target invalidation works the same way as for FCM/APNS. Tests: 10 codec round-trip cases, 2 adapter integration cases driven by a fake broker spawned via proc_open. PHPStan level 6 clean, Pint PSR-12 clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Greptile SummaryThis PR adds an Appwrite Push adapter that publishes MQTT 5 notifications to a self-hosted broker over TCP/TLS, plus a hand-rolled minimal MQTT 5 codec and integration tests using a Swoole fake broker.
Confidence Score: 4/5Safe to merge after fixing the read-buffer reset in The adapter's
Important Files Changed
Reviews (3): Last reviewed commit: "fix(Dockerfile): install libbrotli for s..." | Re-trigger Greptile |
There was a problem hiding this comment.
Pull request overview
Adds a new Appwrite Push provider to the messaging library by implementing an MQTT 5 publisher adapter (plus a minimal MQTT 5 packet codec) so the existing worker can dispatch push notifications through the existing PushAdapter abstraction.
Changes:
- Added
Utopia\Messaging\Helpers\MQTTimplementing minimal MQTT 5 encode/decode for the packet types needed by the adapter/tests. - Added
Utopia\Messaging\Adapter\Push\Appwritethat connects to an MQTT broker, authenticates via HMAC JWT, and publishes per-device QoS 1 notifications. - Added integration/unit tests, including a spawned fake broker process to validate real bytes-on-the-wire behavior and reason-code mapping.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
src/Utopia/Messaging/Helpers/MQTT.php |
Minimal MQTT 5 codec used by the adapter and fake broker in tests. |
src/Utopia/Messaging/Adapter/Push/Appwrite.php |
New Push adapter that connects to an MQTT broker and publishes per-device notifications. |
tests/Messaging/Helpers/MQTTTest.php |
Unit tests covering CONNECT/PUBLISH/CONNACK/PUBACK encode+parse and decode buffering behavior. |
tests/Messaging/Adapter/Push/FakeBroker.php |
Test-only fake broker process that decodes incoming MQTT packets and emits CONNACK/PUBACK. |
tests/Messaging/Adapter/Push/AppwriteTest.php |
Integration tests that start the fake broker and assert topics/payloads and reason-code mapping. |
Comments suppressed due to low confidence (2)
tests/Messaging/Adapter/Push/FakeBroker.php:39
- The loop comment says it “exits only via SIGTERM handler above”, but the code also exits via the DISCONNECT case (
break 3). Updating the comment would avoid misleading future maintainers when debugging test hangs/shutdown behavior.
// @phpstan-ignore-next-line
while (true) { // Exits only via SIGTERM handler above.
$client = @\stream_socket_accept($server, 5);
tests/Messaging/Adapter/Push/AppwriteTest.php:150
stopBroker()uses theSIGTERMconstant when callingproc_terminate().SIGTERMis provided byext-pcntl, which isn’t required by this project; without it, this will throw (undefined constant) and the test will fail to run. Consider callingproc_terminate($process)without a signal argument, or guarding signal usage behinddefined('SIGTERM').
if (\is_resource($broker['process'])) {
\proc_terminate($broker['process'], SIGTERM);
$deadline = \microtime(true) + 1;
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| $packet = MQTT::decodePacket($buffer); | ||
| if ($packet !== null) { | ||
| return $packet; | ||
| } |
| $parsed = MQTT::parsePuback($ack['payload']); | ||
| if ($parsed['reasonCode'] !== MQTT::REASON_SUCCESS) { | ||
| $error = $this->errorForReasonCode($parsed['reasonCode']); | ||
| $response->addResult($token, $error); | ||
| continue; |
| \pcntl_async_signals(true); | ||
| foreach ([SIGTERM, SIGINT] as $signal) { | ||
| \pcntl_signal($signal, function () use (&$captured, $capturePath) { | ||
| \file_put_contents($capturePath, \json_encode($captured)); | ||
| exit(0); |
| [ | ||
| 0 => ['pipe', 'r'], | ||
| 1 => ['file', '/dev/null', 'a'], | ||
| 2 => ['file', '/dev/null', 'a'], |
| $flags = 0; | ||
| if ($dup) { | ||
| $flags |= 0x08; | ||
| } | ||
| $flags |= ($qos & 0x03) << 1; |
| $this->write($socket, $packet); | ||
| $ack = $this->readPacket($socket); | ||
| if ($ack['type'] !== MQTT::PACKET_PUBACK) { | ||
| $response->addResult($token, 'Broker did not acknowledge PUBLISH'); | ||
| continue; |
- Pipeline PUBLISHes up to the broker-advertised Receive Maximum (default 256) and match PUBACKs by packet id. Drops effective send time from N×RTT to ~RTT for large fan-outs. - Verify PUBACK packet id matches the in-flight publish so an out-of-order or duplicate ack cannot attribute success to the wrong device token (Greptile P2, Copilot). - Surface json_encode failures as a RuntimeException instead of silently sending an empty payload to the broker (Greptile P1). - Persistent read buffer on the adapter so coalesced TCP reads do not drop trailing MQTT packets between readPacket() calls (Copilot). - MQTT::encodeConnect throws when a password is supplied without a username (MQTT 5 §3.1.2.9, Greptile P2). - MQTT::encodePublish validates QoS is 0/1/2 instead of silently masking the bits (Copilot). - FakeBroker fixture rewritten on Swoole — drops the pcntl dependency that wasn't installed in the alpine test image, and exercises the same async runtime Appwrite uses in production. Dockerfile installs ext- swoole via PECL for the tests image. - New tests: pipelined send to 64 devices, password-without-username rejection, invalid-QoS rejection. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Swoole 6.x enables brotli compression by default and requires libbrotli-dev at build time + brotli-libs at runtime. Without them the configure step fails with "Package 'libbrotlienc' not found". Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| $socket = @\stream_socket_client( | ||
| $url, | ||
| $errno, | ||
| $errstr, | ||
| self::CONNECT_TIMEOUT, | ||
| STREAM_CLIENT_CONNECT, | ||
| $context, | ||
| ); | ||
|
|
||
| if (!$socket) { | ||
| throw new \RuntimeException("Unable to connect to Appwrite Push broker at {$url}: {$errstr} (errno {$errno})"); | ||
| } | ||
|
|
||
| \stream_set_timeout($socket, self::READ_TIMEOUT); | ||
|
|
||
| return $socket; |
There was a problem hiding this comment.
$this->readBuffer is never cleared when a new socket connection is opened. If a process() call exits abnormally — for example, a read timeout or broker disconnect causes readPacket to throw, which is caught by pipelinedPublish and triggers an early return — the buffer retains partial or complete MQTT packet bytes from the closed connection. The next process() call opens a fresh socket, but handshake() immediately calls readPacket(), which checks $this->readBuffer first via decodePacket. If the stale bytes form a decodable packet (e.g., the tail end of a prior PUBACK), readPacket returns that packet instead of waiting for the new CONNACK, and handshake throws "Broker did not respond with CONNACK" on every subsequent send — even to a healthy broker.
| $socket = @\stream_socket_client( | |
| $url, | |
| $errno, | |
| $errstr, | |
| self::CONNECT_TIMEOUT, | |
| STREAM_CLIENT_CONNECT, | |
| $context, | |
| ); | |
| if (!$socket) { | |
| throw new \RuntimeException("Unable to connect to Appwrite Push broker at {$url}: {$errstr} (errno {$errno})"); | |
| } | |
| \stream_set_timeout($socket, self::READ_TIMEOUT); | |
| return $socket; | |
| $socket = @\stream_socket_client( | |
| $url, | |
| $errno, | |
| $errstr, | |
| self::CONNECT_TIMEOUT, | |
| STREAM_CLIENT_CONNECT, | |
| $context, | |
| ); | |
| if (!$socket) { | |
| throw new \RuntimeException("Unable to connect to Appwrite Push broker at {$url}: {$errstr} (errno {$errno})"); | |
| } | |
| \stream_set_timeout($socket, self::READ_TIMEOUT); | |
| $this->readBuffer = ''; | |
| return $socket; |
Summary
Adds Appwrite Push — a self-hosted, low-power alternative to FCM/APNS that publishes notifications over MQTT 5 to a per-device topic. Designed so device clients can hold a single persistent TLS connection with a long keep-alive interval (30 min default) — the same model that lets FCM be low-power on Android.
Pairs with the upstream Appwrite server-side broker + REST endpoints (separate PR in `appwrite/appwrite`). This library change just adds the publisher adapter so the existing messaging worker can dispatch to the new provider via the same `PushAdapter` interface as FCM/APNS.
Test plan
Notes for reviewers
🤖 Generated with Claude Code