Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b1e5315
feat:
chenqwwq Jun 7, 2025
2bb45b7
Merge branch 'refs/heads/master' into feature/first_auth_v1
chenqwwq Jun 14, 2025
ecb124d
feat(client): 支持客户端直连模式
chenqwwq Jun 15, 2025
8357da3
build(dependencies): 更新 spring-boot-starter-logging 依赖版本
chenqwwq Jun 15, 2025
782e39f
refactor(client-sdk): 重构客户端 SDK
chenqwwq Jun 15, 2025
b1c4e8b
refactor(cim-common): 修正 RegistryType 构造函数参数顺序
chenqwwq Jun 15, 2025
dde0fea
refactor(cim): 重构注册中心相关代码
chenqwwq Jun 15, 2025
ec0b583
[链接鉴权] - fix(cim-common): 修复 ChannelInboundDebugHandler 中 channelInac…
chenqwwq Jun 16, 2025
ddceaf8
[链接鉴权] - fix(cim-common):
chenqwwq Jun 16, 2025
67b08bd
[链接鉴权] - fix(cim-common):
chenqwwq Jun 16, 2025
070e9cf
[链接鉴权] - fix:
chenqwwq Jun 16, 2025
31514f9
[链接鉴权] - fix:
chenqwwq Jun 16, 2025
4fb8f67
[连接鉴权] - fix(client): 优化客户端认证逻辑
chenqwwq Jun 18, 2025
8ba873d
Merge branch 'master' into feature/first_auth_v1
crossoverJie Jun 24, 2025
25a4512
[链接鉴权] - feat:
chenqwwq Aug 6, 2025
94e7bea
[链接鉴权] - feat:
chenqwwq Aug 6, 2025
cb2bd63
[链接鉴权] - feat:
chenqwwq Aug 6, 2025
6a1d6e3
[链接鉴权] - feat:
chenqwwq Aug 6, 2025
feff020
Merge remote-tracking branch 'origin/feature/first_auth_v1' into feat…
chenqwwq Aug 6, 2025
07a1955
[链接鉴权] - feat:
chenqwwq Aug 6, 2025
2167493
[链接鉴权] - test:
chenqwwq Aug 6, 2025
664deba
[链接鉴权] - test:
chenqwwq Aug 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,41 @@
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.route.api.vo.req.P2PReqVO;
import com.crossoverjie.cim.route.api.vo.res.CIMServerResVO;

