Skip to content

Commit ba4ba2e

Browse files
abnegateclaude
andcommitted
fix(Push): address PR review on Appwrite Push adapter
- Pipeline PUBLISHes up to the broker-advertised Receive Maximum (default 256) and match PUBACKs by packet id. Drops effective send time from N×RTT to ~RTT for large fan-outs. - Verify PUBACK packet id matches the in-flight publish so an out-of-order or duplicate ack cannot attribute success to the wrong device token (Greptile P2, Copilot). - Surface json_encode failures as a RuntimeException instead of silently sending an empty payload to the broker (Greptile P1). - Persistent read buffer on the adapter so coalesced TCP reads do not drop trailing MQTT packets between readPacket() calls (Copilot). - MQTT::encodeConnect throws when a password is supplied without a username (MQTT 5 §3.1.2.9, Greptile P2). - MQTT::encodePublish validates QoS is 0/1/2 instead of silently masking the bits (Copilot). - FakeBroker fixture rewritten on Swoole — drops the pcntl dependency that wasn't installed in the alpine test image, and exercises the same async runtime Appwrite uses in production. Dockerfile installs ext- swoole via PECL for the tests image. - New tests: pipelined send to 64 devices, password-without-username rejection, invalid-QoS rejection. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 5208f64 commit ba4ba2e

6 files changed

Lines changed: 256 additions & 115 deletions

File tree

Dockerfile

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,15 @@ FROM php:8.3.11-cli-alpine3.20
1919

2020
WORKDIR /usr/local/src/
2121

22+
RUN apk add --no-cache --virtual .build-deps \
23+
$PHPIZE_DEPS \
24+
linux-headers \
25+
openssl-dev \
26+
&& pecl install swoole \
27+
&& docker-php-ext-enable swoole \
28+
&& apk del .build-deps \
29+
&& apk add --no-cache libstdc++
30+
2231
COPY --from=composer /usr/local/src/vendor /usr/local/src/vendor
2332
COPY . /usr/local/src/
2433

src/Utopia/Messaging/Adapter/Push/Appwrite.php

Lines changed: 98 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,20 @@ class Appwrite extends PushAdapter
3434

3535
private int $packetId = 0;
3636

