Skip to content

Commit 1589e75

Browse files
authored
pkg/dxf/importinto, pkg/executor: show conflict progress in SHOW IMPORT JOB (#67551)
ref #66019
1 parent 885b297 commit 1589e75

29 files changed

Lines changed: 245 additions & 110 deletions

pkg/ddl/backfilling_clean_s3.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func sendMeterOnCleanUp(ctx context.Context, task *proto.Task, logger *zap.Logge
109109
return errors.Trace(err)
110110
}
111111
rowCount += summary.RowCnt.Load()
112-
indexKVSize += summary.Bytes.Load()
112+
indexKVSize += summary.Processed.Load()
113113
}
114114
return handle.SendRowAndSizeMeterData(ctx, task, rowCount, 0, indexKVSize, logger)
115115
}

pkg/ddl/backfilling_read_index.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ func (d *distTaskRowCntCollector) Accepted(bytes int64) {
490490
}
491491

492492
func (d *distTaskRowCntCollector) Processed(bytes, rowCnt int64) {
493-
d.summary.Bytes.Add(bytes)
493+
d.summary.Processed.Add(bytes)
494494
d.summary.RowCnt.Add(rowCnt)
495495
d.counter.Add(float64(rowCnt))
496496
}

pkg/dxf/framework/storage/table_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,8 @@ func TestGetSubtaskSummaries(t *testing.T) {
333333
require.NoError(t, tm.SwitchTaskStepInBatch(ctx, task, proto.TaskStateRunning, proto.StepOne, subtasks))
334334

335335
summary := &execute.SubtaskSummary{
336-
RowCnt: *atomic.NewInt64(100),
337-
Bytes: *atomic.NewInt64(200),
336+
RowCnt: *atomic.NewInt64(100),
337+
Processed: *atomic.NewInt64(200),
338338
}
339339
bytes, err := json.Marshal(summary)
340340
require.NoError(t, err)
@@ -345,7 +345,7 @@ func TestGetSubtaskSummaries(t *testing.T) {
345345
require.Len(t, summaries, len(subtasks))
346346
for _, summary := range summaries {
347347
require.EqualValues(t, 100, summary.RowCnt.Load())
348-
require.EqualValues(t, 200, summary.Bytes.Load())
348+
require.EqualValues(t, 200, summary.Processed.Load())
349349
}
350350

351351
// If the JSON value is wrong, we still get an empty summary.
@@ -356,7 +356,7 @@ func TestGetSubtaskSummaries(t *testing.T) {
356356
require.NoError(t, err)
357357
for _, summary := range summaries {
358358
require.EqualValues(t, 0, summary.RowCnt.Load())
359-
require.EqualValues(t, 0, summary.Bytes.Load())
359+
require.EqualValues(t, 0, summary.Processed.Load())
360360
}
361361
}
362362

pkg/dxf/framework/taskexecutor/execute/interface.go

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ type Progress struct {
8686
// For now, RowCnt is not used, but as it's collected by the collector,
8787
// we still keep it here for future possible usage.
8888
RowCnt int64 `json:"row_count,omitempty"`
89-
Bytes int64 `json:"bytes,omitempty"`
89+
// Processed stores generic progress units, such as bytes processed or rows processed.
90+
// Keep the JSON tag as "bytes" for backward compatibility with persisted summaries.
91+
Processed int64 `json:"bytes,omitempty"`
9092

9193
// UpdateTime is the time when this progress is stored.
9294
UpdateTime time.Time `json:"update_time,omitempty"`
@@ -95,10 +97,12 @@ type Progress struct {
9597
// SubtaskSummary contains the summary of a subtask.
9698
// It tracks the runtime summary of the subtask.
9799
type SubtaskSummary struct {
98-
// RowCnt and Bytes are updated by the collector.
100+
// RowCnt and Processed are updated by the collector.
99101
RowCnt atomic.Int64 `json:"row_count,omitempty"`
100-
// Bytes is the number of bytes to process.
101-
Bytes atomic.Int64 `json:"bytes,omitempty"`
102+
// Processed is the number of processed units reported by the collector.
103+
// Its unit may be bytes processed or rows processed, depending on the step.
104+
// Keep the JSON tag as "bytes" for backward compatibility with persisted summaries.
105+
Processed atomic.Int64 `json:"bytes,omitempty"`
102106
// ReadBytes is the number of bytes that read from the source.
103107
ReadBytes atomic.Int64 `json:"read_bytes,omitempty"`
104108
// GetReqCnt is the number of get requests to the external storage.
@@ -124,7 +128,7 @@ func (s *SubtaskSummary) MergeObjStoreRequests(reqs *recording.Requests) {
124128
func (s *SubtaskSummary) Update() {
125129
s.Progresses = append(s.Progresses, Progress{
126130
RowCnt: s.RowCnt.Load(),
127-
Bytes: s.Bytes.Load(),
131+
Processed: s.Processed.Load(),
128132
UpdateTime: time.Now(),
129133
})
130134

@@ -145,15 +149,15 @@ func (s *SubtaskSummary) GetSpeedInTimeRange(endTime time.Time, duration time.Du
145149
}
146150

147151
// The number of point is small, so we can afford to iterate through all points.
148-
var totalBytes float64
152+
var totalProcessed float64
149153
for i := range len(s.Progresses) - 1 {
150154
rangeStart := s.Progresses[i].UpdateTime
151155
rangeEnd := s.Progresses[i+1].UpdateTime
152-
rangeBytes := float64(s.Progresses[i+1].Bytes - s.Progresses[i].Bytes)
156+
rangeProcessed := float64(s.Progresses[i+1].Processed - s.Progresses[i].Processed)
153157
if endTime.Before(rangeStart) || startTime.After(rangeEnd) {
154158
continue
155159
} else if startTime.Before(rangeStart) && endTime.After(rangeEnd) {
156-
totalBytes += rangeBytes
160+
totalProcessed += rangeProcessed
157161
continue
158162
}
159163

@@ -167,10 +171,10 @@ func (s *SubtaskSummary) GetSpeedInTimeRange(endTime time.Time, duration time.Du
167171
iEnd = endTime
168172
}
169173

170-
totalBytes += rangeBytes * float64(iEnd.Sub(iStart)) / float64(rangeEnd.Sub(rangeStart))
174+
totalProcessed += rangeProcessed * float64(iEnd.Sub(iStart)) / float64(rangeEnd.Sub(rangeStart))
171175
}
172176

173-
return int64(totalBytes / duration.Seconds())
177+
return int64(totalProcessed / duration.Seconds())
174178
}
175179

176180
// UpdateTime returns the last update time of the summary.
@@ -184,7 +188,7 @@ func (s *SubtaskSummary) UpdateTime() time.Time {
184188
// Reset resets the summary to zero values and clears history data.
185189
func (s *SubtaskSummary) Reset() {
186190
s.RowCnt.Store(0)
187-
s.Bytes.Store(0)
191+
s.Processed.Store(0)
188192
s.ReadBytes.Store(0)
189193
s.PutReqCnt.Store(0)
190194
s.GetReqCnt.Store(0)
@@ -198,12 +202,14 @@ type Collector interface {
198202
// The difference between Accepted and Processed is that Accepted is called
199203
// when the data is accepted to be processed.
200204
Accepted(bytes int64)
201-
// Processed is used collects metrics.
202-
// `bytes` is the number of bytes processed, and `rows` is the number of rows processed.
203-
// The meaning of `bytes` may vary by scenario, for example:
204-
// - During encoding, it represents the number of bytes read from the source data file.
205-
// - During merge sort, it represents the number of bytes merged.
206-
Processed(bytes, rows int64)
205+
// Processed collects progress metrics.
206+
// `processedUnits` is the number of processed units, and `rows` is the number
207+
// of rows processed.
208+
// The meaning of `processedUnits` may vary by scenario, for example:
209+
// - During encoding, it represents bytes read from source data files.
210+
// - During merge sort, it represents bytes merged.
211+
// - During conflict handling, it represents processed conflict KV pairs.
212+
Processed(processedUnits, rows int64)
207213
}
208214

209215
// NoopCollector is a no-op implementation of Collector.
@@ -218,9 +224,9 @@ func (*NoopCollector) Processed(_, _ int64) {}
218224
// TestCollector is an implementation used for test.
219225
type TestCollector struct {
220226
NoopCollector
221-
ReadBytes atomic.Int64
222-
Bytes atomic.Int64
223-
Rows atomic.Int64
227+
ReadBytes atomic.Int64
228+
ProcessedCnt atomic.Int64
229+
Rows atomic.Int64
224230
}
225231

226232
// Accepted implements Collector.Accepted
@@ -229,8 +235,8 @@ func (c *TestCollector) Accepted(bytes int64) {
229235
}
230236

231237
// Processed implements Collector.Processed
232-
func (c *TestCollector) Processed(bytes, rows int64) {
233-
c.Bytes.Add(bytes)
238+
func (c *TestCollector) Processed(processedUnits, rows int64) {
239+
c.ProcessedCnt.Add(processedUnits)
234240
c.Rows.Add(rows)
235241
}
236242

pkg/dxf/framework/taskexecutor/execute/interface_test.go

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestSubtaskSummaryGetSpeed(t *testing.T) {
3434
name: "insufficient data points",
3535
setup: func(s *SubtaskSummary) {
3636
s.Progresses = []Progress{
37-
{Bytes: 100, UpdateTime: time.Unix(1000, 0)},
37+
{Processed: 100, UpdateTime: time.Unix(1000, 0)},
3838
}
3939
},
4040
endTime: time.Unix(1010, 0),
@@ -47,8 +47,8 @@ func TestSubtaskSummaryGetSpeed(t *testing.T) {
4747
setup: func(s *SubtaskSummary) {
4848
baseTime := time.Unix(1000, 0)
4949
s.Progresses = []Progress{
50-
{Bytes: 0, UpdateTime: baseTime},
51-
{Bytes: 100, UpdateTime: baseTime.Add(1 * time.Second)},
50+
{Processed: 0, UpdateTime: baseTime},
51+
{Processed: 100, UpdateTime: baseTime.Add(1 * time.Second)},
5252
}
5353
},
5454
endTime: time.Unix(1010, 0),
@@ -61,10 +61,10 @@ func TestSubtaskSummaryGetSpeed(t *testing.T) {
6161
setup: func(s *SubtaskSummary) {
6262
baseTime := time.Unix(1000, 0)
6363
s.Progresses = []Progress{
64-
{Bytes: 0, UpdateTime: baseTime},
65-
{Bytes: 50, UpdateTime: baseTime.Add(1 * time.Second)},
66-
{Bytes: 100, UpdateTime: baseTime.Add(2 * time.Second)},
67-
{Bytes: 150, UpdateTime: baseTime.Add(3 * time.Second)},
64+
{Processed: 0, UpdateTime: baseTime},
65+
{Processed: 50, UpdateTime: baseTime.Add(1 * time.Second)},
66+
{Processed: 100, UpdateTime: baseTime.Add(2 * time.Second)},
67+
{Processed: 150, UpdateTime: baseTime.Add(3 * time.Second)},
6868
}
6969
},
7070
endTime: time.Unix(1002, 500000000),
@@ -77,10 +77,10 @@ func TestSubtaskSummaryGetSpeed(t *testing.T) {
7777
setup: func(s *SubtaskSummary) {
7878
baseTime := time.Unix(1000, 0)
7979
s.Progresses = []Progress{
80-
{Bytes: 0, UpdateTime: baseTime},
81-
{Bytes: 30, UpdateTime: baseTime.Add(1 * time.Second)},
82-
{Bytes: 60, UpdateTime: baseTime.Add(2 * time.Second)},
83-
{Bytes: 90, UpdateTime: baseTime.Add(3 * time.Second)},
80+
{Processed: 0, UpdateTime: baseTime},
81+
{Processed: 30, UpdateTime: baseTime.Add(1 * time.Second)},
82+
{Processed: 60, UpdateTime: baseTime.Add(2 * time.Second)},
83+
{Processed: 90, UpdateTime: baseTime.Add(3 * time.Second)},
8484
}
8585
},
8686
endTime: time.Unix(1004, 0),
@@ -93,11 +93,11 @@ func TestSubtaskSummaryGetSpeed(t *testing.T) {
9393
setup: func(s *SubtaskSummary) {
9494
baseTime := time.Unix(1000, 0)
9595
s.Progresses = []Progress{
96-
{Bytes: 0, UpdateTime: baseTime},
97-
{Bytes: 60, UpdateTime: baseTime.Add(1 * time.Second)},
98-
{Bytes: 120, UpdateTime: baseTime.Add(2 * time.Second)},
99-
{Bytes: 180, UpdateTime: baseTime.Add(3 * time.Second)},
100-
{Bytes: 240, UpdateTime: baseTime.Add(4 * time.Second)},
96+
{Processed: 0, UpdateTime: baseTime},
97+
{Processed: 60, UpdateTime: baseTime.Add(1 * time.Second)},
98+
{Processed: 120, UpdateTime: baseTime.Add(2 * time.Second)},
99+
{Processed: 180, UpdateTime: baseTime.Add(3 * time.Second)},
100+
{Processed: 240, UpdateTime: baseTime.Add(4 * time.Second)},
101101
}
102102
},
103103
endTime: time.Unix(1004, 500000000),
@@ -110,11 +110,11 @@ func TestSubtaskSummaryGetSpeed(t *testing.T) {
110110
setup: func(s *SubtaskSummary) {
111111
baseTime := time.Unix(1000, 0)
112112
s.Progresses = []Progress{
113-
{Bytes: 0, UpdateTime: baseTime},
114-
{Bytes: 60, UpdateTime: baseTime.Add(1 * time.Second)},
115-
{Bytes: 120, UpdateTime: baseTime.Add(2 * time.Second)},
116-
{Bytes: 180, UpdateTime: baseTime.Add(3 * time.Second)},
117-
{Bytes: 240, UpdateTime: baseTime.Add(4 * time.Second)},
113+
{Processed: 0, UpdateTime: baseTime},
114+
{Processed: 60, UpdateTime: baseTime.Add(1 * time.Second)},
115+
{Processed: 120, UpdateTime: baseTime.Add(2 * time.Second)},
116+
{Processed: 180, UpdateTime: baseTime.Add(3 * time.Second)},
117+
{Processed: 240, UpdateTime: baseTime.Add(4 * time.Second)},
118118
}
119119
},
120120
endTime: time.Unix(1004, 0),
@@ -127,11 +127,11 @@ func TestSubtaskSummaryGetSpeed(t *testing.T) {
127127
setup: func(s *SubtaskSummary) {
128128
baseTime := time.Unix(1001, 0)
129129
s.Progresses = []Progress{
130-
{Bytes: 0, UpdateTime: baseTime},
131-
{Bytes: 60, UpdateTime: baseTime.Add(1 * time.Second)},
132-
{Bytes: 120, UpdateTime: baseTime.Add(2 * time.Second)},
133-
{Bytes: 180, UpdateTime: baseTime.Add(3 * time.Second)},
134-
{Bytes: 240, UpdateTime: baseTime.Add(4 * time.Second)},
130+
{Processed: 0, UpdateTime: baseTime},
131+
{Processed: 60, UpdateTime: baseTime.Add(1 * time.Second)},
132+
{Processed: 120, UpdateTime: baseTime.Add(2 * time.Second)},
133+
{Processed: 180, UpdateTime: baseTime.Add(3 * time.Second)},
134+
{Processed: 240, UpdateTime: baseTime.Add(4 * time.Second)},
135135
}
136136
},
137137
endTime: time.Unix(1006, 500000000),

pkg/dxf/importinto/collect_conflicts.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ type collectConflictsStepExecutor struct {
6868
}
6969

7070
var _ execute.StepExecutor = &collectConflictsStepExecutor{}
71+
var _ execute.Collector = &collectConflictsStepExecutor{}
7172

7273
// NewCollectConflictsStepExecutor creates a new collectConflictsStepExecutor.
7374
// exported for test.
@@ -200,6 +201,7 @@ func (e *collectConflictsStepExecutor) collectConflictsOfKVGroup(
200201
e.sharedHandleSet,
201202
localSet,
202203
&e.sizeOfConflictRowFiles,
204+
e,
203205
e.GetMeterRecorder(),
204206
)
205207
eg.Go(func() (err error) {
@@ -251,6 +253,14 @@ func (e *collectConflictsStepExecutor) ResetSummary() {
251253
e.summary.Reset()
252254
}
253255

256+
// Accepted implements Collector.Accepted interface.
257+
func (*collectConflictsStepExecutor) Accepted(_ int64) {}
258+
259+
// Processed implements Collector.Processed interface.
260+
func (e *collectConflictsStepExecutor) Processed(processedConflictKVs, _ int64) {
261+
e.summary.Processed.Add(processedConflictKVs)
262+
}
263+
254264
// getConflictRowFilenamePrefix returns the file name prefix to store the conflict
255265
// rows for the given task and subtask.
256266
func getConflictRowFilenamePrefix(taskID, subtaskID int64, uuid string) string {

pkg/dxf/importinto/conflict_resolution.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type conflictResolutionStepExecutor struct {
4848
}
4949

5050
var _ execute.StepExecutor = &conflictResolutionStepExecutor{}
51+
var _ execute.Collector = &conflictResolutionStepExecutor{}
5152

5253
// NewConflictResolutionStepExecutor creates a new StepExecutor for conflict
5354
// resolution step, exported for test.
@@ -139,7 +140,7 @@ func (e *conflictResolutionStepExecutor) resolveConflictsOfKVGroup(
139140
pairCh := external.ReadKVFilesAsync(egCtx, eg, objStore, ci.Files)
140141
for i := range concurrency {
141142
encoder := encoders[i]
142-
deleter := conflictedkv.NewDeleter(e.tableImporter.Table, e.logger, e.store, kvGroup, encoder, e.GetMeterRecorder())
143+
deleter := conflictedkv.NewDeleter(e.tableImporter.Table, e.logger, e.store, kvGroup, encoder, e, e.GetMeterRecorder())
143144
eg.Go(func() error {
144145
return deleter.Run(egCtx, pairCh)
145146
})
@@ -162,6 +163,14 @@ func (e *conflictResolutionStepExecutor) ResetSummary() {
162163
e.summary.Reset()
163164
}
164165

166+
// Accepted implements Collector.Accepted interface.
167+
func (*conflictResolutionStepExecutor) Accepted(_ int64) {}
168+
169+
// Processed implements Collector.Processed interface.
170+
func (e *conflictResolutionStepExecutor) Processed(processedConflictKVs, _ int64) {
171+
e.summary.Processed.Add(processedConflictKVs)
172+
}
173+
165174
// when create encoder, if the table have generated column, when calling
166175
// backend/kv.CollectGeneratedColumns(), buildSimpleExpr will rewrite the AST node,
167176
// and data race. and the data race might happen during encoding, in

pkg/dxf/importinto/conflictedkv/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ go_library(
1414
deps = [
1515
"//pkg/config/kerneltype",
1616
"//pkg/dxf/framework/handle",
17+
"//pkg/dxf/framework/taskexecutor/execute",
1718
"//pkg/executor/importer",
1819
"//pkg/kv",
1920
"//pkg/lightning/backend/external",
@@ -53,6 +54,7 @@ go_test(
5354
shard_count = 8,
5455
deps = [
5556
"//pkg/config/kerneltype",
57+
"//pkg/dxf/framework/taskexecutor/execute",
5658
"//pkg/executor/importer",
5759
"//pkg/kv",
5860
"//pkg/lightning/backend/encode",

pkg/dxf/importinto/conflictedkv/collector.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/docker/go-units"
2424
"github.com/pingcap/errors"
2525
"github.com/pingcap/failpoint"
26+
"github.com/pingcap/tidb/pkg/dxf/framework/taskexecutor/execute"
2627
"github.com/pingcap/tidb/pkg/executor/importer"
2728
tidbkv "github.com/pingcap/tidb/pkg/kv"
2829
"github.com/pingcap/tidb/pkg/lightning/backend/external"
@@ -113,6 +114,7 @@ func NewCollector(
113114
encoder *importer.TableKVEncoder,
114115
globalSet, localSet *BoundedHandleSet,
115116
sharedTotalFileSize *atomic.Int64,
117+
progressCollector execute.Collector,
116118
trafficRec TrafficRecorder,
117119
) *Collector {
118120
// Safety check: if caller doesn't pass the shared counter, allocate one to
@@ -129,7 +131,7 @@ func NewCollector(
129131
hdlSet: localSet,
130132
sharedTotalFileSize: sharedTotalFileSize,
131133
}
132-
base := NewBaseHandler(targetTbl, kvGroup, encoder, collector, logger)
134+
base := NewBaseHandler(targetTbl, kvGroup, encoder, collector, progressCollector, logger)
133135
var h Handler
134136
if kvGroup == external.DataKVGroup {
135137
h = NewDataKVHandler(base)

0 commit comments

Comments
 (0)