Skip to content

Commit 861295a

Browse files
committed
feat: add health check based on write status #11607
1 parent fbf4286 commit 861295a

3 files changed

Lines changed: 57 additions & 11 deletions

File tree

server/common/module_shared.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ var log = logging.MustGetLogger("server_common")
3333

3434
const QUEUE_SIZE = 1 << 16
3535

36+
type IngesterStatus uint8
37+
38+
const (
39+
IngesterOk IngesterStatus = iota
40+
IngesterAbnormal
41+
IngesterInitializing
42+
)
43+
3644
type ControllerIngesterShared struct {
3745
ResourceEventQueue *queue.OverwriteQueue
3846
TraceTreeQueue *queue.OverwriteQueue
@@ -83,15 +91,16 @@ func ExportersEnabled(configPath string) bool {
8391
return false
8492
}
8593

86-
type OrgHanderInterface interface {
94+
type OrgHandlerInterface interface {
8795
DropOrg(orgId uint16) error
8896
UpdateNativeTag(nativetag.NativeTagOP, uint16, *nativetag.NativeTag) error
97+
IsHealthy() bool
8998
}
9099

91-
var ingesterOrgHanders []OrgHanderInterface
100+
var ingesterOrgHandlers []OrgHandlerInterface
92101

93-
func SetOrgHandler(orgHandler OrgHanderInterface) {
94-
ingesterOrgHanders = append(ingesterOrgHanders, orgHandler)
102+
func SetOrgHandler(orgHandler OrgHandlerInterface) {
103+
ingesterOrgHandlers = append(ingesterOrgHandlers, orgHandler)
95104
}
96105

97106
/*
@@ -115,11 +124,11 @@ func SetOrgHandler(orgHandler OrgHanderInterface) {
115124
*/
116125
func DropOrg(orgId uint16) error {
117126
log.Info("drop org id:", orgId)
118-
if ingesterOrgHanders == nil {
119-
return fmt.Errorf("ingesterOrgHanders is nil, drop org id %d failed", orgId)
127+
if ingesterOrgHandlers == nil {
128+
return fmt.Errorf("ingesterOrgHandlers is nil, drop org id %d failed", orgId)
120129
}
121-
for _, ingesterOrgHander := range ingesterOrgHanders {
122-
err := ingesterOrgHander.DropOrg(orgId)
130+
for _, orgHandler := range ingesterOrgHandlers {
131+
err := orgHandler.DropOrg(orgId)
123132
if err != nil {
124133
return err
125134
}
@@ -142,13 +151,13 @@ func PushNativeTags(orgId uint16, nativeTags []nativetag.NativeTag) {
142151
// When adding or removing native_tag, you need to call the interface
143152
func UpdateNativeTag(op nativetag.NativeTagOP, orgId uint16, nativeTag *nativetag.NativeTag) error {
144153
log.Infof("orgId %d %s native tag: %+v", orgId, op, nativeTag)
145-
if ingesterOrgHanders == nil {
154+
if ingesterOrgHandlers == nil {
146155
err := fmt.Errorf("ingester is not ready, update native tag failed")
147156
log.Error(err)
148157
return err
149158
}
150-
for _, ingesterOrgHander := range ingesterOrgHanders {
151-
err := ingesterOrgHander.UpdateNativeTag(op, orgId, nativeTag)
159+
for _, orgHandler := range ingesterOrgHandlers {
160+
err := orgHandler.UpdateNativeTag(op, orgId, nativeTag)
152161
if err != nil {
153162
log.Error(err)
154163
return err
@@ -157,3 +166,18 @@ func UpdateNativeTag(op nativetag.NativeTagOP, orgId uint16, nativeTag *nativeta
157166
nativetag.UpdateNativeTag(op, orgId, nativeTag)
158167
return nil
159168
}
169+
170+
func CheckIngesterStatus() IngesterStatus {
171+
if ingesterOrgHandlers == nil {
172+
log.Infof("ingester is initializing")
173+
return IngesterInitializing
174+
}
175+
for _, orgHandler := range ingesterOrgHandlers {
176+
// Treat the ingester as abnormal only after a handler has observed write failures without any successful writes.
177+
if !orgHandler.IsHealthy() {
178+
log.Errorf("ingester is abnormal")
179+
return IngesterAbnormal
180+
}
181+
}
182+
return IngesterOk
183+
}

server/ingester/ingester/org_handler.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ func (o *OrgHandler) DropOrg(orgId uint16) error {
6767
return o.dropOrgDatabase(orgId)
6868
}
6969

70+
func (o *OrgHandler) IsHealthy() bool {
71+
return true
72+
}
73+
7074
// FIXME: After clearing the Org data, if the same Org ID is created again later, data writing will fail. You can restart deepflow-server to solve it.
7175
func (o *OrgHandler) dropOrgDatabase(orgId uint16) error {
7276
if ckdb.IsDefaultOrgID(orgId) {

server/ingester/pkg/ckwriter/ckwriter.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,24 @@ func (m *CKWriterManager) EndpointsChange(addrs []string) {
9393
ckwriterManager.Unlock()
9494
}
9595

96+
func (m *CKWriterManager) IsHealthy() bool {
97+
ckwriterManager.Lock()
98+
defer ckwriterManager.Unlock()
99+
hasWriteFailure := false
100+
for _, writer := range m.ckwriters {
101+
for _, queueCtx := range writer.queueContexts {
102+
// Consider the writer healthy once any queue succeeds; before that, only an observed failure makes it abnormal.
103+
if queueCtx.counter.WriteSuccessCount > 0 {
104+
return true
105+
}
106+
if queueCtx.counter.WriteFailedCount > 0 {
107+
hasWriteFailure = true
108+
}
109+
}
110+
}
111+
return !hasWriteFailure
112+
}
113+
96114
type CKWriter struct {
97115
addrs []string
98116
user string

0 commit comments

Comments
 (0)