37+
/**
38+
* Persistent read buffer carrying over bytes the decoder didn't yet consume.
39+
* MQTT packets can be coalesced into a single TCP read and we'd otherwise
40+
* lose them between calls to readPacket().
41+
*/
42+
private string $readBuffer = '';
43+
44+
/**
45+
* Max number of unacknowledged PUBLISHes in flight at any time. MQTT 5's
46+
* Receive Maximum default is 65535 but most real brokers advertise a smaller
47+
* value in CONNACK; we honor whichever is smaller after handshake.
48+
*/
49+
private int $receiveMaximum = 256;
50+
3751
public function __construct(
3852
private string $endpoint,
3953
private string $signingKey,
@@ -70,14 +84,47 @@ protected function process(PushMessage $message): array
7084

7185
try {
7286
$this->handshake($socket);
87+
$this->pipelinedPublish($socket, $message->getTo(), $payload, $expiry, $response);
88+
89+
try {
90+
$this->write($socket, MQTT::encodeDisconnect());
91+
} catch (\Throwable) {
92+
// Best effort; some brokers may have already closed the socket.
93+
}
94+
} finally {
95+
$this->close($socket);
96+
}
7397

74-
foreach ($message->getTo() as $token) {
75-
$topic = $this->topicForToken($token);
98+
return $response->toArray();
99+
}
100+
101+
/**
102+
* Pipelined PUBLISH/PUBACK loop.
103+
*
104+
* Sends up to `receiveMaximum` PUBLISH packets without waiting for an
105+
* acknowledgment, then drains PUBACKs as they arrive, matching each by
106+
* packet id. Refills the in-flight window after each ack until every
107+
* device has been sent. This keeps throughput proportional to socket
108+
* bandwidth rather than to network RTT — important when fanning out to
109+
* thousands of devices per request.
110+
*
111+
* @param resource $socket
112+
* @param array<string> $tokens
113+
*/
114+
private function pipelinedPublish($socket, array $tokens, string $payload, int $expiry, Response $response): void
115+
{
116+
$inflight = [];
117+
$cursor = 0;
118+
$total = \count($tokens);
119+
120+
while ($cursor < $total || !empty($inflight)) {
121+
while ($cursor < $total && \count($inflight) < $this->receiveMaximum) {
122+
$token = $tokens[$cursor++];
123+
$packetId = $this->nextPacketId();
76124

77125
try {
78-
$packetId = $this->nextPacketId();
79126
$packet = MQTT::encodePublish(
80-
topic: $topic,
127+
topic: $this->topicForToken($token),
81128
payload: $payload,
82129
qos: 1,
83130
retain: false,
@@ -88,38 +135,45 @@ protected function process(PushMessage $message): array
88135
'contentType' => 'application/json',
89136
],
90137
);
91-
92138
$this->write($socket, $packet);
93-
$ack = $this->readPacket($socket);
94-
if ($ack['type'] !== MQTT::PACKET_PUBACK) {
95-
$response->addResult($token, 'Broker did not acknowledge PUBLISH');
96-
continue;
97-
}
98-
99-
$parsed = MQTT::parsePuback($ack['payload']);
100-
if ($parsed['reasonCode'] !== MQTT::REASON_SUCCESS) {
101-
$error = $this->errorForReasonCode($parsed['reasonCode']);
102-
$response->addResult($token, $error);
103-
continue;
104-
}
105-
106-
$response->incrementDeliveredTo();
107-
$response->addResult($token);
139+
$inflight[$packetId] = $token;
108140
} catch (\Throwable $error) {
109141
$response->addResult($token, $error->getMessage());
110142
}
111143
}
112144

145+
if (empty($inflight)) {
146+
continue;
147+
}
148+
113149
try {
114-
$this->write($socket, MQTT::encodeDisconnect());
115-
} catch (\Throwable) {
116-
// Best effort; some brokers may have already closed the socket.
150+
$ack = $this->readPacket($socket);
151+
} catch (\Throwable $error) {
152+
foreach ($inflight as $token) {
153+
$response->addResult($token, $error->getMessage());
154+
}
155+
return;
117156
}
118-
} finally {
119-
$this->close($socket);
120-
}
121157

122-
return $response->toArray();
158+
if ($ack['type'] !== MQTT::PACKET_PUBACK) {
159+
continue;
160+
}
161+
162+
$parsed = MQTT::parsePuback($ack['payload']);
163+
$token = $inflight[$parsed['packetId']] ?? null;
164+
if ($token === null) {
165+
continue;
166+
}
167+
unset($inflight[$parsed['packetId']]);
168+
169+
if ($parsed['reasonCode'] !== MQTT::REASON_SUCCESS) {
170+
$response->addResult($token, $this->errorForReasonCode($parsed['reasonCode']));
171+
continue;
172+
}
173+
174+
$response->incrementDeliveredTo();
175+
$response->addResult($token);
176+
}
123177
}
124178

