1010 * Contributions, suggestions, and feedback are always welcome!
1111 */
1212
13- namespace Ripple \Net \Http \ Server ;
13+ namespace Ripple \Net \Http ;
1414
1515use Closure ;
1616use Generator ;
3131use function is_int ;
3232use function is_file ;
3333use function strtolower ;
34+ use function is_callable ;
3435
3536/**
3637 * response entity
@@ -39,7 +40,7 @@ class Response
3940{
4041 use ClientResponse;
4142
42- /*** @var mixed|Stream */
43+ /*** @var mixed|Stream|Generator|string */
4344 protected mixed $ body ;
4445
4546 /*** @var array */
@@ -54,18 +55,34 @@ class Response
5455 /*** @var string */
5556 protected string $ statusText = 'OK ' ;
5657
58+ /*** @var bool 响应体发送完成后是否关闭连接 */
59+ protected bool $ closeAfterBody = false ;
60+
61+ /**
62+ * @var Stream|null
63+ */
64+ protected ?Stream $ stream = null ;
65+
5766 /**
5867 */
5968 public function __construct ()
6069 {
6170 }
6271
6372 /**
64- * @param Stream $stream
6573 * @return void
6674 * @throws ConnectionException
6775 */
68- public function __invoke (Stream $ stream ): void
76+ public function __invoke (): void
77+ {
78+ $ this ->send ();
79+ }
80+
81+ /**
82+ * @return void
83+ * @throws ConnectionException
84+ */
85+ public function send (): void
6986 {
7087 // respond header
7188 $ header = "HTTP/1.1 {$ this ->statusCode ()} {$ this ->statusText }\r\n" ;
@@ -82,14 +99,19 @@ public function __invoke(Stream $stream): void
8299 $ header .= 'Set-Cookie: ' . $ cookieLine . "\r\n" ;
83100 }
84101
102+ // respond body
103+ if (is_callable ($ this ->body )) {
104+ $ this ->body = ($ this ->body )($ this ->stream );
105+ }
106+
85107 if (is_string ($ this ->body )) {
86- $ stream ->writeAll ("{$ header }\r\n{$ this ->body }" );
108+ $ this -> stream ->writeAll ("{$ header }\r\n{$ this ->body }" );
87109 } else {
88- $ stream ->writeAll ("{$ header }\r\n" );
110+ $ this -> stream ->writeAll ("{$ header }\r\n" );
89111
90112 // 普通文本
91113 if (is_string ($ this ->body )) {
92- $ stream ->writeAll ($ this ->body );
114+ $ this -> stream ->writeAll ($ this ->body );
93115 }
94116
95117 // 可控流式传输方式
@@ -98,41 +120,46 @@ public function __invoke(Stream $stream): void
98120 if (!is_string ($ content ) || $ content === '' ) {
99121 continue ;
100122 }
101- $ stream ->writeAll ($ content );
123+
124+ try {
125+ $ this ->stream ->writeAll ($ content );
126+ } catch (Throwable ) {
127+ break ;
128+ }
102129 }
103130 }
104131
105132 // 固定流传输方式
106133 elseif ($ this ->body instanceof Stream) {
107134 try {
108135 $ owner = \Co \current ();
109- $ stream ->setWriteBufferMax (10485760 );
110- $ stream ->watchWrite (function () use ($ owner, $ stream ) {
136+ $ this -> stream ->setWriteBufferMax (10485760 );
137+ $ this -> stream ->watchWrite (function () use ($ owner ) {
111138 try {
112- $ bufferLen = $ stream ->writeBuffer ()->length ();
139+ $ bufferLen = $ this -> stream ->writeBuffer ()->length ();
113140
114141 // 阈值检查
115142 if ($ bufferLen > 10405760 ) {
116- $ stream ->flushOnce ();
143+ $ this -> stream ->flushOnce ();
117144 return ;
118145 }
119146
120147 // 优先处理body数据
121148 if (!$ this ->body ->isClosed ()) {
122149 $ buf = $ this ->body ->read (8192 );
123150 if ($ buf ) {
124- $ stream ->writeAsync ($ buf );
151+ $ this -> stream ->writeAsync ($ buf );
125152 }
126153
127154 if ($ this ->body ->eof ()) {
128155 $ this ->body ->close ();
129156 }
130157 }
131158
132- $ stream ->flushOnce ();
159+ $ this -> stream ->flushOnce ();
133160
134161 // 文件末尾 && 缓冲区空
135- if ($ this ->body ->eof () && $ stream ->writeBuffer ()->length () === 0 ) {
162+ if ($ this ->body ->eof () && $ this -> stream ->writeBuffer ()->length () === 0 ) {
136163 Scheduler::tryResume ($ owner );
137164 }
138165 } catch (Throwable ) {
@@ -147,7 +174,7 @@ public function __invoke(Stream $stream): void
147174 throw new ConnectionException ($ exception ->getMessage (), $ exception ->getCode (), $ exception );
148175 } finally {
149176 $ this ->body ->close ();
150- $ stream ->unwatchWrite ();
177+ $ this -> stream ->unwatchWrite ();
151178 }
152179 } else {
153180 throw new ConnectionException ('The response content is illegal. ' );
@@ -157,8 +184,8 @@ public function __invoke(Stream $stream): void
157184 $ connectionHeader = $ this ->headers ['Connection ' ] ?? '' ;
158185 $ connectionValue = strtolower ($ connectionHeader );
159186
160- if ($ connectionValue === 'close ' ) {
161- $ stream ->close ();
187+ if ($ connectionValue === 'close ' || $ this -> closeAfterBody ) {
188+ $ this -> stream ->close ();
162189 }
163190 }
164191
@@ -272,6 +299,16 @@ public function withBody(mixed $content): static
272299 return $ this ;
273300 }
274301
302+ /**
303+ * @param Stream $stream
304+ * @return $this
305+ */
306+ public function withStream (Stream $ stream ): static
307+ {
308+ $ this ->stream = $ stream ;
309+ return $ this ;
310+ }
311+
275312 /**
276313 * @param string $name
277314 * @return $this
@@ -282,6 +319,16 @@ public function removeHeader(string $name): static
282319 return $ this ;
283320 }
284321
322+ /**
323+ * 响应体发送完成后关闭连接
324+ * @return $this
325+ */
326+ public function closeAfter (): static
327+ {
328+ $ this ->closeAfterBody = true ;
329+ return $ this ;
330+ }
331+
285332 /**
286333 * 获取响应体内容
287334 * @return mixed
0 commit comments