import java.io.Closeable;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public interface Client extends Closeable {

static ClientBuilder builder(ClientConfigurationData conf) {
Objects.requireNonNull(conf, "ClientConfigurationData must not be null");
return new ClientBuilderImpl(conf);
}

static ClientBuilder builder() {
return new ClientBuilderImpl();
}

default void sendP2P(P2PReqVO p2PReqVO) throws Exception{
String checkHost();

Integer checkPort();

default void sendP2P(P2PReqVO p2PReqVO) throws Exception {
sendP2PAsync(p2PReqVO).get();
};
}

;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

💡 接口方法后多余分号影响可读性

sendP2P/sendGroup 默认方法后存在单独的 ; 行,为无意义语句,降低可读性。

建议: 删除多余分号。


CompletableFuture<Void> sendP2PAsync(P2PReqVO p2PReqVO);

default void sendGroup(String msg) throws Exception{
default void sendGroup(String msg) throws Exception {
sendGroupAsync(msg).get();
};
}

;

CompletableFuture<Void> sendGroupAsync(String msg);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
import com.crossoverjie.cim.client.sdk.io.ReconnectCheck;
import com.crossoverjie.cim.client.sdk.io.backoff.BackoffStrategy;
import com.crossoverjie.cim.common.util.StringUtil;
import java.util.concurrent.ThreadPoolExecutor;
import okhttp3.OkHttpClient;

import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;

public class ClientBuilderImpl implements ClientBuilder {


Expand All @@ -18,18 +20,20 @@ public class ClientBuilderImpl implements ClientBuilder {
public ClientBuilderImpl() {
this(new ClientConfigurationData());
}

public ClientBuilderImpl(ClientConfigurationData conf) {
Objects.requireNonNull(conf, "ClientConfigurationData must not be null");
this.conf = conf;
}

@Override
public Client build() {
return new ClientImpl(conf);
}

@Override
public ClientBuilder auth(ClientConfigurationData.Auth auth) {
if (auth.getUserId() <= 0 || StringUtil.isEmpty(auth.getUserName())){
if (auth.getUserId() <= 0 || StringUtil.isEmpty(auth.getUserName())) {
throw new IllegalArgumentException("userId and userName must be set");
}
this.conf.setAuth(auth);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,45 @@
package com.crossoverjie.cim.client.sdk.impl;

import com.crossoverjie.cim.client.sdk.Event;
import com.crossoverjie.cim.client.sdk.io.backoff.BackoffStrategy;
import com.crossoverjie.cim.client.sdk.io.MessageListener;
import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff;
import com.crossoverjie.cim.client.sdk.io.ReconnectCheck;
import com.crossoverjie.cim.client.sdk.io.backoff.BackoffStrategy;
import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import okhttp3.OkHttpClient;

import java.util.concurrent.ThreadPoolExecutor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class ClientConfigurationData {

private boolean debug = false;

private Auth auth;

private String host;

private Integer serverPort;

private Integer httpPort;

@Data
@AllArgsConstructor
@Builder
public static class Auth{
public static class Auth {
private long userId;
private String userName;

@JsonIgnore
private String authToken;
}

private String routeUrl;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
package com.crossoverjie.cim.client.sdk.impl;

import static com.crossoverjie.cim.common.enums.StatusEnum.RECONNECT_FAIL;

import com.crossoverjie.cim.client.sdk.Client;
import com.crossoverjie.cim.client.sdk.ClientState;
import com.crossoverjie.cim.client.sdk.FetchOfflineMsgJob;
import com.crossoverjie.cim.client.sdk.ReConnectManager;
import com.crossoverjie.cim.client.sdk.RouteManager;
import com.crossoverjie.cim.client.sdk.*;
import com.crossoverjie.cim.client.sdk.io.CIMClientHandleInitializer;
import com.crossoverjie.cim.common.data.construct.RingBufferWheel;
import com.crossoverjie.cim.common.exception.CIMException;
Expand All @@ -27,14 +21,18 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.*;
import java.util.function.Consumer;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import static com.crossoverjie.cim.common.enums.StatusEnum.RECONNECT_FAIL;

@Slf4j
public class ClientImpl extends ClientState implements Client {
Expand Down Expand Up @@ -133,16 +131,23 @@ private void connectServer(Consumer<Void> success) {
*/
private CompletableFuture<Boolean> doConnectServer() {
CompletableFuture<Boolean> future = new CompletableFuture<>();
this.userLogin(future).ifPresentOrElse((cimServer) -> {
this.doConnectServer(cimServer, future);
this.loginServer();
this.serverInfo = cimServer;
future.complete(true);
}, () -> {
this.conf.getEvent().error("Login fail!, cannot get server info!");
this.conf.getEvent().fatal(this);
future.complete(false);
});
this.userLogin(future) // save serverInfo after login success
.ifPresentOrElse((cimServer) -> {
if (StringUtils.isBlank(cimServer.getAuthToken())) {
future.complete(false);
this.conf.getEvent().error("Login fail!, auth token is blank!");
this.conf.getEvent().fatal(this);
return;
}
getAuth().setAuthToken(cimServer.getAuthToken());
this.doConnectServer(future);
this.loginServer();
future.complete(true);
}, () -> {
this.conf.getEvent().error("Login fail!, cannot get server info!");
this.conf.getEvent().fatal(this);
future.complete(false);
});
return future;
}

Expand All @@ -157,8 +162,7 @@ private Optional<CIMServerResVO> userLogin(CompletableFuture<Boolean> future) {

CIMServerResVO cimServer = null;
try {
cimServer = routeManager.getServer(loginReqVO);
log.info("cimServer=[{}]", cimServer);
serverInfo = cimServer = routeManager.getServer(loginReqVO);
} catch (Exception e) {
log.error("login fail", e);
future.completeExceptionally(e);
Expand All @@ -168,14 +172,21 @@ private Optional<CIMServerResVO> userLogin(CompletableFuture<Boolean> future) {

private final EventLoopGroup group = new NioEventLoopGroup(0, new DefaultThreadFactory("cim-work"));

private void doConnectServer(CIMServerResVO cimServer, CompletableFuture<Boolean> future) {
private void doConnectServer(CompletableFuture<Boolean> future) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new CIMClientHandleInitializer());
.handler(new CIMClientHandleInitializer(conf.isDebug(), getAuth()));
ChannelFuture sync;
try {
sync = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
final String host = checkHost();
final Integer port = checkPort();
if (StringUtils.isBlank(host) || Objects.isNull(port)) {
this.conf.getEvent().error("cim server host or port is null");
future.complete(false);
return;
}
sync = bootstrap.connect(host, port).sync();
if (sync.isSuccess()) {
this.conf.getEvent().info("Start cim client success!");
channel = (SocketChannel) sync.channel();
Expand Down Expand Up @@ -205,6 +216,7 @@ private void loginServer() {
* 2. reconnect.
* 3. shutdown reconnect job.
* 4. reset reconnect state.
*
* @throws Exception
*/
public void reconnect() throws Exception {
Expand Down Expand Up @@ -238,6 +250,25 @@ public void close() {
ringBufferWheel.stop(true);
}

@Override
public String checkHost() {
// 优先使用直连的方式
final String host = StringUtils.isNoneBlank(conf.getHost()) ? conf.getHost() : serverInfo.getIp();
if (StringUtils.isBlank(host)) {
throw new IllegalArgumentException("cim server host is null");
}
return host;
}

@Override
public Integer checkPort() {
final Integer port = Objects.nonNull(conf.getServerPort()) ? conf.getServerPort() : serverInfo.getCimServerPort();
if (Objects.isNull(port)) {
throw new IllegalArgumentException("cim server port is null");
}
return port;
}
Comment on lines 250 to +270
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Warning

⚠️ checkHost/checkPort 在 serverInfo 为空时可能 NPE,且 doConnectServer 已做空值判断重复

reconnect() 会将 serverInfo 置 null,理论上 doConnectServer() 会重新 login 再连接,但当前 checkHost/checkPort 直接访问 serverInfo.getIp()/getCimServerPort(),在某些调用时序/异常分支(例如 login 返回空 body 或异常后仍继续)可能触发 NPE。并且 doConnectServer() 中又额外判断 host/port blank/null,导致重复校验逻辑。

建议: 让 checkHost/checkPort 只做“取值+强校验(含 serverInfo null)”,并移除 doConnectServer() 中的重复校验;或在 checkHost/checkPort 内优先校验 serverInfo 非空再访问字段。


@Override
public CompletableFuture<Void> sendP2PAsync(P2PReqVO p2PReqVO) {
CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
package com.crossoverjie.cim.client.sdk.io;

import com.crossoverjie.cim.client.sdk.ClientState;
import com.crossoverjie.cim.client.sdk.impl.ClientConfigurationData;
import com.crossoverjie.cim.client.sdk.impl.ClientImpl;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.enums.ChannelAttributeKeys;
import com.crossoverjie.cim.common.protocol.BaseCommand;
import com.crossoverjie.cim.common.protocol.Request;
import com.crossoverjie.cim.common.protocol.Response;
import com.crossoverjie.cim.common.util.NettyAttrUtil;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.common.StringUtils;

import java.util.Objects;

@ChannelHandler.Sharable
@Slf4j
public class CIMClientHandle extends SimpleChannelInboundHandler<Response> {


private final ClientConfigurationData.Auth auth;

public CIMClientHandle(ClientConfigurationData.Auth auth) {
this.auth = auth;
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

Expand All @@ -41,8 +51,41 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc

@Override
public void channelActive(ChannelHandlerContext ctx) {
ClientImpl.getClient().getConf().getEvent().debug("ChannelActive");
ClientImpl.getClient().setState(ClientState.State.Ready);

// 获取认证信息
// channelActive 执行时间过早,所以这些属性没办法放在 channel 上
final String token = auth.getAuthToken();
if (StringUtils.isBlank(token)) {
log.error("auth token is blank!");
ctx.close();
return;
}
final long userId = auth.getUserId();

// 连接建立之后就发送认证请求
Request authReq = Request.newBuilder()
.setRequestId(userId)
.setCmd(BaseCommand.LOGIN_REQUEST)
.setReqMsg(token)
.build();

ctx.writeAndFlush(authReq).syncUninterruptibly().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
log.info("auth msg send success,userId:{},userName:{}", auth.getUserId(), auth.getUserName());
ClientImpl.getClient().getConf().getEvent().debug("ChannelActive");
ctx.channel().attr(ChannelAttributeKeys.USER_ID).set(userId);
log.info("channel is active,userId:{}", userId);
ClientImpl.getClient().setState(ClientState.State.Ready);
} else {
log.error("auth msg send failure,userId:{},userName:{}", auth.getUserId(), auth.getUserName());
ctx.channel().close(); // 认证失败关闭连接
}
}
});
Comment on lines +63 to +86
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Warning

⚠️ 客户端把鉴权请求的 cmd 设为 LOGIN_REQUEST,可能与业务登录上报语义冲突

客户端在 channelActive 发送 token 使用 BaseCommand.LOGIN_REQUEST;服务端的 ClientAuthHandler 只是拿 Request.reqMsg 当 token 校验,不区分 cmd;但 CIMServerHandle 也把 LOGIN_REQUEST 当作“上报 userName 并绑定 Session”。一旦认证 handler 被移除后,同 cmd 的业务含义与鉴权含义容易混淆,后续扩展风险较大。

建议: 为“连接鉴权”引入独立 cmd(例如 AUTH_REQUEST),避免与现有 LOGIN_REQUEST(业务登录/绑定 session)复用。

Comment on lines +66 to +86
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

🚨 客户端使用 LOGIN_REQUEST 发送 token 导致与服务端登录语义混淆、Session 可能不一致

客户端 channelActive 发送 Request(cmd=LOGIN_REQUEST, reqMsg=token)。服务端业务层将 LOGIN_REQUEST 的 reqMsg 当 userName 保存、requestId 当 userId,导致 token 被误存为 userName 且 userId 与 JWT payload 可能不一致。并且若认证 handler 未将该消息继续向下传播,可能不会触发 SessionSocketHolder 绑定。

建议: 拆分 AUTH_REQUEST 与 LOGIN_REQUEST:AUTH_REQUEST 仅携带 token;认证成功后客户端再发送真实 LOGIN_REQUEST(携带 userName),或业务层改为从 JWT payload 的 channel attr 读取 userId/userName。



}

@Override
Expand All @@ -66,16 +109,16 @@ protected void channelRead0(ChannelHandlerContext ctx, Response msg) {

if (msg.getCmd() != BaseCommand.PING) {
String receiveUserId = msg.getPropertiesMap().get(Constants.MetaKey.RECEIVE_USER_ID);
ClientImpl client = ClientImpl.getClientMap().get(Long.valueOf(receiveUserId));
if (client == null) {
ClientImpl client;
if ((Objects.isNull(receiveUserId) || ((client = ClientImpl.getClientMap().get(Long.valueOf(receiveUserId))) == null))) {
log.error("client not found for userId: {}", receiveUserId);
return;
}
// callback
client.getConf().getCallbackThreadPool().execute(() -> {
log.info("client address: {} :{}", ctx.channel().remoteAddress(), client);
MessageListener messageListener = client.getConf().getMessageListener();
if (msg.getBatchResMsgCount() >0 ){
if (msg.getBatchResMsgCount() > 0) {
for (int i = 0; i < msg.getBatchResMsgCount(); i++) {
messageListener.received(client, msg.getPropertiesMap(), msg.getBatchResMsg(i));
}
Expand Down
Loading