@@ -13,6 +13,7 @@ import (
1313 "github.com/cockroachdb/cockroach/pkg/crosscluster/logical/sqlwriter"
1414 "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1515 "github.com/cockroachdb/cockroach/pkg/keys"
16+ "github.com/cockroachdb/cockroach/pkg/kv"
1617 "github.com/cockroachdb/cockroach/pkg/settings/cluster"
1718 "github.com/cockroachdb/cockroach/pkg/sql/catalog"
1819 "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
@@ -30,7 +31,6 @@ type tableHandler struct {
3031 sqlReader sqlwriter.RowReader
3132 sqlWriter * sqlwriter.RowWriter
3233 session isql.Session
33- db descs.DB
3434 tombstoneUpdater * tombstoneUpdater
3535}
3636
@@ -132,7 +132,6 @@ func newTableHandler(
132132 return & tableHandler {
133133 sqlReader : reader ,
134134 sqlWriter : writer ,
135- db : db ,
136135 tombstoneUpdater : tombstoneUpdater ,
137136 session : session ,
138137 }, nil
@@ -170,7 +169,6 @@ func (t *tableHandler) attemptBatch(
170169) (tableBatchStats , error ) {
171170 var stats tableBatchStats
172171
173- var hasTombstoneUpdates bool
174172 err := t .session .Txn (ctx , func (ctx context.Context ) error {
175173 for _ , event := range batch {
176174 switch {
@@ -181,8 +179,24 @@ func (t *tableHandler) attemptBatch(
181179 return err
182180 }
183181 case event .IsTombstoneUpdate ():
184- hasTombstoneUpdates = true
185- // Skip: handled in its own transaction.
182+ stats .tombstoneUpdates ++
183+ // Use a KV savepoint so that a LWW-loser ConditionFailedError
184+ // on one tombstone does not abort the surrounding transaction.
185+ err := t .session .KVSavepoint (ctx , func (ctx context.Context , txn * kv.Txn ) error {
186+ batch := txn .NewBatch ()
187+ batch .Header .WriteOptions = originID1Options
188+ if err := t .tombstoneUpdater .addToBatch (ctx , txn , batch , event .RowTimestamp , event .Row ); err != nil {
189+ return err
190+ }
191+ return txn .Run (ctx , batch )
192+ })
193+ if err != nil {
194+ if isLwwLoser (err ) {
195+ stats .kvLwwLosers ++
196+ continue
197+ }
198+ return err
199+ }
186200 case event .IsInsertRow ():
187201 stats .inserts ++
188202 err := t .sqlWriter .InsertRow (ctx , event .RowTimestamp , event .Row )
@@ -211,29 +225,6 @@ func (t *tableHandler) attemptBatch(
211225 return tableBatchStats {}, err
212226 }
213227
214- if hasTombstoneUpdates {
215- // TODO(jeffswenson): once we have a way to expose the transaction used by
216- // the Session, we should bundle this with the other txn. The purpose of
217- // these transactions is batching writes in a transaction increases
218- // efficiency. The transactions are not needed for correctness.
219- err = t .db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
220- for _ , event := range batch {
221- if event .IsTombstoneUpdate () {
222- stats .tombstoneUpdates ++
223- tombstoneUpdateStats , err := t .tombstoneUpdater .updateTombstone (ctx , txn , event .RowTimestamp , event .Row )
224- if err != nil {
225- return err
226- }
227- stats .kvLwwLosers += tombstoneUpdateStats .kvWriteTooOld
228- }
229- }
230- return nil
231- })
232- if err != nil {
233- return tableBatchStats {}, err
234- }
235- }
236-
237228 return stats , nil
238229}
239230
0 commit comments