Skip to content

Commit fa06677

Browse files
committed
wip(mqtt): improve separation between kafka and mqtt
wip(mqtt): add log event and validate publish
1 parent 718d677 commit fa06677

23 files changed

Lines changed: 606 additions & 116 deletions

acceptance/petstore/asyncapi.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ servers:
66
broker:
77
url: 127.0.0.1:19092
88
protocol: kafka
9+
mqtt:
10+
url: 127.0.0.1:11883
11+
protocol: mqtt
912
channels:
1013
petstore.order-event:
1114
subscribe:

api/handler_kafka.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ func getKafka(info *runtime.KafkaInfo) kafkaInfo {
424424
for it := info.Servers.Iter(); it.Next(); {
425425
name := it.Key()
426426
s := it.Value()
427-
if s == nil || s.Value == nil {
427+
if s == nil || s.Value == nil || strings.ToLower(s.Value.Protocol) != "kafka" {
428428
continue
429429
}
430430

@@ -491,6 +491,9 @@ func getTopics(info *runtime.KafkaInfo) []topic {
491491
if ch.Value == nil {
492492
continue
493493
}
494+
if !ch.Value.IsChannelAvailable("kafka") {
495+
continue
496+
}
494497
addr := ch.Value.Address
495498
if addr == "" {
496499
addr = name

api/handler_kafka_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ func TestHandler_Kafka(t *testing.T) {
127127
Config: asyncapi3test.NewConfig(
128128
asyncapi3test.WithInfo("foo", "bar", "1.0"),
129129
asyncapi3test.WithServer("foo", "kafka", "foo.bar", asyncapi3test.WithServerDescription("bar")),
130+
asyncapi3test.WithServer("bar", "mqtt", "foo.bar", asyncapi3test.WithServerDescription("bar")),
130131
),
131132
Store: &store.Store{},
132133
}))
@@ -165,6 +166,10 @@ func TestHandler_Kafka(t *testing.T) {
165166
asyncapi3test.WithContentType("application/json"),
166167
),
167168
),
169+
asyncapi3test.WithServer("bar", "mqtt", "foo.bar"),
170+
asyncapi3test.WithChannel("bar",
171+
asyncapi3test.AssignToServer("#/servers/bar"),
172+
),
168173
)
169174
s := store.New(c, enginetest.NewEngine(), &eventstest.Handler{}, monitor.NewKafka())
170175

mqtt/publish.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
package mqtt
22

3+
type PublishReason byte
4+
5+
const (
6+
PublishSuccess PublishReason = 0
7+
TopicNameInvalid PublishReason = 144
8+
PayloadFormatInvalid PublishReason = 153
9+
)
10+
311
type PublishRequest struct {
412
Topic string
513
MessageId uint16
@@ -25,19 +33,26 @@ func (r *PublishRequest) Read(d *Decoder, h *Header) {
2533

2634
func (r *PublishRequest) Write(e *Encoder, h *Header) {
2735
e.writeString(r.Topic)
28-
if r.MessageId > 0 {
36+
if h.QoS > 0 {
2937
e.writeUInt16(r.MessageId)
3038
}
39+
40+
if e.IsV5() {
41+
r.Properties.Write(e)
42+
}
43+
3144
e.Write(r.Data)
3245
}
3346

3447
type PublishResponse struct {
3548
MessageId uint16
49+
ReasonCode PublishReason
3650
Properties Properties
3751
}
3852

3953
func (r *PublishResponse) Read(d *Decoder, h *Header) {
4054
r.MessageId = d.ReadUInt16()
55+
r.ReasonCode = PublishReason(d.ReadByte())
4156

4257
if d.IsV5() {
4358
r.Properties = Properties{}
@@ -46,9 +61,8 @@ func (r *PublishResponse) Read(d *Decoder, h *Header) {
4661
}
4762

4863
func (r *PublishResponse) Write(e *Encoder, _ *Header) {
49-
if r.MessageId > 0 {
50-
e.writeUInt16(r.MessageId)
51-
}
64+
e.writeUInt16(r.MessageId)
65+
e.writeByte(byte(r.ReasonCode))
5266
if e.IsV5() {
5367
r.Properties.Write(e)
5468
}

mqtt/publish_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,97 @@ func TestPublish_ReadRequest(t *testing.T) {
7676
}
7777
}
7878

79+
func TestPublish_Write(t *testing.T) {
80+
testcases := []struct {
81+
name string
82+
msg mqtt.Message
83+
ctx *mqtt.ClientContext
84+
out []byte
85+
}{
86+
{
87+
name: "request QoS=0",
88+
msg: mqtt.Message{
89+
Header: &mqtt.Header{
90+
Type: mqtt.PUBLISH,
91+
},
92+
Payload: &mqtt.PublishRequest{
93+
Topic: "foo",
94+
MessageId: uint16(123),
95+
Data: []byte("bar"),
96+
},
97+
},
98+
ctx: &mqtt.ClientContext{},
99+
out: []byte{
100+
0x30, // Packet type
101+
0x08, // length
102+
0x0, 0x03, // topic length
103+
'f', 'o', 'o',
104+
'b', 'a', 'r', // data
105+
},
106+
},
107+
{
108+
name: "request QoS=1",
109+
msg: mqtt.Message{
110+
Header: &mqtt.Header{
111+
Type: mqtt.PUBLISH,
112+
QoS: 1,
113+
},
114+
Payload: &mqtt.PublishRequest{
115+
Topic: "foo",
116+
MessageId: uint16(123),
117+
Data: []byte("bar"),
118+
},
119+
},
120+
ctx: &mqtt.ClientContext{},
121+
out: []byte{
122+
0x32, // Packet type
123+
0x0a, // length
124+
0x0, 0x03, // topic length
125+
'f', 'o', 'o',
126+
0x0, 0x7b, // message id
127+
'b', 'a', 'r', // data
128+
},
129+
},
130+
{
131+
name: "request v5",
132+
msg: mqtt.Message{
133+
Header: &mqtt.Header{
134+
Type: mqtt.PUBLISH,
135+
QoS: 1,
136+
},
137+
Payload: &mqtt.PublishRequest{
138+
Topic: "foo",
139+
MessageId: uint16(123),
140+
Data: []byte("bar"),
141+
},
142+
},
143+
ctx: &mqtt.ClientContext{ProtocolVersion: 5},
144+
out: []byte{
145+
0x32, // Packet type
146+
0x0b, // length
147+
0x0, 0x03, // topic length
148+
'f', 'o', 'o',
149+
0x0, 0x7b, // message id
150+
0x0, // properties
151+
'b', 'a', 'r', // data
152+
},
153+
},
154+
}
155+
156+
t.Parallel()
157+
for _, tc := range testcases {
158+
tc := tc
159+
t.Run(tc.name, func(t *testing.T) {
160+
t.Parallel()
161+
162+
var b bytes.Buffer
163+
err := tc.msg.Write(&b, tc.ctx)
164+
require.NoError(t, err)
165+
require.Equal(t, tc.out, b.Bytes())
166+
})
167+
}
168+
}
169+
79170
func TestPublish(t *testing.T) {
80171
testcases := []struct {
81172
name string

providers/asyncapi3/channel.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,3 +116,19 @@ func (c *Channel) GetName() string {
116116
}
117117
return c.Title
118118
}
119+
120+
func (c *Channel) IsChannelAvailable(protocol string) bool {
121+
if len(c.Servers) == 0 {
122+
return true
123+
}
124+
125+
for _, v := range c.Servers {
126+
if v.Value == nil {
127+
continue
128+
}
129+
if protocol == v.Value.Protocol {
130+
return true
131+
}
132+
}
133+
return false
134+
}

providers/asyncapi3/kafka/store/metadata_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package store_test
22

33
import (
44
"fmt"
5+
"mokapi/config/dynamic"
6+
"mokapi/config/dynamic/dynamictest"
57
"mokapi/engine/enginetest"
68
"mokapi/kafka"
79
"mokapi/kafka/kafkatest"
@@ -162,11 +164,14 @@ func TestMetadata(t *testing.T) {
162164
{
163165
"2.2.0 assigning channels to servers",
164166
func(t *testing.T, s *store.Store) {
165-
s.Update(asyncapi3test.NewConfig(
167+
cfg := asyncapi3test.NewConfig(
166168
asyncapi3test.WithServer("foo", "kafka", "127.0.0.1:9092"),
167169
asyncapi3test.WithServer("bar", "kafka", "127.0.0.1:9093"),
168-
asyncapi3test.WithChannel("foo", asyncapi3test.AssignToServer("foo")),
169-
))
170+
asyncapi3test.WithChannel("foo", asyncapi3test.AssignToServer("#/servers/foo")),
171+
)
172+
err := cfg.Parse(&dynamic.Config{Data: cfg}, &dynamictest.Reader{})
173+
require.NoError(t, err)
174+
s.Update(cfg)
170175
rr := kafkatest.NewRecorder()
171176
r := kafkatest.NewRequest("kafkatest", 4, &metaData.Request{})
172177
r.Host = "127.0.0.1:9092"

providers/asyncapi3/kafka/store/store.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,9 @@ func (s *Store) Update(c *asyncapi3.Config) {
172172
if ch.Value == nil {
173173
continue
174174
}
175+
if !ch.Value.IsChannelAvailable("kafka") {
176+
continue
177+
}
175178

176179
if ch.Value.Address != "" {
177180
n = ch.Value.Address

providers/asyncapi3/kafka/store/validation.go

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"mokapi/kafka"
99
"mokapi/media"
1010
"mokapi/providers/asyncapi3"
11-
openapi "mokapi/providers/openapi/schema"
1211
avro "mokapi/schema/avro/schema"
1312
"mokapi/schema/encoding"
1413
"mokapi/schema/json/parser"
@@ -69,7 +68,7 @@ func newMessageValidator(messageId string, msg *asyncapi3.Message, channel *asyn
6968
var msgParser encoding.Parser
7069
if msg.Payload != nil && channel.Bindings.Kafka.ValueSchemaValidation {
7170
var err error
72-
msgParser, err = getParser(msg.Payload, msg.ContentType)
71+
msgParser, err = msg.Payload.GetParser(msg.ContentType)
7372
if err != nil {
7473
log.Errorf("unsupported payload type: %T", msg.Payload.Value)
7574
}
@@ -336,22 +335,3 @@ func parseHeader(headers []kafka.RecordHeader, sr *asyncapi3.SchemaRef) (map[str
336335
}
337336
return result, nil
338337
}
339-
340-
func getParser(ref *asyncapi3.SchemaRef, contentType string) (encoding.Parser, error) {
341-
switch s := ref.Value.(type) {
342-
case *schema.Schema:
343-
return &parser.Parser{Schema: s, ConvertToSortedMap: true}, nil
344-
case *openapi.Schema:
345-
mt := media.ParseContentType(contentType)
346-
if mt.IsXml() {
347-
return openapi.NewXmlParser(s), nil
348-
}
349-
return &parser.Parser{Schema: openapi.ConvertToJsonSchema(s), ConvertToSortedMap: true}, nil
350-
case *asyncapi3.AvroRef:
351-
return &avro.Parser{Schema: s.Schema}, nil
352-
case *asyncapi3.MultiSchemaFormat:
353-
return getParser(s.Schema, contentType)
354-
default:
355-
return nil, fmt.Errorf("unsupported payload type: %T", s)
356-
}
357-
}

providers/asyncapi3/mqtt/store/client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@ func (c *Client) publish(msg *Message) {
4040
if topicMatches(sub.Name, msg.Topic) {
4141
effectiveQoS := min(msg.QoS, sub.QoS)
4242

43-
id := c.nextMessageId()
43+
id := uint16(0)
4444
if effectiveQoS > 0 {
45+
id = c.nextMessageId()
4546
c.appendInflight(id, msg)
4647
}
4748

0 commit comments

Comments
 (0)