Skip to content

Commit 3434b7d

Browse files
committed
kafka service: add reconnect mechanism
1 parent 1bec590 commit 3434b7d

1 file changed

Lines changed: 21 additions & 2 deletions

File tree

services/kafka/kafka.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ import (
1313
"github.com/telkomdev/tob/util"
1414
)
1515

16+
const (
17+
// ErrorClosedNetwork is an error indicating the connection is closed
18+
ErrorClosedNetwork = "use of closed network connection"
19+
)
20+
1621
// Kafka service
1722
type Kafka struct {
1823
url string
@@ -57,6 +62,16 @@ func (d *Kafka) Ping() []byte {
5762
if d.verbose {
5863
d.logger.Println("Kafka error read available brokers")
5964
d.logger.Println(err)
65+
66+
// re dial
67+
if strings.Contains(err.Error(), ErrorClosedNetwork) {
68+
d.logger.Println(fmt.Sprintf("Kafka: %s | do re dial\n", err.Error()))
69+
// re dial ignore error
70+
err = d.dial()
71+
if err != nil {
72+
d.logger.Println(fmt.Sprintf("Kafka: %s | do re dial\n", err.Error()))
73+
}
74+
}
6075
}
6176
return []byte("NOT_OK")
6277
}
@@ -83,8 +98,7 @@ func (d *Kafka) SetURL(url string) {
8398
d.url = url
8499
}
85100

86-
// Connect to service if needed
87-
func (d *Kafka) Connect() error {
101+
func (d *Kafka) dial() error {
88102
if d.verbose {
89103
d.logger.Println("connecting to Kafka server")
90104
}
@@ -147,6 +161,11 @@ func (d *Kafka) Connect() error {
147161
return nil
148162
}
149163

164+
// Connect to service if needed
165+
func (d *Kafka) Connect() error {
166+
return d.dial()
167+
}
168+
150169
// Close will close the service resources if needed
151170
func (d *Kafka) Close() error {
152171
if d.verbose {

0 commit comments

Comments
 (0)