-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Feature/first auth v1 #175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b1e5315
2bb45b7
ecb124d
8357da3
782e39f
b1c4e8b
dde0fea
ec0b583
ddceaf8
67b08bd
070e9cf
31514f9
4fb8f67
8ba873d
25a4512
94e7bea
cb2bd63
6a1d6e3
feff020
07a1955
2167493
664deba
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,12 +1,6 @@ | ||
| package com.crossoverjie.cim.client.sdk.impl; | ||
|
|
||
| import static com.crossoverjie.cim.common.enums.StatusEnum.RECONNECT_FAIL; | ||
|
|
||
| import com.crossoverjie.cim.client.sdk.Client; | ||
| import com.crossoverjie.cim.client.sdk.ClientState; | ||
| import com.crossoverjie.cim.client.sdk.FetchOfflineMsgJob; | ||
| import com.crossoverjie.cim.client.sdk.ReConnectManager; | ||
| import com.crossoverjie.cim.client.sdk.RouteManager; | ||
| import com.crossoverjie.cim.client.sdk.*; | ||
| import com.crossoverjie.cim.client.sdk.io.CIMClientHandleInitializer; | ||
| import com.crossoverjie.cim.common.data.construct.RingBufferWheel; | ||
| import com.crossoverjie.cim.common.exception.CIMException; | ||
|
|
@@ -27,14 +21,18 @@ | |
| import io.netty.channel.socket.SocketChannel; | ||
| import io.netty.channel.socket.nio.NioSocketChannel; | ||
| import io.netty.util.concurrent.DefaultThreadFactory; | ||
| import lombok.Getter; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.apache.commons.lang3.StringUtils; | ||
|
|
||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.concurrent.*; | ||
| import java.util.function.Consumer; | ||
| import lombok.Getter; | ||
| import lombok.extern.slf4j.Slf4j; | ||
|
|
||
| import static com.crossoverjie.cim.common.enums.StatusEnum.RECONNECT_FAIL; | ||
|
|
||
| @Slf4j | ||
| public class ClientImpl extends ClientState implements Client { | ||
|
|
@@ -133,16 +131,23 @@ private void connectServer(Consumer<Void> success) { | |
| */ | ||
| private CompletableFuture<Boolean> doConnectServer() { | ||
| CompletableFuture<Boolean> future = new CompletableFuture<>(); | ||
| this.userLogin(future).ifPresentOrElse((cimServer) -> { | ||
| this.doConnectServer(cimServer, future); | ||
| this.loginServer(); | ||
| this.serverInfo = cimServer; | ||
| future.complete(true); | ||
| }, () -> { | ||
| this.conf.getEvent().error("Login fail!, cannot get server info!"); | ||
| this.conf.getEvent().fatal(this); | ||
| future.complete(false); | ||
| }); | ||
| this.userLogin(future) // save serverInfo after login success | ||
| .ifPresentOrElse((cimServer) -> { | ||
| if (StringUtils.isBlank(cimServer.getAuthToken())) { | ||
| future.complete(false); | ||
| this.conf.getEvent().error("Login fail!, auth token is blank!"); | ||
| this.conf.getEvent().fatal(this); | ||
| return; | ||
| } | ||
| getAuth().setAuthToken(cimServer.getAuthToken()); | ||
| this.doConnectServer(future); | ||
| this.loginServer(); | ||
| future.complete(true); | ||
| }, () -> { | ||
| this.conf.getEvent().error("Login fail!, cannot get server info!"); | ||
| this.conf.getEvent().fatal(this); | ||
| future.complete(false); | ||
| }); | ||
| return future; | ||
| } | ||
|
|
||
|
|
@@ -157,8 +162,7 @@ private Optional<CIMServerResVO> userLogin(CompletableFuture<Boolean> future) { | |
|
|
||
| CIMServerResVO cimServer = null; | ||
| try { | ||
| cimServer = routeManager.getServer(loginReqVO); | ||
| log.info("cimServer=[{}]", cimServer); | ||
| serverInfo = cimServer = routeManager.getServer(loginReqVO); | ||
| } catch (Exception e) { | ||
| log.error("login fail", e); | ||
| future.completeExceptionally(e); | ||
|
|
@@ -168,14 +172,21 @@ private Optional<CIMServerResVO> userLogin(CompletableFuture<Boolean> future) { | |
|
|
||
| private final EventLoopGroup group = new NioEventLoopGroup(0, new DefaultThreadFactory("cim-work")); | ||
|
|
||
| private void doConnectServer(CIMServerResVO cimServer, CompletableFuture<Boolean> future) { | ||
| private void doConnectServer(CompletableFuture<Boolean> future) { | ||
| Bootstrap bootstrap = new Bootstrap(); | ||
| bootstrap.group(group) | ||
| .channel(NioSocketChannel.class) | ||
| .handler(new CIMClientHandleInitializer()); | ||
| .handler(new CIMClientHandleInitializer(conf.isDebug(), getAuth())); | ||
| ChannelFuture sync; | ||
| try { | ||
| sync = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync(); | ||
| final String host = checkHost(); | ||
| final Integer port = checkPort(); | ||
| if (StringUtils.isBlank(host) || Objects.isNull(port)) { | ||
| this.conf.getEvent().error("cim server host or port is null"); | ||
| future.complete(false); | ||
| return; | ||
| } | ||
| sync = bootstrap.connect(host, port).sync(); | ||
| if (sync.isSuccess()) { | ||
| this.conf.getEvent().info("Start cim client success!"); | ||
| channel = (SocketChannel) sync.channel(); | ||
|
|
@@ -205,6 +216,7 @@ private void loginServer() { | |
| * 2. reconnect. | ||
| * 3. shutdown reconnect job. | ||
| * 4. reset reconnect state. | ||
| * | ||
| * @throws Exception | ||
| */ | ||
| public void reconnect() throws Exception { | ||
|
|
@@ -238,6 +250,25 @@ public void close() { | |
| ringBufferWheel.stop(true); | ||
| } | ||
|
|
||
| @Override | ||
| public String checkHost() { | ||
| // 优先使用直连的方式 | ||
| final String host = StringUtils.isNoneBlank(conf.getHost()) ? conf.getHost() : serverInfo.getIp(); | ||
| if (StringUtils.isBlank(host)) { | ||
| throw new IllegalArgumentException("cim server host is null"); | ||
| } | ||
| return host; | ||
| } | ||
|
|
||
| @Override | ||
| public Integer checkPort() { | ||
| final Integer port = Objects.nonNull(conf.getServerPort()) ? conf.getServerPort() : serverInfo.getCimServerPort(); | ||
| if (Objects.isNull(port)) { | ||
| throw new IllegalArgumentException("cim server port is null"); | ||
| } | ||
| return port; | ||
| } | ||
|
Comment on lines
250
to
+270
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning reconnect() 会将 serverInfo 置 null,理论上 doConnectServer() 会重新 login 再连接,但当前 checkHost/checkPort 直接访问 serverInfo.getIp()/getCimServerPort(),在某些调用时序/异常分支(例如 login 返回空 body 或异常后仍继续)可能触发 NPE。并且 doConnectServer() 中又额外判断 host/port blank/null,导致重复校验逻辑。 建议: 让 checkHost/checkPort 只做“取值+强校验(含 serverInfo null)”,并移除 doConnectServer() 中的重复校验;或在 checkHost/checkPort 内优先校验 serverInfo 非空再访问字段。 |
||
|
|
||
| @Override | ||
| public CompletableFuture<Void> sendP2PAsync(P2PReqVO p2PReqVO) { | ||
| CompletableFuture<Void> future = new CompletableFuture<>(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1,33 @@ | ||
| package com.crossoverjie.cim.client.sdk.io; | ||
|
|
||
| import com.crossoverjie.cim.client.sdk.ClientState; | ||
| import com.crossoverjie.cim.client.sdk.impl.ClientConfigurationData; | ||
| import com.crossoverjie.cim.client.sdk.impl.ClientImpl; | ||
| import com.crossoverjie.cim.common.constant.Constants; | ||
| import com.crossoverjie.cim.common.enums.ChannelAttributeKeys; | ||
| import com.crossoverjie.cim.common.protocol.BaseCommand; | ||
| import com.crossoverjie.cim.common.protocol.Request; | ||
| import com.crossoverjie.cim.common.protocol.Response; | ||
| import com.crossoverjie.cim.common.util.NettyAttrUtil; | ||
| import io.netty.channel.ChannelFutureListener; | ||
| import io.netty.channel.ChannelHandler; | ||
| import io.netty.channel.ChannelHandlerContext; | ||
| import io.netty.channel.SimpleChannelInboundHandler; | ||
| import io.netty.channel.*; | ||
| import io.netty.handler.timeout.IdleState; | ||
| import io.netty.handler.timeout.IdleStateEvent; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.apache.zookeeper.common.StringUtils; | ||
|
|
||
| import java.util.Objects; | ||
|
|
||
| @ChannelHandler.Sharable | ||
| @Slf4j | ||
| public class CIMClientHandle extends SimpleChannelInboundHandler<Response> { | ||
|
|
||
|
|
||
| private final ClientConfigurationData.Auth auth; | ||
|
|
||
| public CIMClientHandle(ClientConfigurationData.Auth auth) { | ||
| this.auth = auth; | ||
| } | ||
|
|
||
| @Override | ||
| public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { | ||
|
|
||
|
|
@@ -41,8 +51,41 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc | |
|
|
||
| @Override | ||
| public void channelActive(ChannelHandlerContext ctx) { | ||
| ClientImpl.getClient().getConf().getEvent().debug("ChannelActive"); | ||
| ClientImpl.getClient().setState(ClientState.State.Ready); | ||
|
|
||
| // 获取认证信息 | ||
| // channelActive 执行时间过早,所以这些属性没办法放在 channel 上 | ||
| final String token = auth.getAuthToken(); | ||
| if (StringUtils.isBlank(token)) { | ||
| log.error("auth token is blank!"); | ||
| ctx.close(); | ||
| return; | ||
| } | ||
| final long userId = auth.getUserId(); | ||
|
|
||
| // 连接建立之后就发送认证请求 | ||
| Request authReq = Request.newBuilder() | ||
| .setRequestId(userId) | ||
| .setCmd(BaseCommand.LOGIN_REQUEST) | ||
| .setReqMsg(token) | ||
| .build(); | ||
|
|
||
| ctx.writeAndFlush(authReq).syncUninterruptibly().addListener(new ChannelFutureListener() { | ||
| @Override | ||
| public void operationComplete(ChannelFuture future) throws Exception { | ||
| if (future.isSuccess()) { | ||
| log.info("auth msg send success,userId:{},userName:{}", auth.getUserId(), auth.getUserName()); | ||
| ClientImpl.getClient().getConf().getEvent().debug("ChannelActive"); | ||
| ctx.channel().attr(ChannelAttributeKeys.USER_ID).set(userId); | ||
| log.info("channel is active,userId:{}", userId); | ||
| ClientImpl.getClient().setState(ClientState.State.Ready); | ||
| } else { | ||
| log.error("auth msg send failure,userId:{},userName:{}", auth.getUserId(), auth.getUserName()); | ||
| ctx.channel().close(); // 认证失败关闭连接 | ||
| } | ||
| } | ||
| }); | ||
|
Comment on lines
+63
to
+86
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Warning 客户端在 channelActive 发送 token 使用 BaseCommand.LOGIN_REQUEST;服务端的 ClientAuthHandler 只是拿 Request.reqMsg 当 token 校验,不区分 cmd;但 CIMServerHandle 也把 LOGIN_REQUEST 当作“上报 userName 并绑定 Session”。一旦认证 handler 被移除后,同 cmd 的业务含义与鉴权含义容易混淆,后续扩展风险较大。 建议: 为“连接鉴权”引入独立 cmd(例如 AUTH_REQUEST),避免与现有 LOGIN_REQUEST(业务登录/绑定 session)复用。
Comment on lines
+66
to
+86
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Caution 🚨 客户端使用 LOGIN_REQUEST 发送 token 导致与服务端登录语义混淆、Session 可能不一致 客户端 channelActive 发送 Request(cmd=LOGIN_REQUEST, reqMsg=token)。服务端业务层将 LOGIN_REQUEST 的 reqMsg 当 userName 保存、requestId 当 userId,导致 token 被误存为 userName 且 userId 与 JWT payload 可能不一致。并且若认证 handler 未将该消息继续向下传播,可能不会触发 SessionSocketHolder 绑定。 建议: 拆分 AUTH_REQUEST 与 LOGIN_REQUEST:AUTH_REQUEST 仅携带 token;认证成功后客户端再发送真实 LOGIN_REQUEST(携带 userName),或业务层改为从 JWT payload 的 channel attr 读取 userId/userName。 |
||
|
|
||
|
|
||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -66,16 +109,16 @@ protected void channelRead0(ChannelHandlerContext ctx, Response msg) { | |
|
|
||
| if (msg.getCmd() != BaseCommand.PING) { | ||
| String receiveUserId = msg.getPropertiesMap().get(Constants.MetaKey.RECEIVE_USER_ID); | ||
| ClientImpl client = ClientImpl.getClientMap().get(Long.valueOf(receiveUserId)); | ||
| if (client == null) { | ||
| ClientImpl client; | ||
| if ((Objects.isNull(receiveUserId) || ((client = ClientImpl.getClientMap().get(Long.valueOf(receiveUserId))) == null))) { | ||
| log.error("client not found for userId: {}", receiveUserId); | ||
| return; | ||
| } | ||
| // callback | ||
| client.getConf().getCallbackThreadPool().execute(() -> { | ||
| log.info("client address: {} :{}", ctx.channel().remoteAddress(), client); | ||
| MessageListener messageListener = client.getConf().getMessageListener(); | ||
| if (msg.getBatchResMsgCount() >0 ){ | ||
| if (msg.getBatchResMsgCount() > 0) { | ||
| for (int i = 0; i < msg.getBatchResMsgCount(); i++) { | ||
| messageListener.received(client, msg.getPropertiesMap(), msg.getBatchResMsg(i)); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tip
💡 接口方法后多余分号影响可读性
sendP2P/sendGroup 默认方法后存在单独的
;行,为无意义语句,降低可读性。建议: 删除多余分号。