1+ <?php namespace peer \server ;
2+
3+ use Throwable ;
4+ use lang \IllegalStateException ;
5+ use peer \server \protocol \SocketAcceptHandler ;
6+ use peer \{ServerSocket , SocketException , SocketTimeoutException };
7+
8+ /**
9+ * Asynchronous TCP/IP Server
10+ *
11+ * ```php
12+ * use peer\server\AsynchronousServer;
13+ *
14+ * $server= new AsynchronousServer();
15+ * $server->listen(new ServerSocket('127.0.0.1', 6100), new MyProtocol());
16+ * $server->service();
17+ * ```
18+ *
19+ * @see peer.ServerSocket
20+ * @test peer.unittest.server.AsynchronousServerTest
21+ */
22+ class AsynchronousServer extends ServerImplementation {
23+ private $ terminate = false ;
24+ private $ select = [], $ tasks = [], $ continuation = [];
25+
26+ static function __static () {
27+
28+ // For PHP < 7.3.0
29+ if (!function_exists ('array_key_last ' )) {
30+ eval ('function array_key_last($array) { end($array); return key($array); } ' );
31+ }
32+ }
33+
34+ /**
35+ * Adds server socket to listen on, associating protocol handler with it
36+ *
37+ * @param peer.ServerSocket|peer.BSDServerSocket $socket
38+ * @param peer.server.ServerProtocol $protocol
39+ * @return self
40+ */
41+ public function listen ($ socket , ServerProtocol $ protocol ) {
42+ $ socket ->create ();
43+ $ socket ->bind (true );
44+ $ socket ->listen ();
45+
46+ $ protocol ->initialize ($ this );
47+
48+ $ i = $ this ->select ? array_key_last ($ this ->select ) + 1 : 1 ;
49+ $ this ->select [$ i ]= $ socket ;
50+ $ this ->continuation [$ i ]= new Continuation (function ($ socket ) use ($ protocol ) {
51+ do {
52+ $ connection = $ socket ->accept ();
53+ if ($ protocol instanceof SocketAcceptHandler && !$ protocol ->handleAccept ($ connection )) {
54+ $ connection ->close ();
55+ return ;
56+ }
57+
58+ $ this ->tcpnodelay && $ connection ->useNoDelay ();
59+ $ protocol ->handleConnect ($ connection );
60+ $ this ->select ($ connection , $ protocol );
61+ yield 'accept ' => $ connection ;
62+ } while (!$ this ->terminate );
63+ });
64+
65+ // Never time out sockets we listen on
66+ $ this ->continuation [$ i ]->next = null ;
67+ return $ this ;
68+ }
69+
70+ /**
71+ * Adds socket to select, associating a function to call for data
72+ *
73+ * @param peer.Socket|peer.BSDSocket $socket
74+ * @param peer.ServerProtocol|function(peer.Socket|peer.BSDSocket): void $handler
75+ * @param bool $timeout
76+ * @return self
77+ */
78+ public function select ($ socket , $ handler , $ timeout = false ) {
79+ $ i = $ this ->select ? array_key_last ($ this ->select ) + 1 : 1 ;
80+ $ this ->select [$ i ]= $ socket ;
81+ if ($ handler instanceof ServerProtocol) {
82+ $ this ->continuation [$ i ]= new Continuation (function ($ socket ) use ($ handler ) {
83+ try {
84+
85+ // Check for readability, then handle incoming data
86+ while ($ socket ->isConnected () && !$ socket ->eof ()) {
87+ yield 'read ' => $ socket ;
88+ yield from $ handler ->handleData ($ socket ) ?? [];
89+ }
90+
91+ // Handle disconnnect gracefully, ensure socket is closed
92+ $ handler ->handleDisconnect ($ socket );
93+ $ socket ->close ();
94+ } catch (Throwable $ t ) {
95+
96+ // Handle any errors, then close socket
97+ $ handler ->handleError ($ socket , $ t );
98+ $ socket ->close ();
99+ }
100+ });
101+ } else {
102+ $ this ->continuation [$ i ]= new Continuation ($ handler );
103+ }
104+
105+ // Unless explicitely given, ensure sockets we select on never timeout
106+ $ timeout || $ this ->continuation [$ i ]->next = null ;
107+ return $ this ;
108+ }
109+
110+ /**
111+ * Schedule a given task to execute every given interval.
112+ *
113+ * @param int|float $interval
114+ * @param function(): ?int|float
115+ * @return self
116+ */
117+ public function schedule ($ interval , $ function ) {
118+ $ i = $ this ->tasks ? array_key_last ($ this ->tasks ) - 1 : -1 ;
119+ $ this ->tasks [$ i ]= $ function ;
120+ $ this ->continuation [$ i ]= new Continuation (function ($ function ) use ($ interval ) {
121+ try {
122+ while (($ interval = $ function () ?? $ interval ) >= 0 ) {
123+ yield 'delay ' => $ interval * 1000 ;
124+ }
125+ } catch (Throwable $ t ) {
126+ // Not displayed, simply stops execution
127+ }
128+ });
129+
130+ return $ this ;
131+ }
132+
133+ /**
134+ * Runs service until shutdown() is called.
135+ *
136+ * @return void
137+ * @throws lang.IllegalStateException
138+ */
139+ public function service () {
140+ if (empty ($ this ->select ) && empty ($ this ->tasks )) {
141+ throw new IllegalStateException ('No sockets or tasks to execute ' );
142+ }
143+
144+ $ readable = $ writeable = $ waitable = $ write = [];
145+ $ sockets = $ errors = null ;
146+ do {
147+ $ time = microtime (true );
148+ $ wait = [];
149+ foreach ($ this ->continuation as $ i => $ continuation ) {
150+ if (null !== $ continuation ->next && $ continuation ->next >= $ time ) {
151+ $ wait []= $ continuation ->next - $ time ;
152+ continue ;
153+ } else if (isset ($ this ->tasks [$ i ])) {
154+ $ execute = $ continuation ->continue ($ this ->tasks [$ i ]);
155+ unset($ waitable [$ i ]);
156+ } else if (isset ($ readable [$ i ]) || isset ($ writeable [$ i ]) || isset ($ waitable [$ i ])) {
157+ $ execute = $ continuation ->continue ($ this ->select [$ i ]);
158+ if (null !== $ continuation ->next ) $ continuation ->next = $ time ;
159+ unset($ readable [$ i ], $ writeable [$ i ], $ waitable [$ i ]);
160+ } else {
161+ isset ($ write [$ i ]) ? $ writeable [$ i ]= $ this ->select [$ i ] : $ readable [$ i ]= $ this ->select [$ i ];
162+ if (null === $ continuation ->next ) continue ;
163+
164+ // Check if the socket has timed out...
165+ $ idle = $ time - $ continuation ->next ;
166+ $ timeout = $ this ->select [$ i ]->getTimeout ();
167+ if ($ idle < $ timeout ) {
168+ $ wait []= $ timeout - $ idle ;
169+ continue ;
170+ }
171+
172+ // ...and if so, throw an exception, allowing the continuation to handle it.
173+ $ execute = $ continuation ->throw ($ this ->select [$ i ], new SocketTimeoutException ('Timed out ' , $ timeout ));
174+ $ continuation ->next = $ time ;
175+ unset($ readable [$ i ], $ writeable [$ i ]);
176+ }
177+
178+ // Check whether execution has finished
179+ if (null === $ execute ) {
180+ unset($ this ->tasks [$ i ], $ this ->select [$ i ], $ this ->continuation [$ i ], $ write [$ i ]);
181+ continue ;
182+ }
183+
184+ // `yield 'accept' => $socket`: Check for being able to read from socket
185+ // `yield 'read' => $_`: Continue as soon as the socket becomes readable
186+ // `yield 'write' => $_`: Continue as soon as the socket becomes writeable
187+ // `yield 'delay' => $millis`: Wait a specified number of milliseconds
188+ // `yield`: Continue at the next possible execution slot (`delay => 0`)
189+ switch ($ execute ->key ()) {
190+ case 'accept ' :
191+ $ socket = $ execute ->current ();
192+ $ readable [array_key_last ($ this ->select )]= $ socket ;
193+ $ readable [$ i ]= $ this ->select [$ i ];
194+ $ wait []= $ socket ->getTimeout ();
195+ break ;
196+
197+ case 'write ' :
198+ $ write [$ i ]= true ;
199+ $ writeable [$ i ]= $ this ->select [$ i ];
200+ $ wait []= $ this ->select [$ i ]->getTimeout ();
201+ break ;
202+
203+ case 'read ' :
204+ unset($ write [$ i ]);
205+ $ readable [$ i ]= $ this ->select [$ i ];
206+ $ wait []= $ this ->select [$ i ]->getTimeout ();
207+ break ;
208+
209+ case 'delay ' :
210+ $ delay = $ execute ->current () / 1000 ;
211+ $ continuation ->next = $ time + $ delay ;
212+ $ waitable [$ i ]= true ;
213+ $ wait []= $ delay ;
214+ break ;
215+
216+ default :
217+ $ continuation ->next = $ time ;
218+ $ waitable [$ i ]= true ;
219+ $ wait []= 0 ;
220+ break ;
221+ }
222+ }
223+
224+ // When asked to terminate, close sockets in reverse order
225+ if ($ this ->terminate ) {
226+ for ($ i = array_key_last ($ this ->select ); $ i > 0 ; $ i --) {
227+ isset ($ this ->select [$ i ]) && $ this ->select [$ i ]->close ();
228+ }
229+ break ;
230+ }
231+
232+ if ($ this ->select ) {
233+ // echo date('H:i:s'), " SELECT ", \util\Objects::stringOf($wait), " @ {\n",
234+ // " R: ", \util\Objects::stringOf($readable), "\n",
235+ // " W: ", \util\Objects::stringOf($writeable), "\n",
236+ // "}\n";
237+ $ sockets ?? $ sockets = current ($ this ->select )->kind ();
238+ $ sockets ->select ($ readable , $ writeable , $ errors , $ wait ? min ($ wait ) : null );
239+ } else {
240+ // echo date('H:i:s'), " SLEEP ", \util\Objects::stringOf($wait), "\n";
241+ $ wait && usleep (1000000 * (int )min ($ wait ));
242+ }
243+ } while ($ this ->select || $ this ->tasks );
244+ }
245+
246+ /**
247+ * Shutdown the server
248+ *
249+ * @return void
250+ */
251+ public function shutdown () {
252+ $ this ->terminate = true ;
253+ }
254+ }
0 commit comments