Skip to content

Commit dcf57e1

Browse files
authored
Merge branch 'develop' into dependabot/npm_and_yarn/webui/develop/nodemailer-8.0.7
2 parents 65b93e4 + 718d677 commit dcf57e1

35 files changed

Lines changed: 1205 additions & 279 deletions

mqtt/connect.go

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,10 @@ type ConnectRequest struct {
1515
Message []byte
1616
Username string
1717
Password string
18+
Properties Properties
1819
}
1920

20-
type ConnectHeader struct {
21-
}
22-
23-
func (r *ConnectRequest) Read(d *Decoder) {
21+
func (r *ConnectRequest) Read(d *Decoder, _ *Header) {
2422
r.Protocol = d.ReadString()
2523
r.Version = d.ReadByte()
2624

@@ -33,6 +31,11 @@ func (r *ConnectRequest) Read(d *Decoder) {
3331
r.CleanSession = (b>>1)&0x1 > 0
3432
r.KeepAlive = d.ReadInt16()
3533

34+
if r.Version == 5 {
35+
r.Properties = Properties{}
36+
r.Properties.Read(d)
37+
}
38+
3639
r.ClientId = d.ReadString()
3740

3841
if r.WillFlag {
@@ -48,7 +51,7 @@ func (r *ConnectRequest) Read(d *Decoder) {
4851
}
4952
}
5053

51-
func (r *ConnectRequest) Write(e *Encoder) {
54+
func (r *ConnectRequest) Write(e *Encoder, _ *Header) {
5255
e.writeString(r.Protocol)
5356
e.writeByte(r.Version)
5457
b := byte(0)
@@ -61,7 +64,7 @@ func (r *ConnectRequest) Write(e *Encoder) {
6164
if r.WillRetain {
6265
b |= 0x1 << 5
6366
}
64-
b |= (r.WillQoS & 0x03) << 1
67+
b |= (r.WillQoS & 0x3) << 3
6568
if r.WillFlag {
6669
b |= 0x1 << 2
6770
}
@@ -70,8 +73,19 @@ func (r *ConnectRequest) Write(e *Encoder) {
7073
}
7174
e.writeByte(b)
7275
e.writeInt16(r.KeepAlive)
76+
77+
if r.Version == 5 {
78+
r.Properties.Write(e)
79+
}
80+
7381
e.writeString(r.ClientId)
7482

83+
if r.WillFlag {
84+
e.writeString(r.Topic)
85+
e.writeInt16(int16(len(r.Message)))
86+
e.Write(r.Message)
87+
}
88+
7589
if r.HasUsername {
7690
e.writeString(r.Username)
7791
}
@@ -82,21 +96,30 @@ func (r *ConnectRequest) Write(e *Encoder) {
8296

8397
type ConnectResponse struct {
8498
SessionPresent bool
85-
ReturnCode Code
99+
ReasonCode Code
100+
Properties Properties
86101
}
87102

88-
func (r *ConnectResponse) Write(e *Encoder) {
103+
func (r *ConnectResponse) Write(e *Encoder, _ *Header) {
89104
if r.SessionPresent {
90-
e.writeByte(0x01)
105+
e.writeByte(0x1)
91106
} else {
92107
e.writeByte(0x0)
93108
}
94-
e.writeByte(r.ReturnCode.Code)
109+
e.writeByte(r.ReasonCode.Code)
110+
if e.IsV5() {
111+
r.Properties.Write(e)
112+
}
95113
}
96114

97-
func (r *ConnectResponse) Read(d *Decoder) {
98-
r.SessionPresent = d.ReadByte() == 0x01
99-
r.ReturnCode = Code{
115+
func (r *ConnectResponse) Read(d *Decoder, _ *Header) {
116+
r.SessionPresent = d.ReadByte() == 0x1
117+
r.ReasonCode = Code{
100118
Code: d.ReadByte(),
101119
}
120+
121+
if d.IsV5() {
122+
r.Properties = Properties{}
123+
r.Properties.Read(d)
124+
}
102125
}

mqtt/connect_test.go

Lines changed: 161 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package mqtt_test
33
import (
44
"bytes"
55
"fmt"
6-
"github.com/stretchr/testify/require"
76
"mokapi/mqtt"
87
"mokapi/mqtt/mqtttest"
98
"mokapi/try"
109
"testing"
10+
11+
"github.com/stretchr/testify/require"
1112
)
1213

1314
func TestConnect_ReadRequest(t *testing.T) {
@@ -19,10 +20,10 @@ func TestConnect_ReadRequest(t *testing.T) {
1920
{
2021
name: "simple connect",
2122
in: []byte{
22-
0x10, // flags
23+
0x10, // Packet type
2324
0x10, // length
2425
0x00, 0x04, // protocol length
25-
0x4d, 0x51, 0x54, 0x54, // protocol
26+
0x4d, 0x51, 0x54, 0x54, // protocol name MQTT
2627
0x04, // version
2728
0x02, // connect flags
2829
0x00, 0x3c, // keep alive
@@ -53,10 +54,10 @@ func TestConnect_ReadRequest(t *testing.T) {
5354
{
5455
name: "connect with topic and message",
5556
in: []byte{
56-
0x10, // flags
57+
0x10, // Packet type
5758
0x1A, // length
5859
0x00, 0x04, // protocol length
59-
0x4d, 0x51, 0x54, 0x54, // protocol
60+
0x4d, 0x51, 0x54, 0x54, // protocol name MQTT
6061
0x04, // version
6162
0x0e, // connect flags
6263
0x00, 0x3c, // keep alive
@@ -90,6 +91,45 @@ func TestConnect_ReadRequest(t *testing.T) {
9091
require.Equal(t, []byte("bar"), msg.Message)
9192
},
9293
},
94+
{
95+
name: "connect v5",
96+
in: []byte{
97+
0x10, // Packet type
98+
0x18, // length
99+
0x00, 0x04, // protocol length
100+
0x4d, 0x51, 0x54, 0x54, // protocol name MQTT
101+
0x05, // version
102+
0x02, // connect flags
103+
0x00, 0x3c, // keep alive
104+
0x5, // properties length
105+
0x11, // Session Expiry Interval
106+
0x0, 0x0, 0x0, 0x0, // value
107+
0x00, 0x06, // client id length
108+
'm', 'o', 'k', 'a', 'p', 'i', // client id
109+
},
110+
test: func(t *testing.T, r *mqtt.Message, err error) {
111+
require.NoError(t, err)
112+
113+
require.Equal(t, 24, r.Header.Size)
114+
115+
require.IsType(t, &mqtt.ConnectRequest{}, r.Payload)
116+
msg := r.Payload.(*mqtt.ConnectRequest)
117+
118+
require.Equal(t, "MQTT", msg.Protocol)
119+
require.Equal(t, byte(5), msg.Version)
120+
121+
require.False(t, msg.HasUsername)
122+
require.False(t, msg.HasPassword)
123+
require.False(t, msg.WillRetain)
124+
require.Equal(t, byte(0), msg.WillQoS)
125+
require.False(t, msg.WillFlag)
126+
require.True(t, msg.CleanSession)
127+
require.Equal(t, int16(60), msg.KeepAlive)
128+
require.Contains(t, msg.Properties, mqtt.SessionExpiryInterval)
129+
require.Equal(t, int32(0), msg.Properties[mqtt.SessionExpiryInterval])
130+
require.Equal(t, "mokapi", msg.ClientId)
131+
},
132+
},
93133
}
94134

95135
t.Parallel()
@@ -99,12 +139,126 @@ func TestConnect_ReadRequest(t *testing.T) {
99139
t.Parallel()
100140

101141
r := &mqtt.Message{}
102-
err := r.Read(bytes.NewReader(tc.in))
142+
err := r.Read(bytes.NewReader(tc.in), &mqtt.ClientContext{})
103143
tc.test(t, r, err)
104144
})
105145
}
106146
}
107147

148+
func TestConnect_WriteRequest(t *testing.T) {
149+
testcases := []struct {
150+
name string
151+
msg mqtt.Message
152+
out []byte
153+
}{
154+
{
155+
name: "simple connect",
156+
msg: mqtt.Message{
157+
Header: &mqtt.Header{
158+
Type: mqtt.CONNECT,
159+
},
160+
Payload: &mqtt.ConnectRequest{
161+
Protocol: "MQTT",
162+
Version: 4,
163+
CleanSession: true,
164+
KeepAlive: 60,
165+
ClientId: "mqtt",
166+
},
167+
},
168+
out: []byte{
169+
0x10, // Packet type
170+
0x10, // length
171+
0x00, 0x04, // protocol length
172+
0x4d, 0x51, 0x54, 0x54, // protocol name MQTT
173+
0x04, // version
174+
0x02, // connect flags
175+
0x00, 0x3c, // keep alive
176+
0x00, 0x04, // client id length
177+
0x6d, 0x71, 0x74, 0x74, // client id
178+
},
179+
},
180+
{
181+
name: "connect with topic and message",
182+
msg: mqtt.Message{
183+
Header: &mqtt.Header{
184+
Type: mqtt.CONNECT,
185+
},
186+
Payload: &mqtt.ConnectRequest{
187+
Protocol: "MQTT",
188+
Version: 4,
189+
WillQoS: 1,
190+
CleanSession: true,
191+
WillFlag: true,
192+
KeepAlive: 60,
193+
ClientId: "mqtt",
194+
Topic: "foo",
195+
Message: []byte("bar"),
196+
},
197+
},
198+
out: []byte{
199+
0x10, // Packet type
200+
0x1A, // length
201+
0x00, 0x04, // protocol length
202+
0x4d, 0x51, 0x54, 0x54, // protocol name MQTT
203+
0x04, // version
204+
0x0e, // connect flags
205+
0x00, 0x3c, // keep alive
206+
0x00, 0x04, // client id length
207+
0x6d, 0x71, 0x74, 0x74, // client id
208+
0x00, 0x03, // topic length
209+
'f', 'o', 'o', // topic
210+
0x00, 0x03, // message length
211+
'b', 'a', 'r', // message
212+
},
213+
},
214+
{
215+
name: "connect v5",
216+
msg: mqtt.Message{
217+
Header: &mqtt.Header{
218+
Type: mqtt.CONNECT,
219+
},
220+
Payload: &mqtt.ConnectRequest{
221+
Protocol: "MQTT",
222+
Version: 5,
223+
CleanSession: true,
224+
KeepAlive: 60,
225+
ClientId: "mokapi",
226+
Properties: mqtt.Properties{
227+
mqtt.SessionExpiryInterval: int32(0),
228+
},
229+
},
230+
},
231+
out: []byte{
232+
0x10, // Packet type
233+
0x18, // length
234+
0x00, 0x04, // protocol length
235+
0x4d, 0x51, 0x54, 0x54, // protocol name MQTT
236+
0x05, // version
237+
0x02, // connect flags
238+
0x00, 0x3c, // keep alive
239+
0x5, // properties length
240+
0x11, // Session Expiry Interval
241+
0x0, 0x0, 0x0, 0x0, // value
242+
0x00, 0x06, // client id length
243+
'm', 'o', 'k', 'a', 'p', 'i', // client id
244+
},
245+
},
246+
}
247+
248+
t.Parallel()
249+
for _, tc := range testcases {
250+
tc := tc
251+
t.Run(tc.name, func(t *testing.T) {
252+
t.Parallel()
253+
254+
var b bytes.Buffer
255+
err := tc.msg.Write(&b, &mqtt.ClientContext{})
256+
require.NoError(t, err)
257+
require.Equal(t, tc.out, b.Bytes())
258+
})
259+
}
260+
}
261+
108262
func TestConnect(t *testing.T) {
109263
testcases := []struct {
110264
name string
@@ -120,7 +274,7 @@ func TestConnect(t *testing.T) {
120274
},
121275
Payload: &mqtt.ConnectResponse{
122276
SessionPresent: false,
123-
ReturnCode: mqtt.Accepted,
277+
ReasonCode: mqtt.Success,
124278
},
125279
})
126280
}),

mqtt/context.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,13 @@ import (
99
const clientKey = "client"
1010

1111
type ClientContext struct {
12-
Addr string
13-
ClientId string
12+
Addr string
13+
ClientId string
14+
ProtocolVersion byte
1415

1516
conn net.Conn
1617
}
1718

18-
type packet struct {
19-
header *Header
20-
payload buffer.Buffer
21-
retries int
22-
}
23-
2419
func ClientFromContext(ctx context.Context) *ClientContext {
2520
if ctx == nil {
2621
return nil
@@ -36,8 +31,8 @@ func NewClientContext(ctx context.Context, conn net.Conn) context.Context {
3631
func (c *ClientContext) Send(r *Message) error {
3732
b := buffer.NewPageBuffer()
3833

39-
e := NewEncoder(b)
40-
r.Payload.Write(e)
34+
e := NewEncoder(b, c.ProtocolVersion)
35+
r.Payload.Write(e, r.Header)
4136

4237
r.Header.Size = b.Size()
4338

0 commit comments

Comments
 (0)