@@ -108,6 +108,95 @@ bool MQService_MQTT_Handle(LPCXSTR lpszClientAddr, MQTTPROTOCOL_FIXEDHEADER* pSt
108108
109109 MessageQueue_TCP_Handle (&st_ProtocolHdr, lpszClientAddr, (LPCXSTR)&st_ProtocolAuth, sizeof (XENGINE_PROTOCOL_USERAUTH), XENGINE_MQAPP_NETTYPE_MQTT);
110110 XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" MQTT客户端:%s,请求链接成功,客户端ID:%s,用户名:%s" ), lpszClientAddr, st_USerInfo.tszClientID , st_USerInfo.tszClientUser );
111+ return true ;
112+ }
113+
114+ XCHAR tszUserName[XPATH_MIN] = {};
115+ if (!SessionModule_Client_GetUser (lpszClientAddr, tszUserName))
116+ {
117+ MQTTProtocol_Packet_DisConnect (tszRVBuffer, &nRVLen);
118+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_DISCONN, tszRVBuffer, nRVLen);
119+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
120+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" MQTT客户端:%s,请求指定消息:%d 失败,用户未登录,错误码:%lX" ), lpszClientAddr, pSt_MQTTHdr->byMsgType , MQTTProtocol_GetLastError ());
121+ return false ;
122+ }
123+ if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_SUBSCRIBE == pSt_MQTTHdr->byMsgType )
124+ {
125+ XSHOT wMsgID = 0 ;
126+ XCHAR tszTopicName[XPATH_MAX] = {};
127+ int nListCount = 0 ;
128+ MQTTPROTOCOL_HDRPROPERTY** ppSt_HDRProperty;
129+ MQTTPROTOCOL_HDRSUBSCRIBE st_SubScribe = {};
130+
131+ if (!MQTTProtocol_Parse_Subscribe (lpszMSGBuffer, nMSGLen, &wMsgID, tszTopicName, &st_SubScribe, &ppSt_HDRProperty, &nListCount))
132+ {
133+ MQTTProtocol_Packet_DisConnect (tszRVBuffer, &nRVLen);
134+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_DISCONN, tszRVBuffer, nRVLen);
135+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
136+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" MQTT客户端:%s,请求订阅失败,错误码:%lX" ), lpszClientAddr, MQTTProtocol_GetLastError ());
137+ return false ;
138+ }
139+ XENGINE_DBUSERKEY st_Userkey = {};
140+
141+ _tcsxcpy (st_Userkey.tszUserName , tszUserName);
142+ _tcsxcpy (st_Userkey.tszKeyName , tszTopicName);
143+ // 先查询有没有
144+ if (DBModule_MQUser_KeyQuery (&st_Userkey))
145+ {
146+ // 有就更新
147+ st_Userkey.nKeySerial = wMsgID;
148+ if (!DBModule_MQUser_KeyUPDate (&st_Userkey))
149+ {
150+ MQTTProtocol_Packet_DisConnect (tszRVBuffer, &nRVLen);
151+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_DISCONN, tszRVBuffer, nRVLen);
152+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
153+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X (" MQTT消息端:%s,设置消息队列主题更新失败,主题名称:%s,序列号:%d,错误:%lX" ), lpszClientAddr, tszTopicName, wMsgID, DBModule_GetLastError ());
154+ return false ;
155+ }
156+ }
157+ else
158+ {
159+ // 没有就创建
160+ st_Userkey.nKeySerial = wMsgID;
161+ if (!DBModule_MQUser_KeyInsert (&st_Userkey))
162+ {
163+ ProtocolModule_Packet_Http (tszSDBuffer, &nSDLen, ERROR_XENGINE_MESSAGE_HTTP_FAILURE, _X (" insert bind topic is failure" ));
164+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_HTTP);
165+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X (" MQTT消息端:%s,设置消息队列主题创建失败,主题名称:%s,序列号:%d,错误:%lX" ), lpszClientAddr, tszTopicName, wMsgID, DBModule_GetLastError ());
166+ return false ;
167+ }
168+ }
169+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" MQTT客户端:%s,请求订阅成功,主题名称:%s" ), lpszClientAddr, tszTopicName);
170+ }
171+ else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_UNSUBSCRIBE == pSt_MQTTHdr->byMsgType )
172+ {
173+ XSHOT wMsgID = 0 ;
174+ XCHAR tszTopicName[XPATH_MAX] = {};
175+ int nListCount = 0 ;
176+ MQTTPROTOCOL_HDRPROPERTY** ppSt_HDRProperty;
177+
178+ if (!MQTTProtocol_Parse_UNSubcribe (lpszMSGBuffer, nMSGLen, &wMsgID, tszTopicName, &ppSt_HDRProperty, &nListCount))
179+ {
180+ MQTTProtocol_Packet_DisConnect (tszRVBuffer, &nRVLen);
181+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_DISCONN, tszRVBuffer, nRVLen);
182+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
183+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" MQTT客户端:%s,请求取消订阅失败,错误码:%lX" ), lpszClientAddr, MQTTProtocol_GetLastError ());
184+ return false ;
185+ }
186+ XENGINE_DBUSERKEY st_Userkey = {};
187+
188+ _tcsxcpy (st_Userkey.tszUserName , tszUserName);
189+ _tcsxcpy (st_Userkey.tszKeyName , tszTopicName);
190+
191+ if (!DBModule_MQUser_KeyDelete (&st_Userkey))
192+ {
193+ MQTTProtocol_Packet_DisConnect (tszRVBuffer, &nRVLen);
194+ MQTTProtocol_Packet_Header (tszSDBuffer, &nSDLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_DISCONN, tszRVBuffer, nRVLen);
195+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_MQTT);
196+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X (" MQTT消息端:%s,解除消息绑定订阅失败,可能没有找到主题用户,主题名称:%s,用户名称:%s,错误:%lX" ), lpszClientAddr, tszTopicName, tszUserName, DBModule_GetLastError ());
197+ return false ;
198+ }
199+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" MQTT客户端:%s,请求取消订阅,主题名称:%s" ), lpszClientAddr, tszTopicName);
111200 }
112201 else if (XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_PUBLISH == pSt_MQTTHdr->byMsgType )
113202 {
0 commit comments