Skip to content

Commit 99dccea

Browse files
authored
Merge pull request #4 from micilini/phase-04-chat-core
phase 04: implement chat core and unique usernames
2 parents 8180feb + 51e097d commit 99dccea

27 files changed

Lines changed: 1732 additions & 0 deletions

README.zip

21.2 KB
Binary file not shown.

src/Chat/ChatKernel.php

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Micilini\PhpSockets\Chat;
6+
7+
use Micilini\PhpSockets\Config\ChatConfig;
8+
use Micilini\PhpSockets\Connection\Connection;
9+
use Micilini\PhpSockets\Contracts\ConnectionRegistryInterface;
10+
use Micilini\PhpSockets\Contracts\MessageStoreInterface;
11+
use Micilini\PhpSockets\Contracts\RoomStoreInterface;
12+
use Micilini\PhpSockets\Contracts\SessionStoreInterface;
13+
use Micilini\PhpSockets\Exceptions\InvalidPayloadException;
14+
use Micilini\PhpSockets\Protocol\Frame;
15+
use Micilini\PhpSockets\Protocol\Opcode;
16+
use Micilini\PhpSockets\Server\WebSocketServer;
17+
use Micilini\PhpSockets\Storage\InMemory\InMemoryMessageStore;
18+
use Micilini\PhpSockets\Storage\InMemory\InMemoryRoomStore;
19+
use Micilini\PhpSockets\Storage\InMemory\InMemorySessionStore;
20+
use Throwable;
21+
22+
final class ChatKernel
23+
{
24+
private readonly SessionStoreInterface $sessions;
25+
private readonly MessageStoreInterface $messages;
26+
private readonly RoomStoreInterface $rooms;
27+
private readonly PayloadValidator $validator;
28+
private readonly PresenceManager $presence;
29+
private readonly RoomManager $roomManager;
30+
private readonly DirectMessageRouter $directMessages;
31+
private readonly PrivateGroupRouter $privateGroups;
32+
33+
public function __construct(
34+
private readonly ChatConfig $config,
35+
?SessionStoreInterface $sessionStore = null,
36+
?MessageStoreInterface $messageStore = null,
37+
?RoomStoreInterface $roomStore = null,
38+
) {
39+
$this->sessions = $sessionStore ?? new InMemorySessionStore();
40+
$this->messages = $messageStore ?? new InMemoryMessageStore();
41+
$this->rooms = $roomStore ?? new InMemoryRoomStore();
42+
$this->validator = new PayloadValidator();
43+
$this->presence = new PresenceManager(
44+
new UsernameNormalizer($this->config->maxDisplayNameLength),
45+
$this->sessions,
46+
);
47+
$this->roomManager = new RoomManager($this->rooms);
48+
$this->directMessages = new DirectMessageRouter($this->roomManager, $this->messages);
49+
$this->privateGroups = new PrivateGroupRouter($this->roomManager, $this->messages);
50+
}
51+
52+
public function attach(WebSocketServer $server): void
53+
{
54+
$server->on('message', function (Connection $connection, Frame $frame) use ($server): void {
55+
$this->handleMessage($server->connections(), $connection, $frame);
56+
});
57+
58+
$server->on('close', function (Connection $connection) use ($server): void {
59+
$this->handleClose($server->connections(), $connection);
60+
});
61+
}
62+
63+
public function presence(): PresenceManager
64+
{
65+
return $this->presence;
66+
}
67+
68+
public function messageStore(): MessageStoreInterface
69+
{
70+
return $this->messages;
71+
}
72+
73+
public function roomStore(): RoomStoreInterface
74+
{
75+
return $this->rooms;
76+
}
77+
78+
public function handleMessage(
79+
ConnectionRegistryInterface $connections,
80+
Connection $connection,
81+
Frame $frame,
82+
): void {
83+
if ($frame->opcode !== Opcode::TEXT) {
84+
$this->sendError($connection, 'Only text frames are supported by the chat core.');
85+
return;
86+
}
87+
88+
try {
89+
$envelope = MessageEnvelope::fromJson($frame->payload);
90+
$this->validator->assertEnvelope($envelope);
91+
92+
match ($envelope->type) {
93+
'auth.join' => $this->handleJoin($connections, $connection, $envelope),
94+
'message.global' => $this->handleGlobalMessage($connections, $connection, $envelope),
95+
'message.direct' => $this->handleDirectMessage($connections, $connection, $envelope),
96+
'room.create' => $this->handleRoomCreate($connections, $connection, $envelope),
97+
'room.message' => $this->handleRoomMessage($connections, $connection, $envelope),
98+
default => throw new InvalidPayloadException('Unsupported message type.'),
99+
};
100+
} catch (Throwable $exception) {
101+
$this->sendError($connection, $exception->getMessage());
102+
}
103+
}
104+
105+
private function handleJoin(
106+
ConnectionRegistryInterface $connections,
107+
Connection $connection,
108+
MessageEnvelope $envelope,
109+
): void {
110+
$session = $this->presence->join($this->validator->displayName($envelope));
111+
112+
$connection->setUserId($session->userId);
113+
$this->roomManager->joinGlobalRoom($session->userId);
114+
115+
$this->sendEnvelope($connection, MessageEnvelope::server('session.accepted', [
116+
'session' => $session->toArray(),
117+
]));
118+
119+
$this->sendEnvelope($connection, MessageEnvelope::server('presence.snapshot', [
120+
'users' => $this->presence->snapshot(),
121+
]));
122+
123+
$this->broadcastAuthenticated($connections, MessageEnvelope::server('presence.user_joined', [
124+
'user' => $session->toArray(),
125+
]));
126+
}
127+
128+
private function handleGlobalMessage(
129+
ConnectionRegistryInterface $connections,
130+
Connection $connection,
131+
MessageEnvelope $envelope,
132+
): void {
133+
$fromUserId = $this->requireAuthenticated($connection);
134+
$room = $this->roomManager->ensureGlobalRoom();
135+
$message = ChatMessage::text($room->id, $fromUserId, $this->validator->text($envelope));
136+
137+
$this->messages->save($message);
138+
139+
$this->broadcastAuthenticated($connections, MessageEnvelope::server('message.received', [
140+
'roomId' => $room->id,
141+
'message' => $message->toArray(),
142+
]));
143+
}
144+
145+
private function handleDirectMessage(
146+
ConnectionRegistryInterface $connections,
147+
Connection $connection,
148+
MessageEnvelope $envelope,
149+
): void {
150+
$fromUserId = $this->requireAuthenticated($connection);
151+
$toUserId = $this->validator->targetUserId($envelope);
152+
153+
$this->assertOnlineUser($toUserId);
154+
155+
$message = $this->directMessages->send(
156+
fromUserId: $fromUserId,
157+
toUserId: $toUserId,
158+
text: $this->validator->text($envelope),
159+
);
160+
161+
$this->deliverToUsers($connections, [$fromUserId, $toUserId], MessageEnvelope::server('message.received', [
162+
'roomId' => $message->roomId,
163+
'message' => $message->toArray(),
164+
]));
165+
}
166+
167+
private function handleRoomCreate(
168+
ConnectionRegistryInterface $connections,
169+
Connection $connection,
170+
MessageEnvelope $envelope,
171+
): void {
172+
$createdByUserId = $this->requireAuthenticated($connection);
173+
$type = $envelope->payload['type'] ?? null;
174+
175+
if ($type !== Room::TYPE_PRIVATE_GROUP) {
176+
throw new InvalidPayloadException('Only private group rooms can be created in this phase.');
177+
}
178+
179+
$participantUserIds = $this->validator->participantUserIds($envelope);
180+
181+
foreach ($participantUserIds as $participantUserId) {
182+
$this->assertOnlineUser($participantUserId);
183+
}
184+
185+
$room = $this->privateGroups->createRoom(
186+
createdByUserId: $createdByUserId,
187+
name: $this->validator->roomName($envelope),
188+
participantUserIds: $participantUserIds,
189+
maxMembers: $this->config->maxPrivateGroupMembers,
190+
);
191+
192+
$this->deliverToUsers($connections, $room->memberUserIds, MessageEnvelope::server('room.created', [
193+
'room' => $room->toArray(),
194+
]));
195+
}
196+
197+
private function handleRoomMessage(
198+
ConnectionRegistryInterface $connections,
199+
Connection $connection,
200+
MessageEnvelope $envelope,
201+
): void {
202+
$fromUserId = $this->requireAuthenticated($connection);
203+
$roomId = $this->validator->roomId($envelope);
204+
$room = $this->roomManager->assertMember($roomId, $fromUserId);
205+
$message = $this->privateGroups->send($roomId, $fromUserId, $this->validator->text($envelope));
206+
207+
$this->deliverToUsers($connections, $room->memberUserIds, MessageEnvelope::server('message.received', [
208+
'roomId' => $room->id,
209+
'message' => $message->toArray(),
210+
]));
211+
}
212+
213+
private function handleClose(ConnectionRegistryInterface $connections, Connection $connection): void
214+
{
215+
$userId = $connection->userId();
216+
217+
if ($userId === null) {
218+
return;
219+
}
220+
221+
$this->presence->leave($userId);
222+
223+
$this->broadcastAuthenticated($connections, MessageEnvelope::server('presence.user_left', [
224+
'userId' => $userId,
225+
]));
226+
}
227+
228+
private function requireAuthenticated(Connection $connection): string
229+
{
230+
$userId = $connection->userId();
231+
232+
if ($userId === null) {
233+
throw new InvalidPayloadException('Connection is not authenticated.');
234+
}
235+
236+
return $userId;
237+
}
238+
239+
private function assertOnlineUser(string $userId): void
240+
{
241+
$session = $this->sessions->findByUserId($userId);
242+
243+
if (!$session instanceof UserSession || !$session->connected) {
244+
throw new InvalidPayloadException('Target user is not online.');
245+
}
246+
}
247+
248+
private function sendError(Connection $connection, string $message): void
249+
{
250+
$this->sendEnvelope($connection, MessageEnvelope::server('error', [
251+
'message' => $message,
252+
]));
253+
}
254+
255+
private function sendEnvelope(Connection $connection, MessageEnvelope $envelope): void
256+
{
257+
$connection->send($envelope->toJson());
258+
}
259+
260+
private function broadcastAuthenticated(ConnectionRegistryInterface $connections, MessageEnvelope $envelope): void
261+
{
262+
foreach ($connections->all() as $connection) {
263+
if ($connection->userId() !== null) {
264+
$this->sendEnvelope($connection, $envelope);
265+
}
266+
}
267+
}
268+
269+
/**
270+
* @param list<string> $userIds
271+
*/
272+
private function deliverToUsers(
273+
ConnectionRegistryInterface $connections,
274+
array $userIds,
275+
MessageEnvelope $envelope,
276+
): void {
277+
foreach ($connections->all() as $connection) {
278+
$connectionUserId = $connection->userId();
279+
280+
if ($connectionUserId !== null && in_array($connectionUserId, $userIds, true)) {
281+
$this->sendEnvelope($connection, $envelope);
282+
}
283+
}
284+
}
285+
}

