@@ -120,11 +120,11 @@ type produceRequest struct {
120120 Records []store.Record `json:"records"`
121121}
122122
123- type produceResponse struct {
124- Offsets []recordResult `json:"offsets"`
123+ type ProduceResponse struct {
124+ Offsets []RecordResult `json:"offsets"`
125125}
126126
127- type recordResult struct {
127+ type RecordResult struct {
128128 Partition int
129129 Offset int64
130130 Error string
@@ -222,9 +222,9 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
222222 writeError (w , err , http .StatusBadRequest )
223223 }
224224 }
225- res := produceResponse {}
225+ res := ProduceResponse {}
226226 for _ , rec := range result {
227- res .Offsets = append (res .Offsets , recordResult {
227+ res .Offsets = append (res .Offsets , RecordResult {
228228 Partition : rec .Partition ,
229229 Offset : rec .Offset ,
230230 Error : rec .Error ,
@@ -247,7 +247,7 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
247247 w .WriteHeader (http .StatusNotFound )
248248 } else {
249249 w .Header ().Set ("Content-Type" , "application/json" )
250- writeJsonBody (w , getPartitions (k , t ))
250+ writeJsonBody (w , getPartitions (t ))
251251 }
252252 return
253253 // /api/services/kafka/{cluster}/topics/{topic}/partitions/{id}
@@ -276,7 +276,7 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
276276 }
277277 if r .Method == "GET" {
278278 w .Header ().Set ("Content-Type" , "application/json" )
279- writeJsonBody (w , newPartition (k . Store , p ))
279+ writeJsonBody (w , newPartition (p ))
280280 } else {
281281 records , err := getProduceRecords (r )
282282 if err != nil {
@@ -296,9 +296,9 @@ func (h *handler) handleKafka(w http.ResponseWriter, r *http.Request) {
296296 writeError (w , err , http .StatusBadRequest )
297297 }
298298 }
299- res := produceResponse {}
299+ res := ProduceResponse {}
300300 for _ , rec := range result {
301- res .Offsets = append (res .Offsets , recordResult {
301+ res .Offsets = append (res .Offsets , RecordResult {
302302 Partition : rec .Partition ,
303303 Offset : rec .Offset ,
304304 Error : rec .Error ,
@@ -458,7 +458,7 @@ func getTopics(info *runtime.KafkaInfo) []topic {
458458 addr = name
459459 }
460460 t := info .Store .Topic (addr )
461- topics = append (topics , newTopic (info . Store , t , ch .Value , info .Config ))
461+ topics = append (topics , newTopic (t , ch .Value , info .Config ))
462462 }
463463 sort .Slice (topics , func (i , j int ) bool {
464464 return strings .Compare (topics [i ].Name , topics [j ].Name ) < 0
@@ -477,18 +477,18 @@ func getTopic(info *runtime.KafkaInfo, name string) *topic {
477477 }
478478 if addr == name {
479479 t := info .Store .Topic (addr )
480- r := newTopic (info . Store , t , ch .Value , info .Config )
480+ r := newTopic (t , ch .Value , info .Config )
481481 return & r
482482 }
483483
484484 }
485485 return nil
486486}
487487
488- func newTopic (s * store. Store , t * store.Topic , ch * asyncapi3.Channel , cfg * asyncapi3.Config ) topic {
488+ func newTopic (t * store.Topic , ch * asyncapi3.Channel , cfg * asyncapi3.Config ) topic {
489489 var partitions []partition
490490 for _ , p := range t .Partitions {
491- partitions = append (partitions , newPartition (s , p ))
491+ partitions = append (partitions , newPartition (p ))
492492 }
493493 sort .Slice (partitions , func (i , j int ) bool {
494494 return partitions [i ].Id < partitions [j ].Id
@@ -549,10 +549,10 @@ func newTopic(s *store.Store, t *store.Topic, ch *asyncapi3.Channel, cfg *asynca
549549 return result
550550}
551551
552- func getPartitions (info * runtime. KafkaInfo , t * store.Topic ) []partition {
552+ func getPartitions (t * store.Topic ) []partition {
553553 var partitions []partition
554554 for _ , p := range t .Partitions {
555- partitions = append (partitions , newPartition (info . Store , p ))
555+ partitions = append (partitions , newPartition (p ))
556556 }
557557 sort .Slice (partitions , func (i , j int ) bool {
558558 return partitions [i ].Id < partitions [j ].Id
@@ -594,13 +594,12 @@ func newGroup(g *store.Group) group {
594594 return grp
595595}
596596
597- func newPartition (s * store.Store , p * store.Partition ) partition {
598- leader , _ := s .Broker (p .Leader )
597+ func newPartition (p * store.Partition ) partition {
599598 return partition {
600599 Id : p .Index ,
601600 StartOffset : p .StartOffset (),
602601 Offset : p .Offset (),
603- Leader : newBroker (leader ),
602+ Leader : newBroker (p . Leader ),
604603 Segments : len (p .Segments ),
605604 }
606605}
@@ -649,7 +648,7 @@ func getProduceRecords(r *http.Request) ([]store.Record, error) {
649648 return pr .Records , nil
650649}
651650
652- func (r * recordResult ) MarshalJSON () ([]byte , error ) {
651+ func (r * RecordResult ) MarshalJSON () ([]byte , error ) {
653652 aux := & struct {
654653 Partition int `json:"partition"`
655654 Offset int64 `json:"offset"`
0 commit comments