Skip to content

Commit e197022

Browse files
committed
logical: use KVSavepoint for tombstone updates in table writer
Move tombstone updates from a separate transaction into the session's SQL transaction using KVSavepoint. Previously, attemptBatch ran tombstone updates in their own db.Txn after the session transaction committed. Now they run inline within the session transaction, each wrapped in a KV savepoint so that a LWW-loser ConditionFailedError on one tombstone does not abort the surrounding transaction. This reduces the number of transactions per batch from two to one. Part of: #169239 Epic: CRDB-60163 Release note: None
1 parent 4da4d68 commit e197022

1 file changed

Lines changed: 19 additions & 28 deletions

File tree

pkg/crosscluster/logical/table_batch_handler.go

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/cockroachdb/cockroach/pkg/crosscluster/logical/sqlwriter"
1414
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1515
"github.com/cockroachdb/cockroach/pkg/keys"
16+
"github.com/cockroachdb/cockroach/pkg/kv"
1617
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1718
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
1819
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
@@ -30,7 +31,6 @@ type tableHandler struct {
3031
sqlReader sqlwriter.RowReader
3132
sqlWriter *sqlwriter.RowWriter
3233
session isql.Session
33-
db descs.DB
3434
tombstoneUpdater *tombstoneUpdater
3535
}
3636

@@ -132,7 +132,6 @@ func newTableHandler(
132132
return &tableHandler{
133133
sqlReader: reader,
134134
sqlWriter: writer,
135-
db: db,
136135
tombstoneUpdater: tombstoneUpdater,
137136
session: session,
138137
}, nil
@@ -170,7 +169,6 @@ func (t *tableHandler) attemptBatch(
170169
) (tableBatchStats, error) {
171170
var stats tableBatchStats
172171

173-
var hasTombstoneUpdates bool
174172
err := t.session.Txn(ctx, func(ctx context.Context) error {
175173
for _, event := range batch {
176174
switch {
@@ -181,8 +179,24 @@ func (t *tableHandler) attemptBatch(
181179
return err
182180
}
183181
case event.IsTombstoneUpdate():
184-
hasTombstoneUpdates = true
185-
// Skip: handled in its own transaction.
182+
stats.tombstoneUpdates++
183+
// Use a KV savepoint so that a LWW-loser ConditionFailedError
184+
// on one tombstone does not abort the surrounding transaction.
185+
err := t.session.KVSavepoint(ctx, func(ctx context.Context, txn *kv.Txn) error {
186+
batch := txn.NewBatch()
187+
batch.Header.WriteOptions = originID1Options
188+
if err := t.tombstoneUpdater.addToBatch(ctx, txn, batch, event.RowTimestamp, event.Row); err != nil {
189+
return err
190+
}
191+
return txn.Run(ctx, batch)
192+
})
193+
if err != nil {
194+
if isLwwLoser(err) {
195+
stats.kvLwwLosers++
196+
continue
197+
}
198+
return err
199+
}
186200
case event.IsInsertRow():
187201
stats.inserts++
188202
err := t.sqlWriter.InsertRow(ctx, event.RowTimestamp, event.Row)
@@ -211,29 +225,6 @@ func (t *tableHandler) attemptBatch(
211225
return tableBatchStats{}, err
212226
}
213227

214-
if hasTombstoneUpdates {
215-
// TODO(jeffswenson): once we have a way to expose the transaction used by
216-
// the Session, we should bundle this with the other txn. The purpose of
217-
// these transactions is batching writes in a transaction increases
218-
// efficiency. The transactions are not needed for correctness.
219-
err = t.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
220-
for _, event := range batch {
221-
if event.IsTombstoneUpdate() {
222-
stats.tombstoneUpdates++
223-
tombstoneUpdateStats, err := t.tombstoneUpdater.updateTombstone(ctx, txn, event.RowTimestamp, event.Row)
224-
if err != nil {
225-
return err
226-
}
227-
stats.kvLwwLosers += tombstoneUpdateStats.kvWriteTooOld
228-
}
229-
}
230-
return nil
231-
})
232-
if err != nil {
233-
return tableBatchStats{}, err
234-
}
235-
}
236-
237228
return stats, nil
238229
}
239230

0 commit comments

Comments
 (0)