Skip to content

Commit 6cfd3c3

Browse files
authored
feat: 流式服务端消费完后移除流&添加调用信息 (#110)
1 parent 4915b45 commit 6cfd3c3

1 file changed

Lines changed: 10 additions & 1 deletion

File tree

trpc-proto/trpc-proto-standard/src/main/java/com/tencent/trpc/proto/standard/stream/TRpcStreamResponder.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.tencent.trpc.core.exception.ErrorCode;
1717
import com.tencent.trpc.core.logger.Logger;
1818
import com.tencent.trpc.core.logger.LoggerFactory;
19+
import com.tencent.trpc.core.rpc.CallInfo;
1920
import com.tencent.trpc.core.rpc.ProviderInvoker;
2021
import com.tencent.trpc.core.rpc.RpcContext;
2122
import com.tencent.trpc.core.rpc.RpcServerContext;
@@ -104,13 +105,15 @@ protected void handleStreamInit(int streamId, ByteBuf frame) {
104105
// release reference after decoding
105106
ReferenceCountUtil.safeRelease(data);
106107
}
107-
});
108+
})
109+
.doFinally(signal -> receivers.remove(streamId));
108110

109111
// use the thread pool to call the corresponding interface to prevent blocking the IO thread
110112
workerPool.execute(() -> {
111113
// Currently TRPC streaming protocol does not carry timeout field, timeout control is currently managed
112114
// by the client side
113115
RpcContext ctx = new RpcServerContext();
116+
fillCallInfo(ctx.getCallInfo(), requestMeta);
114117
requestMeta.getTransInfoMap().forEach((key, val) -> ctx.getReqAttachMap().put(key, val.toByteArray()));
115118

116119
Publisher<?> resp;
@@ -146,4 +149,10 @@ protected void handleStreamInit(int streamId, ByteBuf frame) {
146149
});
147150
}
148151

152+
private void fillCallInfo(CallInfo callInfo, TrpcStreamInitRequestMeta requestMeta) {
153+
callInfo.setCaller(requestMeta.getCaller().toStringUtf8());
154+
callInfo.setCallee(requestMeta.getCallee().toStringUtf8());
155+
callInfo.setCalleeMethod(requestMeta.getFunc().toStringUtf8());
156+
}
157+
149158
}

0 commit comments

Comments
 (0)