src/Chat/ChatMessage.php

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Micilini\PhpSockets\Chat;
6+
7+
use DateTimeImmutable;
8+
9+
final readonly class ChatMessage
10+
{
11+
/**
12+
* @param array<string, mixed> $metadata
13+
*/
14+
public function __construct(
15+
public string $id,
16+
public string $roomId,
17+
public string $fromUserId,
18+
public string $kind,
19+
public ?string $body,
20+
public DateTimeImmutable $createdAt,
21+
public array $metadata = [],
22+
) {
23+
}
24+
25+
public static function text(string $roomId, string $fromUserId, string $text): self
26+
{
27+
return new self(
28+
id: 'msg_' . bin2hex(random_bytes(16)),
29+
roomId: $roomId,
30+
fromUserId: $fromUserId,
31+
kind: 'text',
32+
body: $text,
33+
createdAt: new DateTimeImmutable(),
34+
);
35+
}
36+
37+
/**
38+
* @return array<string, mixed>
39+
*/
40+
public function toArray(): array
41+
{
42+
return [
43+
'id' => $this->id,
44+
'roomId' => $this->roomId,
45+
'fromUserId' => $this->fromUserId,
46+
'kind' => $this->kind,
47+
'body' => $this->body,
48+
'metadata' => $this->metadata,
49+
'createdAt' => $this->createdAt->format(DATE_ATOM),
50+
];
51+
}
52+
}

