|
16 | 16 | import com.tencent.trpc.core.exception.ErrorCode; |
17 | 17 | import com.tencent.trpc.core.logger.Logger; |
18 | 18 | import com.tencent.trpc.core.logger.LoggerFactory; |
| 19 | +import com.tencent.trpc.core.rpc.CallInfo; |
19 | 20 | import com.tencent.trpc.core.rpc.ProviderInvoker; |
20 | 21 | import com.tencent.trpc.core.rpc.RpcContext; |
21 | 22 | import com.tencent.trpc.core.rpc.RpcServerContext; |
@@ -104,13 +105,15 @@ protected void handleStreamInit(int streamId, ByteBuf frame) { |
104 | 105 | // release reference after decoding |
105 | 106 | ReferenceCountUtil.safeRelease(data); |
106 | 107 | } |
107 | | - }); |
| 108 | + }) |
| 109 | + .doFinally(signal -> receivers.remove(streamId)); |
108 | 110 |
|
109 | 111 | // use the thread pool to call the corresponding interface to prevent blocking the IO thread |
110 | 112 | workerPool.execute(() -> { |
111 | 113 | // Currently TRPC streaming protocol does not carry timeout field, timeout control is currently managed |
112 | 114 | // by the client side |
113 | 115 | RpcContext ctx = new RpcServerContext(); |
| 116 | + fillCallInfo(ctx.getCallInfo(), requestMeta); |
114 | 117 | requestMeta.getTransInfoMap().forEach((key, val) -> ctx.getReqAttachMap().put(key, val.toByteArray())); |
115 | 118 |
|
116 | 119 | Publisher<?> resp; |
@@ -146,4 +149,10 @@ protected void handleStreamInit(int streamId, ByteBuf frame) { |
146 | 149 | }); |
147 | 150 | } |
148 | 151 |
|
| 152 | + private void fillCallInfo(CallInfo callInfo, TrpcStreamInitRequestMeta requestMeta) { |
| 153 | + callInfo.setCaller(requestMeta.getCaller().toStringUtf8()); |
| 154 | + callInfo.setCallee(requestMeta.getCallee().toStringUtf8()); |
| 155 | + callInfo.setCalleeMethod(requestMeta.getFunc().toStringUtf8()); |
| 156 | + } |
| 157 | + |
149 | 158 | } |
0 commit comments