125179
/**
@@ -175,7 +229,12 @@ private function buildPayload(PushMessage $message): string
175229
};
176230
}
177231

178-
return \json_encode($envelope, JSON_UNESCAPED_SLASHES);
232+
$json = \json_encode($envelope, JSON_UNESCAPED_SLASHES);
233+
if ($json === false) {
234+
throw new \RuntimeException('Failed to encode push payload: ' . \json_last_error_msg());
235+
}
236+
237+
return $json;
179238
}
180239

181240
private function resolveExpiry(PushMessage $message): int
@@ -291,6 +350,11 @@ private function handshake($socket): void
291350
if ($connack['reasonCode'] !== MQTT::REASON_SUCCESS) {
292351
throw new \RuntimeException("Broker rejected CONNECT (reason {$connack['reasonCode']})");
293352
}
353+
354+
$brokerLimit = (int)($connack['properties']['receiveMaximum'] ?? 0);
355+
if ($brokerLimit > 0) {
356+
$this->receiveMaximum = \min($this->receiveMaximum, $brokerLimit);
357+
}
294358
}
295359

296360
private function issueServerJwt(): string
@@ -313,8 +377,12 @@ private function issueServerJwt(): string
313377
*/
314378
private function readPacket($socket): array
315379
{
316-
$buffer = '';
317380
while (true) {
381+
$packet = MQTT::decodePacket($this->readBuffer);
382+
if ($packet !== null) {
383+
return $packet;
384+
}
385+
318386
$chunk = @\fread($socket, 4096);
319387
if ($chunk === false || $chunk === '') {
320388
if (\feof($socket)) {
@@ -329,12 +397,7 @@ private function readPacket($socket): array
329397
continue;
330398
}
331399

332-
$buffer .= $chunk;
333-
334-
$packet = MQTT::decodePacket($buffer);
335-
if ($packet !== null) {
336-
return $packet;
337-
}
400+
$this->readBuffer .= $chunk;
338401
}
339402
}
340403

src/Utopia/Messaging/Helpers/MQTT.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ public static function encodeConnect(
7575
$flags |= 0x02;
7676
}
7777
if ($password !== null) {
78+
if ($username === null) {
79+
throw new \InvalidArgumentException('MQTT 5 §3.1.2.9 forbids setting a password without a username.');
80+
}
7881
$flags |= 0x40;
7982
}
8083
if ($username !== null) {
@@ -163,11 +166,15 @@ public static function encodePublish(
163166
?int $packetId = null,
164167
array $properties = []
165168
): string {
169+
if ($qos < 0 || $qos > 2) {
170+
throw new \InvalidArgumentException("MQTT QoS must be 0, 1, or 2 ({$qos} given)");
171+
}
172+
166173
$flags = 0;
167174
if ($dup) {
168175
$flags |= 0x08;
169176
}
170-
$flags |= ($qos & 0x03) << 1;
177+
$flags |= $qos << 1;
171178
if ($retain) {
172179
$flags |= 0x01;
173180
}

tests/Messaging/Adapter/Push/AppwriteTest.php

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,46 @@ public function testSendPublishesToDeviceTopicAndCountsAck(): void
5858
}
5959
}
6060

61+
public function testPipelinesPublishesToManyDevices(): void
62+
{
63+
$tokens = [];
64+
for ($i = 0; $i < 64; $i++) {
65+
$tokens[] = "device-{$i}";
66+
}
67+
68+
$broker = $this->startBroker($tokens);
69+
70+
try {
71+
$adapter = new Appwrite(
72+
endpoint: '127.0.0.1:' . $broker['port'],
73+
signingKey: self::SIGNING_KEY,
74+
tls: false,
75+
);
76+
77+
$message = new Push(
78+
to: $tokens,
79+
title: 'Burst',
80+
body: 'Pipeline test',
81+
);
82+
83+
$response = $adapter->send($message);
84+
85+
$this->assertSame(\count($tokens), $response['deliveredTo']);
86+
$this->assertCount(\count($tokens), $response['results']);
87+
88+
$captured = $this->stopBroker($broker);
89+
$this->assertCount(\count($tokens), $captured['publishes']);
90+
91+
$seenTopics = \array_map(fn ($p) => $p['topic'], $captured['publishes']);
92+
\sort($seenTopics);
93+
$expectedTopics = \array_map(fn ($t) => 'appwrite/push/' . $t, $tokens);
94+
\sort($expectedTopics);
95+
$this->assertSame($expectedTopics, $seenTopics);
96+
} finally {
97+
$this->stopBroker($broker, suppress: true);
98+
}
99+
}
100+
61101
public function testReportsExpiredTokenOnBrokerReasonCode(): void
62102
{
63103
$broker = $this->startBroker(['live-token'], rejectTokens: ['stale-token']);

0 commit comments

Comments
 (0)