Skip to content

Commit 9a7bc05

Browse files
authored
Fix watermark (#323)
* Don't report the watermark when there is no waiting message. * Report the watermark in the /status endpoint.
1 parent 5ba1c4c commit 9a7bc05

5 files changed

Lines changed: 47 additions & 16 deletions

File tree

cmd/gravity/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ func statusHandler(server *app.Server, name, hash string) func(http.ResponseWrit
210210
Name: name,
211211
ConfigHash: hash,
212212
Position: v,
213+
Watermarks: server.Scheduler.Watermarks(),
213214
Stage: state,
214215
Version: utils.Version,
215216
}

pkg/core/msg.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"strings"
77
"time"
88

9+
"github.com/moiot/gravity/pkg/sliding_window"
10+
911
"github.com/juju/errors"
1012
"github.com/pingcap/parser/ast"
1113
log "github.com/sirupsen/logrus"
@@ -190,11 +192,12 @@ const (
190192
)
191193

192194
type TaskReportStatus struct {
193-
Name string `json:"name"`
194-
ConfigHash string `json:"configHash"`
195-
Position string `json:"position"`
196-
Stage TaskReportStage `json:"stage"`
197-
Version string `json:"version"`
195+
Name string `json:"name"`
196+
ConfigHash string `json:"configHash"`
197+
Position string `json:"position"`
198+
Watermarks map[string]sliding_window.Watermark `json:"watermarks"`
199+
Stage TaskReportStage `json:"stage"`
200+
Version string `json:"version"`
198201
}
199202

200203
func HashConfig(config string) string {

pkg/core/scheduler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package core
22

3+
import "github.com/moiot/gravity/pkg/sliding_window"
4+
35
type Scheduler interface {
46
MsgSubmitter
57
MsgAcker
68
Healthy() bool
9+
Watermarks() map[string]sliding_window.Watermark
710
Start(output Output) error
811
Close()
912
}

pkg/schedulers/batch_table_scheduler/batch_table_scheduler.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,23 @@ func (scheduler *batchScheduler) Healthy() bool {
173173
}
174174
}
175175

176+
func (scheduler *batchScheduler) Watermarks() (result map[string]sliding_window.Watermark) {
177+
if scheduler.cfg.SlidingWindowSize == 0 {
178+
return
179+
}
180+
181+
result = make(map[string]sliding_window.Watermark)
182+
183+
scheduler.windowMutex.Lock()
184+
defer scheduler.windowMutex.Unlock()
185+
186+
for k, w := range scheduler.slidingWindows {
187+
result[k] = w.Watermark()
188+
}
189+
190+
return
191+
}
192+
176193
func (scheduler *batchScheduler) Start(output core.Output) error {
177194
if scheduler.cfg.MaxBatchPerWorker > config.TxnBufferLimit-1 {
178195
return errors.Errorf("max batch per worker exceed txn buffer limit")

pkg/sliding_window/static_sliding_window.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -171,19 +171,26 @@ func (w *staticSlidingWindow) reportMetrics() {
171171
for {
172172
select {
173173
case <-ticker.C:
174-
watermark := w.Watermark()
175-
176-
// ProcessTime can be seen as the duration that event are in the queue.
177-
seconds := time.Since(watermark.ProcessTime).Seconds()
178-
if seconds > ProcessDelayWarningThreshold {
179-
log.Warnf("[sliding_window] item not ack after %f seconds. %s", seconds, w.nextItemToCommit)
174+
waitingCnt := len(w.waitingItemC)
175+
176+
if waitingCnt > 0 {
177+
watermark := w.Watermark()
178+
179+
// ProcessTime can be seen as the duration that event are in the queue.
180+
seconds := time.Since(watermark.ProcessTime).Seconds()
181+
if seconds > ProcessDelayWarningThreshold {
182+
log.Warnf("[sliding_window] item not ack after %f seconds. %s", seconds, w.nextItemToCommit)
183+
}
184+
metrics.End2EndProcessTimeHistogram.WithLabelValues(env.PipelineName).Observe(seconds)
185+
186+
// EventTime can be seen as the end to end duration of event process time.
187+
metrics.End2EndEventTimeHistogram.WithLabelValues(env.PipelineName).Observe(time.Since(watermark.EventTime).Seconds())
188+
} else {
189+
metrics.End2EndProcessTimeHistogram.WithLabelValues(env.PipelineName).Observe(0)
190+
metrics.End2EndEventTimeHistogram.WithLabelValues(env.PipelineName).Observe(0)
180191
}
181-
metrics.End2EndProcessTimeHistogram.WithLabelValues(env.PipelineName).Observe(seconds)
182-
183-
// EventTime can be seen as the end to end duration of event process time.
184-
metrics.End2EndEventTimeHistogram.WithLabelValues(env.PipelineName).Observe(time.Since(watermark.EventTime).Seconds())
185192

186-
metrics.QueueLength.WithLabelValues(env.PipelineName, "sliding-window-waiting-chan", w.name).Set(float64(len(w.waitingItemC)))
193+
metrics.QueueLength.WithLabelValues(env.PipelineName, "sliding-window-waiting-chan", w.name).Set(float64(waitingCnt))
187194
metrics.QueueLength.WithLabelValues(env.PipelineName, "sliding-window-ready-chan", w.name).Set(float64(len(w.readyC)))
188195
metrics.QueueLength.WithLabelValues(env.PipelineName, "sliding-window-ready-heap", w.name).Set(float64(atomic.LoadInt64(&w.heapSize)))
189196

0 commit comments

Comments (0)