Skip to content

Commit e19742e

Browse files
logical: add SKIP conflict syntax for transactional LDR jobs (#170155)
logical: add SKIP conflict syntax for transactional LDR jobs
2 parents 5be101c + 424fd8e commit e19742e

14 files changed

Lines changed: 461 additions & 42 deletions

pkg/crosscluster/logical/txnapply/txn_applier.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -493,8 +493,8 @@ func (a *Applier) recordCompletion(
493493
// timestamp order, if txnIDs contains a txn at timestamp T,
494494
// there are no txns for this applier in the interval
495495
// (resolvedTime, T). The resolved time can safely advance
496-
// to T-1.
497-
a.mu.committed.UpdateResolvedTime(id.Timestamp.FloorPrev())
496+
// to T.Prev().
497+
a.mu.committed.UpdateResolvedTime(id.Timestamp.Prev())
498498
break
499499
}
500500
a.mu.committed.UpdateResolvedTime(id.Timestamp)

pkg/crosscluster/logical/txnmode/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ go_library(
6363

6464
go_test(
6565
name = "txnmode_test",
66+
size = "large",
6667
srcs = [
68+
"alter_logical_replication_test.go",
6769
"main_test.go",
6870
"node_router_test.go",
6971
"txnmode_dist_test.go",
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Copyright 2026 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package txnmode_test
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"testing"
12+
13+
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/ldrtestutils"
14+
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationtestutils"
15+
"github.com/cockroachdb/cockroach/pkg/jobs"
16+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
17+
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
18+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
19+
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
20+
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
21+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
22+
"github.com/cockroachdb/cockroach/pkg/util/log"
23+
"github.com/stretchr/testify/require"
24+
)
25+
26+
func TestAlterLDRSkipAdvancesReplicatedTime(t *testing.T) {
27+
defer leaktest.AfterTest(t)()
28+
skip.UnderDeadlock(t)
29+
defer log.Scope(t).Close(t)
30+
ctx := context.Background()
31+
32+
srv, sourceDB, destDB := setupTxnModeTest(t)
33+
defer srv.Stopper().Stop(ctx)
34+
jobID := setupConflictingLDR(t, srv, sourceDB, destDB)
35+
jobutils.WaitForJobToPause(t, destDB, jobID)
36+
37+
replicatedBefore, err := ldrtestutils.GetReplicatedTime(t, destDB, jobID)
38+
require.NoError(t, err)
39+
40+
destDB.Exec(t, fmt.Sprintf("ALTER LOGICAL REPLICATION STREAM %d SKIP", jobID))
41+
42+
replicatedAfter, err := ldrtestutils.GetReplicatedTime(t, destDB, jobID)
43+
require.NoError(t, err)
44+
require.Equal(t, replicatedBefore.Next(), replicatedAfter)
45+
46+
destDB.Exec(t, "RESUME JOB $1", jobID)
47+
jobutils.WaitForJobToRun(t, destDB, jobID)
48+
49+
sourceDB.Exec(t, "INSERT INTO tab VALUES (2, 'after-skip')")
50+
ldrtestutils.WaitUntilReplicatedTime(t, srv.Clock().Now(), destDB, jobID)
51+
52+
destDB.CheckQueryResults(t,
53+
"SELECT pk, val FROM tab ORDER BY pk",
54+
[][]string{{"2", "after-skip"}, {"100", "collide"}},
55+
)
56+
}
57+
58+
func TestAlterLDRSkipRejectsInvalidState(t *testing.T) {
59+
defer leaktest.AfterTest(t)()
60+
skip.UnderDeadlock(t)
61+
defer log.Scope(t).Close(t)
62+
ctx := context.Background()
63+
64+
srv, sourceDB, destDB := setupTxnModeTest(t)
65+
defer srv.Stopper().Stop(ctx)
66+
67+
for _, db := range []*sqlutils.SQLRunner{sourceDB, destDB} {
68+
db.Exec(t, "CREATE TABLE tab (pk INT PRIMARY KEY, val STRING NOT NULL)")
69+
}
70+
71+
s := srv.ApplicationLayer()
72+
sourceURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("source_db"))
73+
74+
t.Run("skip on running job", func(t *testing.T) {
75+
var jobID jobspb.JobID
76+
destDB.QueryRow(t,
77+
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH MODE = 'transactional'",
78+
sourceURL.String(),
79+
).Scan(&jobID)
80+
jobutils.WaitForJobToRun(t, destDB, jobID)
81+
82+
_, err := destDB.DB.ExecContext(ctx,
83+
fmt.Sprintf("ALTER LOGICAL REPLICATION STREAM %d SKIP", jobID))
84+
require.Error(t, err)
85+
require.Contains(t, err.Error(), "is not paused")
86+
87+
destDB.Exec(t, "CANCEL JOB $1", jobID)
88+
jobutils.WaitForJobToHaveStatus(t, destDB, jobID, jobs.StateCanceled)
89+
})
90+
91+
t.Run("skip on non-LDR job", func(t *testing.T) {
92+
var nonLDRJobID jobspb.JobID
93+
destDB.QueryRow(t,
94+
"SELECT job_id FROM crdb_internal.jobs WHERE job_type != 'LOGICAL REPLICATION' LIMIT 1",
95+
).Scan(&nonLDRJobID)
96+
destDB.Exec(t, "PAUSE JOB $1", nonLDRJobID)
97+
jobutils.WaitForJobToPause(t, destDB, nonLDRJobID)
98+
99+
_, err := destDB.DB.ExecContext(ctx,
100+
fmt.Sprintf("ALTER LOGICAL REPLICATION STREAM %d SKIP", nonLDRJobID))
101+
require.Error(t, err)
102+
require.Contains(t, err.Error(), "is not a logical replication job")
103+
104+
destDB.Exec(t, "RESUME JOB $1", nonLDRJobID)
105+
})
106+
107+
t.Run("skip on non-transactional job", func(t *testing.T) {
108+
var jobID jobspb.JobID
109+
destDB.QueryRow(t,
110+
"CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH MODE = 'immediate'",
111+
sourceURL.String(),
112+
).Scan(&jobID)
113+
jobutils.WaitForJobToRun(t, destDB, jobID)
114+
destDB.Exec(t, "PAUSE JOB $1", jobID)
115+
jobutils.WaitForJobToPause(t, destDB, jobID)
116+
117+
_, err := destDB.DB.ExecContext(ctx,
118+
fmt.Sprintf("ALTER LOGICAL REPLICATION STREAM %d SKIP", jobID))
119+
require.Error(t, err)
120+
require.Contains(t, err.Error(), "SKIP is only supported for transactional mode")
121+
122+
destDB.Exec(t, "CANCEL JOB $1", jobID)
123+
jobutils.WaitForJobToHaveStatus(t, destDB, jobID, jobs.StateCanceled)
124+
})
125+
}

pkg/crosscluster/logical/txnmode/txnmode_dist.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -417,23 +417,24 @@ func (ch *checkpointHandler) handleMeta(
417417
return nil
418418
}
419419

420-
if replicatedTime.LessEq(ch.replicatedTime) {
420+
reachedEndTime := !replicatedTime.Less(ch.endTime.Prev())
421+
if replicatedTime.LessEq(ch.replicatedTime) && !reachedEndTime {
421422
return nil
422423
}
423-
424-
reachedEndTime := !replicatedTime.Less(ch.endTime.Prev())
425424
updateFreq := ldrsettings.JobCheckpointFrequency.Get(ch.sv)
426425
if !reachedEndTime && (updateFreq == 0 || timeutil.Since(ch.lastPersistenceTime) < updateFreq) {
427426
return nil
428427
}
429428

430-
if err := ch.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
431-
return ch.job.ProgressStorage().SetResolved(ctx, txn, replicatedTime)
432-
}); err != nil {
433-
return err
429+
if !replicatedTime.LessEq(ch.replicatedTime) {
430+
if err := ch.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
431+
return ch.job.ProgressStorage().SetResolved(ctx, txn, replicatedTime)
432+
}); err != nil {
433+
return err
434+
}
435+
ch.replicatedTime = replicatedTime
436+
ch.lastPersistenceTime = timeutil.Now()
434437
}
435-
ch.replicatedTime = replicatedTime
436-
ch.lastPersistenceTime = timeutil.Now()
437438

