diff --git a/Dockerfile b/Dockerfile index 5bc1e5e3..3a6814da 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,6 +19,16 @@ FROM php:8.3.11-cli-alpine3.20 WORKDIR /usr/local/src/ +RUN apk add --no-cache --virtual .build-deps \ + $PHPIZE_DEPS \ + linux-headers \ + openssl-dev \ + brotli-dev \ + && pecl install swoole \ + && docker-php-ext-enable swoole \ + && apk del .build-deps \ + && apk add --no-cache libstdc++ brotli-libs + COPY --from=composer /usr/local/src/vendor /usr/local/src/vendor COPY . /usr/local/src/ diff --git a/src/Utopia/Messaging/Adapter/Push/Appwrite.php b/src/Utopia/Messaging/Adapter/Push/Appwrite.php new file mode 100644 index 00000000..48ca64d2 --- /dev/null +++ b/src/Utopia/Messaging/Adapter/Push/Appwrite.php @@ -0,0 +1,442 @@ +serverId === '') { + $this->serverId = self::SERVER_CLIENT_PREFIX . '-' . \bin2hex(\random_bytes(6)); + } + } + + public function getName(): string + { + return static::NAME; + } + + public function getMaxMessagesPerRequest(): int + { + return 5000; + } + + /** + * {@inheritdoc} + */ + protected function process(PushMessage $message): array + { + $payload = $this->buildPayload($message); + $expiry = $this->resolveExpiry($message); + + $response = new Response($this->getType()); + + $socket = $this->connect(); + + try { + $this->handshake($socket); + $this->pipelinedPublish($socket, $message->getTo(), $payload, $expiry, $response); + + try { + $this->write($socket, MQTT::encodeDisconnect()); + } catch (\Throwable) { + // Best effort; some brokers may have already closed the socket. + } + } finally { + $this->close($socket); + } + + return $response->toArray(); + } + + /** + * Pipelined PUBLISH/PUBACK loop. + * + * Sends up to `receiveMaximum` PUBLISH packets without waiting for an + * acknowledgment, then drains PUBACKs as they arrive, matching each by + * packet id. Refills the in-flight window after each ack until every + * device has been sent. This keeps throughput proportional to socket + * bandwidth rather than to network RTT — important when fanning out to + * thousands of devices per request. + * + * @param resource $socket + * @param array $tokens + */ + private function pipelinedPublish($socket, array $tokens, string $payload, int $expiry, Response $response): void + { + $inflight = []; + $cursor = 0; + $total = \count($tokens); + + while ($cursor < $total || !empty($inflight)) { + while ($cursor < $total && \count($inflight) < $this->receiveMaximum) { + $token = $tokens[$cursor++]; + $packetId = $this->nextPacketId(); + + try { + $packet = MQTT::encodePublish( + topic: $this->topicForToken($token), + payload: $payload, + qos: 1, + retain: false, + dup: false, + packetId: $packetId, + properties: [ + 'messageExpiryInterval' => $expiry, + 'contentType' => 'application/json', + ], + ); + $this->write($socket, $packet); + $inflight[$packetId] = $token; + } catch (\Throwable $error) { + $response->addResult($token, $error->getMessage()); + } + } + + if (empty($inflight)) { + continue; + } + + try { + $ack = $this->readPacket($socket); + } catch (\Throwable $error) { + foreach ($inflight as $token) { + $response->addResult($token, $error->getMessage()); + } + return; + } + + if ($ack['type'] !== MQTT::PACKET_PUBACK) { + continue; + } + + $parsed = MQTT::parsePuback($ack['payload']); + $token = $inflight[$parsed['packetId']] ?? null; + if ($token === null) { + continue; + } + unset($inflight[$parsed['packetId']]); + + if ($parsed['reasonCode'] !== MQTT::REASON_SUCCESS) { + $response->addResult($token, $this->errorForReasonCode($parsed['reasonCode'])); + continue; + } + + $response->incrementDeliveredTo(); + $response->addResult($token); + } + } + + /** + * Build a single payload that the device runtime can render. Mirrors the + * shape exposed to FCM/APNS so SDK consumers see a consistent envelope. + * + * @return string JSON-encoded payload + */ + private function buildPayload(PushMessage $message): string + { + $envelope = []; + + if ($message->getTitle() !== null) { + $envelope['notification']['title'] = $message->getTitle(); + } + if ($message->getBody() !== null) { + $envelope['notification']['body'] = $message->getBody(); + } + if ($message->getImage() !== null) { + $envelope['notification']['image'] = $message->getImage(); + } + if ($message->getIcon() !== null) { + $envelope['notification']['icon'] = $message->getIcon(); + } + if ($message->getColor() !== null) { + $envelope['notification']['color'] = $message->getColor(); + } + if ($message->getSound() !== null) { + $envelope['notification']['sound'] = $message->getSound(); + } + if ($message->getTag() !== null) { + $envelope['notification']['tag'] = $message->getTag(); + } + if ($message->getBadge() !== null) { + $envelope['notification']['badge'] = $message->getBadge(); + } + if ($message->getAction() !== null) { + $envelope['notification']['action'] = $message->getAction(); + } + if ($message->getContentAvailable() !== null) { + $envelope['notification']['contentAvailable'] = (bool)$message->getContentAvailable(); + } + if ($message->getCritical() !== null) { + $envelope['notification']['critical'] = (bool)$message->getCritical(); + } + if ($message->getData() !== null) { + $envelope['data'] = $message->getData(); + } + if ($message->getPriority() !== null) { + $envelope['priority'] = match ($message->getPriority()) { + Priority::HIGH => 'high', + Priority::NORMAL => 'normal', + }; + } + + $json = \json_encode($envelope, JSON_UNESCAPED_SLASHES); + if ($json === false) { + throw new \RuntimeException('Failed to encode push payload: ' . \json_last_error_msg()); + } + + return $json; + } + + private function resolveExpiry(PushMessage $message): int + { + if (\method_exists($message, 'getMessageExpiry')) { + $expiry = $message->getMessageExpiry(); + if (\is_int($expiry) && $expiry > 0) { + return $expiry; + } + } + + return $this->messageExpiry; + } + + private function topicForToken(string $token): string + { + return self::TOPIC_PREFIX . '/' . $token; + } + + private function nextPacketId(): int + { + $this->packetId = ($this->packetId + 1) & 0xFFFF; + if ($this->packetId === 0) { + $this->packetId = 1; + } + + return $this->packetId; + } + + /** + * @return resource + */ + private function connect() + { + $url = $this->resolveEndpoint(); + $context = \stream_context_create([ + 'ssl' => [ + 'verify_peer' => true, + 'verify_peer_name' => true, + 'SNI_enabled' => true, + ], + ]); + + $socket = @\stream_socket_client( + $url, + $errno, + $errstr, + self::CONNECT_TIMEOUT, + STREAM_CLIENT_CONNECT, + $context, + ); + + if (!$socket) { + throw new \RuntimeException("Unable to connect to Appwrite Push broker at {$url}: {$errstr} (errno {$errno})"); + } + + \stream_set_timeout($socket, self::READ_TIMEOUT); + + return $socket; + } + + private function resolveEndpoint(): string + { + $endpoint = \rtrim($this->endpoint); + if ($endpoint === '') { + throw new \RuntimeException('Appwrite Push endpoint is not configured'); + } + + $scheme = $this->tls ? 'tls' : 'tcp'; + + if (\str_contains($endpoint, '://')) { + $parts = \parse_url($endpoint); + $host = $parts['host'] ?? ''; + $port = $parts['port'] ?? ($this->tls ? 8883 : 1883); + + return "{$scheme}://{$host}:{$port}"; + } + + if (\str_contains($endpoint, ':')) { + return "{$scheme}://{$endpoint}"; + } + + $port = $this->tls ? 8883 : 1883; + + return "{$scheme}://{$endpoint}:{$port}"; + } + + /** + * @param resource $socket + */ + private function handshake($socket): void + { + $token = $this->issueServerJwt(); + $packet = MQTT::encodeConnect( + clientId: $this->serverId, + username: 'server', + password: $token, + keepAlive: self::KEEP_ALIVE, + cleanStart: true, + properties: [ + 'sessionExpiryInterval' => 0, + ], + ); + + $this->write($socket, $packet); + + $response = $this->readPacket($socket); + if ($response['type'] !== MQTT::PACKET_CONNACK) { + throw new \RuntimeException('Broker did not respond with CONNACK'); + } + + $connack = MQTT::parseConnack($response['payload']); + if ($connack['reasonCode'] !== MQTT::REASON_SUCCESS) { + throw new \RuntimeException("Broker rejected CONNECT (reason {$connack['reasonCode']})"); + } + + $brokerLimit = (int)($connack['properties']['receiveMaximum'] ?? 0); + if ($brokerLimit > 0) { + $this->receiveMaximum = \min($this->receiveMaximum, $brokerLimit); + } + } + + private function issueServerJwt(): string + { + $now = \time(); + $claims = [ + 'iss' => 'appwrite', + 'sub' => $this->serverId, + 'iat' => $now, + 'exp' => $now + self::JWT_TTL, + 'scope' => self::JWT_SCOPE, + ]; + + return JWT::encode($claims, $this->signingKey, self::JWT_ALGORITHM); + } + + /** + * @param resource $socket + * @return array{type: int, flags: int, payload: string} + */ + private function readPacket($socket): array + { + while (true) { + $packet = MQTT::decodePacket($this->readBuffer); + if ($packet !== null) { + return $packet; + } + + $chunk = @\fread($socket, 4096); + if ($chunk === false || $chunk === '') { + if (\feof($socket)) { + throw new \RuntimeException('Broker closed the connection'); + } + + $info = \stream_get_meta_data($socket); + if (!empty($info['timed_out'])) { + throw new \RuntimeException('Broker read timeout'); + } + + continue; + } + + $this->readBuffer .= $chunk; + } + } + + /** + * @param resource $socket + */ + private function write($socket, string $bytes): void + { + $length = \strlen($bytes); + $written = 0; + + while ($written < $length) { + $result = @\fwrite($socket, \substr($bytes, $written)); + if ($result === false || $result === 0) { + throw new \RuntimeException('Failed to write to broker socket'); + } + $written += $result; + } + } + + /** + * @param resource $socket + */ + private function close($socket): void + { + if (\is_resource($socket)) { + @\fclose($socket); + } + } + + private function errorForReasonCode(int $code): string + { + return match ($code) { + 0x10 => $this->getExpiredErrorMessage(), // No matching subscribers + 0x90 => 'Topic name invalid', + 0x97 => 'Quota exceeded', + 0x99 => 'Payload format invalid', + 0x87 => 'Not authorized', + default => "Broker returned reason code 0x" . \dechex($code), + }; + } +} diff --git a/src/Utopia/Messaging/Helpers/MQTT.php b/src/Utopia/Messaging/Helpers/MQTT.php new file mode 100644 index 00000000..c066166f --- /dev/null +++ b/src/Utopia/Messaging/Helpers/MQTT.php @@ -0,0 +1,625 @@ + $properties Extra connection properties (session expiry, etc.). + */ + public static function encodeConnect( + string $clientId, + ?string $username = null, + ?string $password = null, + int $keepAlive = 60, + bool $cleanStart = true, + array $properties = [] + ): string { + $variable = self::encodeString(self::PROTOCOL_NAME); + $variable .= \chr(self::PROTOCOL_VERSION); + + $flags = 0; + if ($cleanStart) { + $flags |= 0x02; + } + if ($password !== null) { + if ($username === null) { + throw new \InvalidArgumentException('MQTT 5 §3.1.2.9 forbids setting a password without a username.'); + } + $flags |= 0x40; + } + if ($username !== null) { + $flags |= 0x80; + } + + $variable .= \chr($flags); + $variable .= \pack('n', $keepAlive); + + $props = ''; + if (isset($properties['sessionExpiryInterval'])) { + $props .= \chr(self::PROPERTY_SESSION_EXPIRY) . \pack('N', (int)$properties['sessionExpiryInterval']); + } + if (isset($properties['authenticationMethod'])) { + $props .= \chr(self::PROPERTY_AUTHENTICATION_METHOD) . self::encodeString((string)$properties['authenticationMethod']); + } + if (isset($properties['authenticationData'])) { + $props .= \chr(self::PROPERTY_AUTHENTICATION_DATA) . self::encodeBinary((string)$properties['authenticationData']); + } + + $variable .= self::encodeVariableByteInteger(\strlen($props)) . $props; + + $payload = self::encodeString($clientId); + if ($username !== null) { + $payload .= self::encodeString($username); + } + if ($password !== null) { + $payload .= self::encodeBinary($password); + } + + return self::buildPacket(self::PACKET_CONNECT, 0, $variable . $payload); + } + + /** + * Encode a CONNACK packet. + * + * @param array $properties Optional server-keepalive/assigned-client-id etc. + */ + public static function encodeConnack(int $reasonCode, bool $sessionPresent = false, array $properties = []): string + { + $variable = \chr($sessionPresent ? 0x01 : 0x00); + $variable .= \chr($reasonCode); + + $props = ''; + if (isset($properties['serverKeepAlive'])) { + $props .= \chr(self::PROPERTY_SERVER_KEEP_ALIVE) . \pack('n', (int)$properties['serverKeepAlive']); + } + if (isset($properties['assignedClientId'])) { + $props .= \chr(self::PROPERTY_ASSIGNED_CLIENT_ID) . self::encodeString((string)$properties['assignedClientId']); + } + if (isset($properties['reasonString'])) { + $props .= \chr(self::PROPERTY_REASON_STRING) . self::encodeString((string)$properties['reasonString']); + } + if (isset($properties['maximumQoS'])) { + $props .= \chr(self::PROPERTY_MAXIMUM_QOS) . \chr((int)$properties['maximumQoS']); + } + if (isset($properties['retainAvailable'])) { + $props .= \chr(self::PROPERTY_RETAIN_AVAILABLE) . \chr($properties['retainAvailable'] ? 1 : 0); + } + if (isset($properties['receiveMaximum'])) { + $props .= \chr(self::PROPERTY_RECEIVE_MAXIMUM) . \pack('n', (int)$properties['receiveMaximum']); + } + if (isset($properties['wildcardSubscriptionAvailable'])) { + $props .= \chr(self::PROPERTY_WILDCARD_SUBSCRIPTION_AVAILABLE) . \chr($properties['wildcardSubscriptionAvailable'] ? 1 : 0); + } + if (isset($properties['sharedSubscriptionAvailable'])) { + $props .= \chr(self::PROPERTY_SHARED_SUBSCRIPTION_AVAILABLE) . \chr($properties['sharedSubscriptionAvailable'] ? 1 : 0); + } + + $variable .= self::encodeVariableByteInteger(\strlen($props)) . $props; + + return self::buildPacket(self::PACKET_CONNACK, 0, $variable); + } + + /** + * Encode a PUBLISH packet. + * + * @param array $properties Optional message expiry, content type, etc. + */ + public static function encodePublish( + string $topic, + string $payload, + int $qos = 0, + bool $retain = false, + bool $dup = false, + ?int $packetId = null, + array $properties = [] + ): string { + if ($qos < 0 || $qos > 2) { + throw new \InvalidArgumentException("MQTT QoS must be 0, 1, or 2 ({$qos} given)"); + } + + $flags = 0; + if ($dup) { + $flags |= 0x08; + } + $flags |= $qos << 1; + if ($retain) { + $flags |= 0x01; + } + + $variable = self::encodeString($topic); + if ($qos > 0) { + if ($packetId === null) { + throw new \InvalidArgumentException('packetId is required for QoS > 0'); + } + $variable .= \pack('n', $packetId); + } + + $props = ''; + if (isset($properties['messageExpiryInterval'])) { + $props .= \chr(self::PROPERTY_MESSAGE_EXPIRY) . \pack('N', (int)$properties['messageExpiryInterval']); + } + if (isset($properties['contentType'])) { + $props .= \chr(self::PROPERTY_CONTENT_TYPE) . self::encodeString((string)$properties['contentType']); + } + if (isset($properties['correlationData'])) { + $props .= \chr(self::PROPERTY_CORRELATION_DATA) . self::encodeBinary((string)$properties['correlationData']); + } + if (isset($properties['responseTopic'])) { + $props .= \chr(self::PROPERTY_RESPONSE_TOPIC) . self::encodeString((string)$properties['responseTopic']); + } + foreach ($properties['userProperties'] ?? [] as $key => $value) { + $props .= \chr(self::PROPERTY_USER_PROPERTY) . self::encodeString((string)$key) . self::encodeString((string)$value); + } + + $variable .= self::encodeVariableByteInteger(\strlen($props)) . $props; + + return self::buildPacket(self::PACKET_PUBLISH, $flags, $variable . $payload); + } + + /** + * Encode a PUBACK packet. + */ + public static function encodePuback(int $packetId, int $reasonCode = self::REASON_SUCCESS): string + { + $variable = \pack('n', $packetId); + $variable .= \chr($reasonCode); + $variable .= \chr(0); + + return self::buildPacket(self::PACKET_PUBACK, 0, $variable); + } + + /** + * Encode a SUBACK packet. + * + * @param array $reasonCodes One reason code per topic filter in the SUBSCRIBE. + */ + public static function encodeSuback(int $packetId, array $reasonCodes): string + { + $variable = \pack('n', $packetId); + $variable .= \chr(0); + foreach ($reasonCodes as $code) { + $variable .= \chr($code); + } + + return self::buildPacket(self::PACKET_SUBACK, 0, $variable); + } + + /** + * Encode a PINGRESP packet. + */ + public static function encodePingresp(): string + { + return self::buildPacket(self::PACKET_PINGRESP, 0, ''); + } + + /** + * Encode a PINGREQ packet. + */ + public static function encodePingreq(): string + { + return self::buildPacket(self::PACKET_PINGREQ, 0, ''); + } + + /** + * Encode a DISCONNECT packet. + */ + public static function encodeDisconnect(int $reasonCode = self::REASON_SUCCESS): string + { + if ($reasonCode === self::REASON_SUCCESS) { + return self::buildPacket(self::PACKET_DISCONNECT, 0, ''); + } + + $variable = \chr($reasonCode) . \chr(0); + + return self::buildPacket(self::PACKET_DISCONNECT, 0, $variable); + } + + /** + * Decode a single MQTT control packet from a buffer. + * + * Returns null if the buffer does not yet contain a full packet. On success + * advances the &$buffer past the consumed bytes and returns the parsed packet. + * + * @return array{type: int, flags: int, payload: string}|null + */ + public static function decodePacket(string &$buffer): ?array + { + $length = \strlen($buffer); + if ($length < 2) { + return null; + } + + $firstByte = \ord($buffer[0]); + $type = ($firstByte >> 4) & 0x0F; + $flags = $firstByte & 0x0F; + + $offset = 1; + $remaining = self::readVariableByteInteger($buffer, $offset); + if ($remaining === null) { + return null; + } + + $total = $offset + $remaining; + if ($length < $total) { + return null; + } + + $payload = \substr($buffer, $offset, $remaining); + $buffer = \substr($buffer, $total); + + return [ + 'type' => $type, + 'flags' => $flags, + 'payload' => $payload, + ]; + } + + /** + * Parse a CONNECT packet body (the bytes after the fixed header). + * + * @return array{ + * protocol: string, + * version: int, + * flags: int, + * keepAlive: int, + * clientId: string, + * username: ?string, + * password: ?string, + * properties: array, + * cleanStart: bool + * } + */ + public static function parseConnect(string $payload): array + { + $offset = 0; + $protocol = self::readString($payload, $offset); + $version = \ord($payload[$offset++]); + $flags = \ord($payload[$offset++]); + $keepAlive = \unpack('n', \substr($payload, $offset, 2))[1]; + $offset += 2; + + $propLen = self::readVariableByteInteger($payload, $offset); + $props = self::readProperties(\substr($payload, $offset, $propLen)); + $offset += $propLen; + + $clientId = self::readString($payload, $offset); + + $username = null; + $password = null; + + if ($flags & 0x80) { + $username = self::readString($payload, $offset); + } + if ($flags & 0x40) { + $password = self::readBinary($payload, $offset); + } + + return [ + 'protocol' => $protocol, + 'version' => $version, + 'flags' => $flags, + 'cleanStart' => (bool)($flags & 0x02), + 'keepAlive' => $keepAlive, + 'clientId' => $clientId, + 'username' => $username, + 'password' => $password, + 'properties' => $props, + ]; + } + + /** + * Parse a PUBLISH packet body. $flags is the lower nibble of the fixed header. + * + * @return array{ + * topic: string, + * payload: string, + * qos: int, + * retain: bool, + * dup: bool, + * packetId: ?int, + * properties: array + * } + */ + public static function parsePublish(string $payload, int $flags): array + { + $qos = ($flags >> 1) & 0x03; + $retain = (bool)($flags & 0x01); + $dup = (bool)($flags & 0x08); + + $offset = 0; + $topic = self::readString($payload, $offset); + + $packetId = null; + if ($qos > 0) { + $packetId = \unpack('n', \substr($payload, $offset, 2))[1]; + $offset += 2; + } + + $propLen = self::readVariableByteInteger($payload, $offset); + $props = self::readProperties(\substr($payload, $offset, $propLen)); + $offset += $propLen; + + return [ + 'topic' => $topic, + 'payload' => \substr($payload, $offset), + 'qos' => $qos, + 'retain' => $retain, + 'dup' => $dup, + 'packetId' => $packetId, + 'properties' => $props, + ]; + } + + /** + * Parse a SUBSCRIBE packet body. + * + * @return array{ + * packetId: int, + * filters: array + * } + */ + public static function parseSubscribe(string $payload): array + { + $offset = 0; + $packetId = \unpack('n', \substr($payload, $offset, 2))[1]; + $offset += 2; + + $propLen = self::readVariableByteInteger($payload, $offset); + $offset += $propLen; + + $filters = []; + while ($offset < \strlen($payload)) { + $topic = self::readString($payload, $offset); + $options = \ord($payload[$offset++]); + $filters[] = [ + 'topic' => $topic, + 'qos' => $options & 0x03, + 'noLocal' => (bool)($options & 0x04), + 'retainAsPublished' => (bool)($options & 0x08), + 'retainHandling' => ($options >> 4) & 0x03, + ]; + } + + return [ + 'packetId' => $packetId, + 'filters' => $filters, + ]; + } + + /** + * Parse a CONNACK packet body. + * + * @return array{sessionPresent: bool, reasonCode: int, properties: array} + */ + public static function parseConnack(string $payload): array + { + $sessionPresent = (bool)(\ord($payload[0]) & 0x01); + $reasonCode = \ord($payload[1]); + $offset = 2; + $propLen = self::readVariableByteInteger($payload, $offset); + $props = self::readProperties(\substr($payload, $offset, $propLen)); + + return [ + 'sessionPresent' => $sessionPresent, + 'reasonCode' => $reasonCode, + 'properties' => $props, + ]; + } + + /** + * Parse a PUBACK packet body. + * + * @return array{packetId: int, reasonCode: int} + */ + public static function parsePuback(string $payload): array + { + $packetId = \unpack('n', \substr($payload, 0, 2))[1]; + $reasonCode = \strlen($payload) > 2 ? \ord($payload[2]) : self::REASON_SUCCESS; + + return [ + 'packetId' => $packetId, + 'reasonCode' => $reasonCode, + ]; + } + + private static function buildPacket(int $type, int $flags, string $body): string + { + $header = \chr((($type & 0x0F) << 4) | ($flags & 0x0F)); + + return $header . self::encodeVariableByteInteger(\strlen($body)) . $body; + } + + private static function encodeString(string $value): string + { + $length = \strlen($value); + if ($length > 0xFFFF) { + throw new \InvalidArgumentException("MQTT string exceeds 65535 byte limit ({$length} given)"); + } + + return \pack('n', $length) . $value; + } + + private static function encodeBinary(string $value): string + { + $length = \strlen($value); + if ($length > 0xFFFF) { + throw new \InvalidArgumentException("MQTT binary exceeds 65535 byte limit ({$length} given)"); + } + + return \pack('n', $length) . $value; + } + + private static function encodeVariableByteInteger(int $value): string + { + if ($value < 0 || $value > 268435455) { + throw new \InvalidArgumentException('Variable byte integer out of range.'); + } + + $bytes = ''; + do { + $byte = $value & 0x7F; + $value >>= 7; + if ($value > 0) { + $byte |= 0x80; + } + $bytes .= \chr($byte); + } while ($value > 0); + + return $bytes; + } + + private static function readVariableByteInteger(string $buffer, int &$offset): ?int + { + $multiplier = 1; + $value = 0; + $length = \strlen($buffer); + $start = $offset; + + do { + if ($offset >= $length) { + $offset = $start; + return null; + } + $byte = \ord($buffer[$offset++]); + $value += ($byte & 0x7F) * $multiplier; + if ($multiplier > 128 * 128 * 128) { + throw new \RuntimeException('Malformed variable byte integer'); + } + $multiplier *= 128; + } while (($byte & 0x80) !== 0); + + return $value; + } + + private static function readString(string $buffer, int &$offset): string + { + $len = \unpack('n', \substr($buffer, $offset, 2))[1]; + $offset += 2; + $value = \substr($buffer, $offset, $len); + $offset += $len; + + return $value; + } + + private static function readBinary(string $buffer, int &$offset): string + { + return self::readString($buffer, $offset); + } + + /** + * @return array + */ + private static function readProperties(string $buffer): array + { + $offset = 0; + $length = \strlen($buffer); + $properties = []; + + while ($offset < $length) { + $identifier = \ord($buffer[$offset++]); + + switch ($identifier) { + case self::PROPERTY_MESSAGE_EXPIRY: + $properties['messageExpiryInterval'] = \unpack('N', \substr($buffer, $offset, 4))[1]; + $offset += 4; + break; + case self::PROPERTY_CONTENT_TYPE: + $properties['contentType'] = self::readString($buffer, $offset); + break; + case self::PROPERTY_RESPONSE_TOPIC: + $properties['responseTopic'] = self::readString($buffer, $offset); + break; + case self::PROPERTY_CORRELATION_DATA: + $properties['correlationData'] = self::readBinary($buffer, $offset); + break; + case self::PROPERTY_SESSION_EXPIRY: + $properties['sessionExpiryInterval'] = \unpack('N', \substr($buffer, $offset, 4))[1]; + $offset += 4; + break; + case self::PROPERTY_RECEIVE_MAXIMUM: + $properties['receiveMaximum'] = \unpack('n', \substr($buffer, $offset, 2))[1]; + $offset += 2; + break; + case self::PROPERTY_AUTHENTICATION_METHOD: + $properties['authenticationMethod'] = self::readString($buffer, $offset); + break; + case self::PROPERTY_AUTHENTICATION_DATA: + $properties['authenticationData'] = self::readBinary($buffer, $offset); + break; + case self::PROPERTY_USER_PROPERTY: + $key = self::readString($buffer, $offset); + $value = self::readString($buffer, $offset); + $properties['userProperties'][$key] = $value; + break; + case self::PROPERTY_TOPIC_ALIAS_MAXIMUM: + $properties['topicAliasMaximum'] = \unpack('n', \substr($buffer, $offset, 2))[1]; + $offset += 2; + break; + case self::PROPERTY_SERVER_KEEP_ALIVE: + $properties['serverKeepAlive'] = \unpack('n', \substr($buffer, $offset, 2))[1]; + $offset += 2; + break; + case self::PROPERTY_REASON_STRING: + $properties['reasonString'] = self::readString($buffer, $offset); + break; + default: + return $properties; + } + } + + return $properties; + } +} diff --git a/tests/Messaging/Adapter/Push/AppwriteTest.php b/tests/Messaging/Adapter/Push/AppwriteTest.php new file mode 100644 index 00000000..4d927667 --- /dev/null +++ b/tests/Messaging/Adapter/Push/AppwriteTest.php @@ -0,0 +1,225 @@ +startBroker(['device-token-1', 'device-token-2']); + + try { + $adapter = new Appwrite( + endpoint: '127.0.0.1:' . $broker['port'], + signingKey: self::SIGNING_KEY, + tls: false, + ); + + $message = new Push( + to: ['device-token-1', 'device-token-2'], + title: 'Hi', + body: 'Hello', + data: ['k' => 'v'], + ); + + $response = $adapter->send($message); + + $this->assertSame(2, $response['deliveredTo']); + $this->assertSame('push', $response['type']); + $this->assertCount(2, $response['results']); + + foreach ($response['results'] as $result) { + $this->assertSame('success', $result['status']); + $this->assertSame('', $result['error']); + } + + $captured = $this->stopBroker($broker); + + $this->assertCount(2, $captured['publishes']); + $this->assertSame('appwrite/push/device-token-1', $captured['publishes'][0]['topic']); + $this->assertSame('appwrite/push/device-token-2', $captured['publishes'][1]['topic']); + + $decoded = \json_decode($captured['publishes'][0]['payload'], true); + $this->assertSame('Hi', $decoded['notification']['title']); + $this->assertSame('Hello', $decoded['notification']['body']); + $this->assertSame(['k' => 'v'], $decoded['data']); + + $this->assertSame('server', $captured['connect']['username']); + $this->assertNotEmpty($captured['connect']['password']); + $this->assertStringStartsWith('appwrite-server-', $captured['connect']['clientId']); + } finally { + $this->stopBroker($broker, suppress: true); + } + } + + public function testPipelinesPublishesToManyDevices(): void + { + $tokens = []; + for ($i = 0; $i < 64; $i++) { + $tokens[] = "device-{$i}"; + } + + $broker = $this->startBroker($tokens); + + try { + $adapter = new Appwrite( + endpoint: '127.0.0.1:' . $broker['port'], + signingKey: self::SIGNING_KEY, + tls: false, + ); + + $message = new Push( + to: $tokens, + title: 'Burst', + body: 'Pipeline test', + ); + + $response = $adapter->send($message); + + $this->assertSame(\count($tokens), $response['deliveredTo']); + $this->assertCount(\count($tokens), $response['results']); + + $captured = $this->stopBroker($broker); + $this->assertCount(\count($tokens), $captured['publishes']); + + $seenTopics = \array_map(fn ($p) => $p['topic'], $captured['publishes']); + \sort($seenTopics); + $expectedTopics = \array_map(fn ($t) => 'appwrite/push/' . $t, $tokens); + \sort($expectedTopics); + $this->assertSame($expectedTopics, $seenTopics); + } finally { + $this->stopBroker($broker, suppress: true); + } + } + + public function testReportsExpiredTokenOnBrokerReasonCode(): void + { + $broker = $this->startBroker(['live-token'], rejectTokens: ['stale-token']); + + try { + $adapter = new Appwrite( + endpoint: '127.0.0.1:' . $broker['port'], + signingKey: self::SIGNING_KEY, + tls: false, + ); + + $message = new Push( + to: ['live-token', 'stale-token'], + title: 'Hi', + body: 'Hello', + ); + + $response = $adapter->send($message); + + $this->assertSame(1, $response['deliveredTo']); + $this->assertSame('success', $response['results'][0]['status']); + $this->assertSame('live-token', $response['results'][0]['recipient']); + $this->assertSame('failure', $response['results'][1]['status']); + $this->assertSame('stale-token', $response['results'][1]['recipient']); + $this->assertSame('Expired device token', $response['results'][1]['error']); + } finally { + $this->stopBroker($broker, suppress: true); + } + } + + /** + * @param array $expectTokens + * @param array $rejectTokens + * @return array{port: int, process: resource, captured: string} + */ + private function startBroker(array $expectTokens, array $rejectTokens = []): array + { + $port = $this->pickFreePort(); + $capturePath = \sys_get_temp_dir() . '/appwrite-push-broker-' . \uniqid() . '.json'; + $stateFile = \sys_get_temp_dir() . '/appwrite-push-broker-state-' . \uniqid() . '.json'; + + \file_put_contents($stateFile, \json_encode([ + 'expect' => $expectTokens, + 'reject' => $rejectTokens, + ])); + + $brokerScript = __DIR__ . '/FakeBroker.php'; + + $process = \proc_open( + [PHP_BINARY, $brokerScript, (string)$port, $capturePath, $stateFile], + [ + 0 => ['pipe', 'r'], + 1 => ['file', '/dev/null', 'a'], + 2 => ['file', '/dev/null', 'a'], + ], + $pipes, + ); + + if (!\is_resource($process)) { + $this->fail('Could not start fake broker process'); + } + + $deadline = \microtime(true) + 3; + while (\microtime(true) < $deadline) { + $probe = @\fsockopen('127.0.0.1', $port, $errno, $errstr, 0.1); + if (\is_resource($probe)) { + \fclose($probe); + return [ + 'port' => $port, + 'process' => $process, + 'captured' => $capturePath, + ]; + } + \usleep(50000); + } + + \proc_terminate($process); + \proc_close($process); + $this->fail("Broker on port {$port} did not come up in time"); + } + + /** + * @param array{port: int, process: resource, captured: string} $broker + * @return array{publishes: array, connect: array} + */ + private function stopBroker(array $broker, bool $suppress = false): array + { + if (\is_resource($broker['process'])) { + \proc_terminate($broker['process'], SIGTERM); + $deadline = \microtime(true) + 1; + while (\microtime(true) < $deadline) { + $status = \proc_get_status($broker['process']); + if (!$status['running']) { + break; + } + \usleep(25000); + } + \proc_close($broker['process']); + } + + if (!\file_exists($broker['captured'])) { + if ($suppress) { + return ['publishes' => [], 'connect' => []]; + } + $this->fail("Broker capture file missing: {$broker['captured']}"); + } + + $captured = \json_decode(\file_get_contents($broker['captured']), true); + @\unlink($broker['captured']); + + return $captured ?: ['publishes' => [], 'connect' => []]; + } + + private function pickFreePort(): int + { + $sock = \stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr); + if (!$sock) { + $this->fail("Could not bind ephemeral port: {$errstr}"); + } + $name = \stream_socket_get_name($sock, false); + \fclose($sock); + + return (int)\explode(':', $name)[1]; + } +} diff --git a/tests/Messaging/Adapter/Push/FakeBroker.php b/tests/Messaging/Adapter/Push/FakeBroker.php new file mode 100644 index 00000000..bdc993ff --- /dev/null +++ b/tests/Messaging/Adapter/Push/FakeBroker.php @@ -0,0 +1,104 @@ + [], + 'publishes' => [], +]; + +$flush = function () use (&$captured, $capturePath) { + \file_put_contents($capturePath, \json_encode($captured)); +}; +$flush(); + +/** @var array $buffers */ +$buffers = []; + +$server = new Server('127.0.0.1', $port, SWOOLE_BASE, SWOOLE_SOCK_TCP); +$server->set([ + 'worker_num' => 1, + 'max_request' => 0, + 'log_level' => SWOOLE_LOG_ERROR, + 'open_eof_check' => false, + 'open_tcp_nodelay' => true, +]); + +$server->on('start', function () { + Timer::after(15000, function () { + \Swoole\Event::exit(); + }); +}); + +$server->on('close', function (Server $server, int $fd) use (&$buffers, $flush) { + unset($buffers[$fd]); + $flush(); +}); + +$server->on('receive', function (Server $server, int $fd, int $reactorId, string $data) use (&$captured, &$buffers, $rejectTokens, $flush) { + $buffers[$fd] = ($buffers[$fd] ?? '') . $data; + + while (($packet = MQTT::decodePacket($buffers[$fd])) !== null) { + switch ($packet['type']) { + case MQTT::PACKET_CONNECT: + $parsed = MQTT::parseConnect($packet['payload']); + $captured['connect'] = [ + 'clientId' => $parsed['clientId'], + 'username' => (string)$parsed['username'], + 'password' => (string)$parsed['password'], + ]; + $server->send($fd, MQTT::encodeConnack(MQTT::REASON_SUCCESS)); + $flush(); + break; + + case MQTT::PACKET_PUBLISH: + $parsed = MQTT::parsePublish($packet['payload'], $packet['flags']); + $captured['publishes'][] = [ + 'topic' => $parsed['topic'], + 'payload' => $parsed['payload'], + 'qos' => $parsed['qos'], + ]; + + $reason = MQTT::REASON_SUCCESS; + foreach ($rejectTokens as $bad) { + if (\str_ends_with($parsed['topic'], '/' . $bad)) { + $reason = 0x10; + break; + } + } + + if ($parsed['qos'] === 1 && $parsed['packetId'] !== null) { + $server->send($fd, MQTT::encodePuback($parsed['packetId'], $reason)); + } + $flush(); + break; + + case MQTT::PACKET_DISCONNECT: + $server->close($fd); + $flush(); + Timer::after(50, fn () => \Swoole\Event::exit()); + return; + + case MQTT::PACKET_PINGREQ: + $server->send($fd, MQTT::encodePingresp()); + break; + + default: + break; + } + } +}); + +$server->start(); + +$flush(); diff --git a/tests/Messaging/Helpers/MQTTTest.php b/tests/Messaging/Helpers/MQTTTest.php new file mode 100644 index 00000000..01a9cd89 --- /dev/null +++ b/tests/Messaging/Helpers/MQTTTest.php @@ -0,0 +1,198 @@ + 3600], + ); + + $this->assertNotEmpty($packet); + $this->assertSame(MQTT::PACKET_CONNECT, (\ord($packet[0]) >> 4) & 0x0F); + + $decoded = MQTT::decodePacket($packet); + $this->assertNotNull($decoded); + $this->assertSame(MQTT::PACKET_CONNECT, $decoded['type']); + + $parsed = MQTT::parseConnect($decoded['payload']); + $this->assertSame('MQTT', $parsed['protocol']); + $this->assertSame(5, $parsed['version']); + $this->assertSame('device-abc', $parsed['clientId']); + $this->assertSame('server', $parsed['username']); + $this->assertSame('jwt.value.here', $parsed['password']); + $this->assertSame(30, $parsed['keepAlive']); + $this->assertTrue($parsed['cleanStart']); + $this->assertSame(3600, $parsed['properties']['sessionExpiryInterval']); + } + + public function testEncodesAndParsesPublish(): void + { + $payload = '{"notification":{"title":"Hi"}}'; + $packet = MQTT::encodePublish( + topic: 'appwrite/push/device-token-1', + payload: $payload, + qos: 1, + retain: false, + dup: false, + packetId: 17, + properties: [ + 'messageExpiryInterval' => 86400, + 'contentType' => 'application/json', + ], + ); + + $decoded = MQTT::decodePacket($packet); + $this->assertNotNull($decoded); + $this->assertSame(MQTT::PACKET_PUBLISH, $decoded['type']); + + $parsed = MQTT::parsePublish($decoded['payload'], $decoded['flags']); + $this->assertSame('appwrite/push/device-token-1', $parsed['topic']); + $this->assertSame($payload, $parsed['payload']); + $this->assertSame(1, $parsed['qos']); + $this->assertSame(17, $parsed['packetId']); + $this->assertSame(86400, $parsed['properties']['messageExpiryInterval']); + $this->assertSame('application/json', $parsed['properties']['contentType']); + } + + public function testEncodesAndParsesConnack(): void + { + $packet = MQTT::encodeConnack(MQTT::REASON_SUCCESS, sessionPresent: false, properties: ['serverKeepAlive' => 60]); + $decoded = MQTT::decodePacket($packet); + $this->assertNotNull($decoded); + $this->assertSame(MQTT::PACKET_CONNACK, $decoded['type']); + + $parsed = MQTT::parseConnack($decoded['payload']); + $this->assertSame(MQTT::REASON_SUCCESS, $parsed['reasonCode']); + $this->assertFalse($parsed['sessionPresent']); + $this->assertSame(60, $parsed['properties']['serverKeepAlive']); + } + + public function testEncodesAndParsesPuback(): void + { + $packet = MQTT::encodePuback(42, MQTT::REASON_SUCCESS); + $decoded = MQTT::decodePacket($packet); + $this->assertNotNull($decoded); + $this->assertSame(MQTT::PACKET_PUBACK, $decoded['type']); + + $parsed = MQTT::parsePuback($decoded['payload']); + $this->assertSame(42, $parsed['packetId']); + $this->assertSame(MQTT::REASON_SUCCESS, $parsed['reasonCode']); + } + + public function testEncodesPingreqAndPingresp(): void + { + $req = MQTT::encodePingreq(); + $resp = MQTT::encodePingresp(); + + $decodedReq = MQTT::decodePacket($req); + $decodedResp = MQTT::decodePacket($resp); + + $this->assertSame(MQTT::PACKET_PINGREQ, $decodedReq['type']); + $this->assertSame(MQTT::PACKET_PINGRESP, $decodedResp['type']); + } + + public function testDecodeReturnsNullForPartialBuffer(): void + { + $packet = MQTT::encodePublish('topic', 'body', 0, false, false, null); + $partial = \substr($packet, 0, 1); + + $buffer = $partial; + $this->assertNull(MQTT::decodePacket($buffer)); + $this->assertSame($partial, $buffer); + } + + public function testDecodeConsumesExactlyOnePacketFromConcatenated(): void + { + $first = MQTT::encodePublish('a/b', '1', 0, false, false, null); + $second = MQTT::encodePublish('c/d', '2', 0, false, false, null); + + $buffer = $first . $second; + + $packet = MQTT::decodePacket($buffer); + $this->assertNotNull($packet); + $parsed = MQTT::parsePublish($packet['payload'], $packet['flags']); + $this->assertSame('a/b', $parsed['topic']); + $this->assertSame('1', $parsed['payload']); + + $next = MQTT::decodePacket($buffer); + $this->assertNotNull($next); + $parsedNext = MQTT::parsePublish($next['payload'], $next['flags']); + $this->assertSame('c/d', $parsedNext['topic']); + $this->assertSame('2', $parsedNext['payload']); + + $this->assertSame('', $buffer); + } + + public function testEncodesLargePayloadAcrossMultiByteRemainingLength(): void + { + $bigPayload = \str_repeat('x', 200); + $packet = MQTT::encodePublish('topic/large', $bigPayload, 0, false, false, null); + + $decoded = MQTT::decodePacket($packet); + $this->assertNotNull($decoded); + $parsed = MQTT::parsePublish($decoded['payload'], $decoded['flags']); + $this->assertSame($bigPayload, $parsed['payload']); + } + + public function testSubscribeParsing(): void + { + $topic = 'appwrite/push/device-abc'; + $body = \pack('n', 5) + . \chr(0) + . \pack('n', \strlen($topic)) . $topic + . \chr(0x01); + + $parsed = MQTT::parseSubscribe($body); + + $this->assertSame(5, $parsed['packetId']); + $this->assertCount(1, $parsed['filters']); + $this->assertSame('appwrite/push/device-abc', $parsed['filters'][0]['topic']); + $this->assertSame(1, $parsed['filters'][0]['qos']); + } + + public function testEncodeConnectRejectsLongStrings(): void + { + $tooLong = \str_repeat('a', 65536); + + $this->expectException(\Throwable::class); + MQTT::encodeConnect(clientId: $tooLong); + } + + public function testEncodeConnectRejectsPasswordWithoutUsername(): void + { + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessageMatches('/password without a username/i'); + + MQTT::encodeConnect( + clientId: 'device-1', + username: null, + password: 'secret', + ); + } + + public function testEncodePublishRejectsInvalidQos(): void + { + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('MQTT QoS must be 0, 1, or 2'); + + MQTT::encodePublish( + topic: 'a/b', + payload: 'data', + qos: 3, + retain: false, + dup: false, + packetId: 1, + ); + } +}