src/Chat/ChatServer.php

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Micilini\PhpSockets\Chat;
6+
7+
use Micilini\PhpSockets\Config\ChatConfig;
8+
use Micilini\PhpSockets\Config\ServerConfig;
9+
use Micilini\PhpSockets\Server\WebSocketServer;
10+
11+
final readonly class ChatServer
12+
{
13+
public function __construct(
14+
private WebSocketServer $server,
15+
private ChatKernel $kernel,
16+
) {
17+
$this->kernel->attach($this->server);
18+
}
19+
20+
public static function create(ServerConfig $serverConfig, ChatConfig $chatConfig): self
21+
{
22+
return new self(
23+
server: new WebSocketServer($serverConfig),
24+
kernel: new ChatKernel($chatConfig),
25+
);
26+
}
27+
28+
public function on(string $eventName, callable $listener): self
29+
{
30+
$this->server->on($eventName, $listener);
31+
32+
return $this;
33+
}
34+
35+
public function run(): void
36+
{
37+
$this->server->run();
38+
}
39+
40+
public function stop(): void
41+
{
42+
$this->server->stop();
43+
}
44+
45+
public function webSocketServer(): WebSocketServer
46+
{
47+
return $this->server;
48+
}
49+
50+
public function kernel(): ChatKernel
51+
{
52+
return $this->kernel;
53+
}
54+
}

0 commit comments

Comments
 (0)