438439
select {
439440
case ch.frontierUpdates <- replicatedTime:

pkg/crosscluster/logical/txnmode/txnmode_pause_on_conflict_test.go

Lines changed: 96 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,20 @@ import (
2525
"github.com/stretchr/testify/require"
2626
)
2727

28-
// TestTxnModePauseOnConflict verifies that a transactional LDR job pauses when
29-
// a replicated transaction violates a unique constraint on the destination,
30-
// and that the conflicting row is not applied.
31-
func TestTxnModePauseOnConflict(t *testing.T) {
32-
defer leaktest.AfterTest(t)()
33-
skip.UnderDeadlock(t)
34-
defer log.Scope(t).Close(t)
35-
36-
ctx := context.Background()
28+
// setupTxnModeTest starts a server with source and destination databases
29+
// configured for low-latency replication. It is the caller's responsibility
30+
// to stop the returned server.
31+
func setupTxnModeTest(
32+
t *testing.T,
33+
) (serverutils.TestServerInterface, *sqlutils.SQLRunner, *sqlutils.SQLRunner) {
34+
t.Helper()
3735

3836
srv, conn, _ := serverutils.StartServer(t, base.TestServerArgs{
3937
DefaultTestTenant: base.TestDoesNotWorkWithExternalProcessMode(134857),
4038
Knobs: base.TestingKnobs{
4139
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
4240
},
4341
})
44-
defer srv.Stopper().Stop(ctx)
4542

4643
s := srv.ApplicationLayer()
4744
runner := sqlutils.MakeSQLRunner(conn)
@@ -55,14 +52,30 @@ func TestTxnModePauseOnConflict(t *testing.T) {
5552
sourceDB := sqlutils.MakeSQLRunner(s.SQLConn(t, serverutils.DBName("source_db")))
5653
destDB := sqlutils.MakeSQLRunner(s.SQLConn(t, serverutils.DBName("dest_db")))
5754

55+
return srv, sourceDB, destDB
56+
}
57+
58+
// setupConflictingLDR creates a table with a unique index on both source and
59+
// destination, seeds a conflicting row on the destination, starts a
60+
// transactional LDR stream, and inserts a conflicting row on the source.
61+
// The caller is responsible for waiting on the job state.
62+
func setupConflictingLDR(
63+
t *testing.T,
64+
srv serverutils.TestServerInterface,
65+
sourceDB *sqlutils.SQLRunner,
66+
destDB *sqlutils.SQLRunner,
67+
) jobspb.JobID {
68+
t.Helper()
69+
5870
for _, db := range []*sqlutils.SQLRunner{sourceDB, destDB} {
5971
db.Exec(t, "CREATE TABLE tab (pk INT PRIMARY KEY, val STRING NOT NULL)")
6072
db.Exec(t, "CREATE UNIQUE INDEX ON tab(val)")
6173
}
6274

6375
destDB.Exec(t, "INSERT INTO tab VALUES (100, 'collide')")
6476

65-
sourceURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("source_db"))
77+
sourceURL := replicationtestutils.GetExternalConnectionURI(
78+
t, srv.ApplicationLayer(), srv.ApplicationLayer(), serverutils.DBName("source_db"))
6679

6780
var jobID jobspb.JobID
6881
destDB.QueryRow(t,
@@ -71,7 +84,21 @@ func TestTxnModePauseOnConflict(t *testing.T) {
7184
).Scan(&jobID)
7285

7386
sourceDB.Exec(t, "INSERT INTO tab VALUES (1, 'collide')")
87+
return jobID
88+
}
89+
90+
// TestTxnModePauseOnConflict verifies that a transactional LDR job pauses when
91+
// a replicated transaction violates a unique constraint on the destination,
92+
// and that the conflicting row is not applied.
93+
func TestTxnModePauseOnConflict(t *testing.T) {
94+
defer leaktest.AfterTest(t)()
95+
skip.UnderDeadlock(t)
96+
defer log.Scope(t).Close(t)
97+
ctx := context.Background()
7498

99+
srv, sourceDB, destDB := setupTxnModeTest(t)
100+
defer srv.Stopper().Stop(ctx)
101+
jobID := setupConflictingLDR(t, srv, sourceDB, destDB)
75102
jobutils.WaitForJobToPause(t, destDB, jobID)
76103

77104
var runningStatus string
@@ -95,36 +122,74 @@ func TestTxnModePauseOnConflict(t *testing.T) {
95122
require.Equal(t, conflictMVCC.Prev(), replicatedTime)
96123
}
97124

98-
// TestTxnModePauseOnEarliestConflict verifies that when multiple replicated
99-
// transactions conflict at different timestamps, the job converges on the
100-
// first conflict (by timestamp) and drains every prior transaction before
101-
// pausing.
102-
func TestTxnModePauseOnEarliestConflict(t *testing.T) {
125+
// TestTxnModeResumeAfterFixingConflict verifies that when a transactional LDR
126+
// job pauses on a unique constraint conflict, the user can remove the
127+
// conflicting row on the destination, resume the job, and the
128+
// previously failed transaction is retried and applied successfully.
129+
func TestTxnModeResumeAfterFixingConflict(t *testing.T) {
103130
defer leaktest.AfterTest(t)()
104131
skip.UnderDeadlock(t)
105132
defer log.Scope(t).Close(t)
133+
ctx := context.Background()
134+
135+
srv, sourceDB, destDB := setupTxnModeTest(t)
136+
defer srv.Stopper().Stop(ctx)
137+
jobID := setupConflictingLDR(t, srv, sourceDB, destDB)
138+
jobutils.WaitForJobToPause(t, destDB, jobID)
139+
140+
// Remove the conflicting row on the destination so the transaction
141+
// can be retried successfully.
142+
destDB.Exec(t, "DELETE FROM tab WHERE pk = 100")
143+
144+
destDB.Exec(t, "RESUME JOB $1", jobID)
145+
jobutils.WaitForJobToRun(t, destDB, jobID)
106146

147+
now := srv.Clock().Now()
148+
ldrtestutils.WaitUntilReplicatedTime(t, now, destDB, jobID)
149+
150+
destDB.CheckQueryResults(t,
151+
"SELECT pk, val FROM tab ORDER BY pk",
152+
[][]string{{"1", "collide"}},
153+
)
154+
}
155+
156+
// TestTxnModeResumePausesAgainOnUnresolvedConflict verifies that resuming a
157+
// paused transactional LDR job without fixing the conflict causes the job to
158+
// pause again at the same replicated time.
159+
func TestTxnModeResumePausesAgainOnUnresolvedConflict(t *testing.T) {
160+
defer leaktest.AfterTest(t)()
161+
skip.UnderDeadlock(t)
162+
defer log.Scope(t).Close(t)
107163
ctx := context.Background()
108164

109-
srv, conn, _ := serverutils.StartServer(t, base.TestServerArgs{
110-
DefaultTestTenant: base.TestDoesNotWorkWithExternalProcessMode(134857),
111-
Knobs: base.TestingKnobs{
112-
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
113-
},
114-
})
165+
srv, sourceDB, destDB := setupTxnModeTest(t)
115166
defer srv.Stopper().Stop(ctx)
167+
jobID := setupConflictingLDR(t, srv, sourceDB, destDB)
168+
jobutils.WaitForJobToPause(t, destDB, jobID)
116169

117-
s := srv.ApplicationLayer()
118-
runner := sqlutils.MakeSQLRunner(conn)
170+
progressFirst := jobutils.GetJobProgress(t, destDB, jobID)
171+
replicatedFirst := progressFirst.Details.(*jobspb.Progress_LogicalReplication).LogicalReplication.ReplicatedTime
119172

120-
sysRunner := sqlutils.MakeSQLRunner(srv.SystemLayer().SQLConn(t))
121-
ldrtestutils.ApplyLowLatencyReplicationSettings(t, sysRunner, runner)
173+
destDB.Exec(t, "RESUME JOB $1", jobID)
174+
jobutils.WaitForJobToPause(t, destDB, jobID)
122175

123-
runner.Exec(t, "CREATE DATABASE source_db")
124-
runner.Exec(t, "CREATE DATABASE dest_db")
176+
progressSecond := jobutils.GetJobProgress(t, destDB, jobID)
177+
replicatedSecond := progressSecond.Details.(*jobspb.Progress_LogicalReplication).LogicalReplication.ReplicatedTime
178+
require.Equal(t, replicatedFirst, replicatedSecond)
179+
}
125180

126-
sourceDB := sqlutils.MakeSQLRunner(s.SQLConn(t, serverutils.DBName("source_db")))
127-
destDB := sqlutils.MakeSQLRunner(s.SQLConn(t, serverutils.DBName("dest_db")))
181+
// TestTxnModePauseOnEarliestConflict verifies that when multiple replicated
182+
// transactions conflict at different timestamps, the job converges on the
183+
// first conflict (by timestamp) and drains every prior transaction before
184+
// pausing.
185+
func TestTxnModePauseOnEarliestConflict(t *testing.T) {
186+
defer leaktest.AfterTest(t)()
187+
skip.UnderDeadlock(t)
188+
defer log.Scope(t).Close(t)
189+
ctx := context.Background()
190+
191+
srv, sourceDB, destDB := setupTxnModeTest(t)
192+
defer srv.Stopper().Stop(ctx)
128193

129194
for _, db := range []*sqlutils.SQLRunner{sourceDB, destDB} {
130195
db.Exec(t, "CREATE TABLE tab (pk INT PRIMARY KEY, val STRING NOT NULL, extra STRING NOT NULL)")
@@ -134,6 +199,7 @@ func TestTxnModePauseOnEarliestConflict(t *testing.T) {
134199

135200
destDB.Exec(t, "INSERT INTO tab VALUES (100, 'first-collide', 'pre-1'), (101, 'pre-2', 'second-collide')")
136201

202+
s := srv.ApplicationLayer()
137203
sourceURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("source_db"))
138204

139205
var jobID jobspb.JobID

pkg/jobs/update.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,12 @@ WHERE id = $1
283283
var ts hlc.Timestamp
284284
if hwm := progress.GetHighWater(); hwm != nil {
285285
ts = *hwm
286+
} else {
287+
_, existing, _, err := j.ProgressStorage().Get(ctx, u.txn)
288+
if err != nil {
289+
return err
290+
}
291+
ts = existing
286292
}
287293

288294
if err := j.ProgressStorage().Set(ctx, u.txn, float64(progress.GetFractionCompleted()), ts); err != nil {

pkg/sql/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_library(
1515
"alter_index.go",
1616
"alter_index_visible.go",
1717
"alter_job_owner.go",
18+
"alter_logical_replication_stmt.go",
1819
"alter_primary_key.go",
1920
"alter_role.go",
2021
"alter_schema.go",

0 commit comments

Comments
 (0)