Skip to content
This repository was archived by the owner on Mar 4, 2026. It is now read-only.

Commit 2e48fde

Browse files
authored
chore: add mutation key heuristic for mutation only transaction (#2345)
1 parent 0666f05 commit 2e48fde

4 files changed

Lines changed: 607 additions & 1 deletion

File tree

src/transaction.ts

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ export class Snapshot extends EventEmitter {
311311
_observabilityOptions?: ObservabilityOptions;
312312
_traceConfig: traceConfig;
313313
protected _dbName?: string;
314+
protected _mutationKey: spannerClient.spanner.v1.Mutation | null;
314315

315316
/**
316317
* The transaction ID.
@@ -383,6 +384,7 @@ export class Snapshot extends EventEmitter {
383384
dbName: this._dbName,
384385
};
385386
this._latestPreCommitToken = null;
387+
this._mutationKey = null;
386388
}
387389

388390
protected _updatePrecommitToken(resp: PrecommitTokenProvider): void {
@@ -395,6 +397,102 @@ export class Snapshot extends EventEmitter {
395397
}
396398
}
397399

400+
/**
401+
* Selects a single representative mutation from a list to be used as the
402+
* transaction's `mutationKey`.
403+
*
404+
* This key is required by Spanner and is sent in the `BeginTransactionRequest`
405+
* for read-write transactions that only contain mutations. The selection follows
406+
* a two-tiered heuristic to choose the most significant mutation.
407+
*
408+
* The selection heuristic is as follows:
409+
*
410+
* 1. Priority of Operation Type: High-priority mutations (`delete`, `update`,
411+
* `replace`, `insertOrUpdate`) are always chosen over low-priority
412+
* (`insert`) mutations.
413+
*
414+
* 2. Selection Strategy:
415+
* - If any high-priority mutations exist, one is chosen randomly from
416+
* that group, ignoring the number of rows.
417+
* - If only `insert` mutations exist, the one(s) with the largest number
418+
* of rows are identified, and one is chosen randomly from that subset.
419+
*
420+
* @protected
421+
* @param mutations The list of mutations from which to select the key.
422+
*/
423+
protected _setMutationKey(mutations: spannerClient.spanner.v1.Mutation[]) {
424+
// return if the list is empty
425+
if (mutations.length === 0) {
426+
return;
427+
}
428+
429+
// maintain a set of high priority keys
430+
const HIGH_PRIORITY_KEYS = new Set([
431+
'delete',
432+
'update',
433+
'replace',
434+
'insertOrUpdate',
435+
]);
436+
437+
// maintain a variable for low priority key
438+
const LOW_PRIORITY_KEY = 'insert';
439+
440+
// Partition mutations into high and low priority groups.
441+
const [highPriority, lowPriority] = mutations.reduce(
442+
(acc, mutation) => {
443+
const key = Object.keys(mutation)[0] as keyof typeof mutation;
444+
if (HIGH_PRIORITY_KEYS.has(key)) {
445+
acc[0].push(mutation);
446+
} else if (key === LOW_PRIORITY_KEY) {
447+
acc[1].push(mutation);
448+
}
449+
// return accumulated mutations list
450+
return acc;
451+
},
452+
[[], []] as [
453+
spannerClient.spanner.v1.Mutation[],
454+
spannerClient.spanner.v1.Mutation[],
455+
],
456+
);
457+
458+
// Apply the selection logic based on the rules.
459+
if (highPriority.length > 0) {
460+
// RULE 1: If high-priority keys exist, pick one randomly.
461+
const randomIndex = Math.floor(Math.random() * highPriority.length);
462+
this._mutationKey = highPriority[randomIndex];
463+
} else if (lowPriority.length > 0) {
464+
// RULE 2: If only 'insert' key(s) exist, find the one with
465+
// highest number of values
466+
const {bestCandidates} = lowPriority.reduce(
467+
(acc, mutation) => {
468+
const size = mutation.insert?.values?.length || 0;
469+
470+
if (size > acc.maxSize) {
471+
// New largest size found, start a new list
472+
return {maxSize: size, bestCandidates: [mutation]};
473+
}
474+
if (size === acc.maxSize) {
475+
// Same size as current max, add to list
476+
acc.bestCandidates.push(mutation);
477+
}
478+
// return accumulated mutations list
479+
return acc;
480+
},
481+
{
482+
maxSize: -1,
483+
bestCandidates: [] as spannerClient.spanner.v1.Mutation[],
484+
},
485+
);
486+
487+
// Pick randomly from the largest 'insert' mutation(s).
488+
const randomIndex = Math.floor(Math.random() * bestCandidates.length);
489+
this._mutationKey = bestCandidates[randomIndex];
490+
} else {
491+
// No mutations to select from.
492+
this._mutationKey = null;
493+
}
494+
}
495+
398496
/**
399497
* Modifies transaction selector to include the multiplexed session previous
400498
* transaction id.
@@ -486,6 +584,10 @@ export class Snapshot extends EventEmitter {
486584
options,
487585
};
488586

587+
if (this._mutationKey) {
588+
reqOpts.mutationKey = this._mutationKey;
589+
}
590+
489591
// Only hand crafted read-write transactions will be able to set a
490592
// transaction tag for the BeginTransaction RPC. Also, this.requestOptions
491593
// is only set in the constructor of Transaction, which is the constructor
@@ -2286,6 +2388,9 @@ export class Transaction extends Dml {
22862388
} else if (!this._useInRunner) {
22872389
reqOpts.singleUseTransaction = this._options;
22882390
} else {
2391+
if ((this.session.parent as Database).isMuxEnabledForRW_) {
2392+
this._setMutationKey(mutations);
2393+
}
22892394
this.begin().then(
22902395
() => {
22912396
this.commit(options, (err, resp) => {

test/mockserver/mockspanner.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import Any = google.protobuf.Any;
3131
import QueryMode = google.spanner.v1.ExecuteSqlRequest.QueryMode;
3232
import NullValue = google.protobuf.NullValue;
3333
import {ExecuteSqlRequest, ReadRequest} from '../../src/transaction';
34+
import {randomInt} from 'crypto';
3435

3536
const PROTO_PATH = 'spanner.proto';
3637
const IMPORT_PATH = __dirname + '/../../../protos';
@@ -1232,9 +1233,17 @@ export class MockSpanner {
12321233
const transactionId = id.toString().padStart(12, '0');
12331234
const fullTransactionId = session.name + '/transactions/' + transactionId;
12341235
const readTimestamp = options && options.readOnly ? now() : undefined;
1236+
const precommitToken =
1237+
session.multiplexed && options?.readWrite
1238+
? {
1239+
precommitToken: Buffer.from('mock-precommit-token'),
1240+
seqNum: randomInt(1, 1000),
1241+
}
1242+
: null;
12351243
const transaction = protobuf.Transaction.create({
12361244
id: Buffer.from(transactionId),
12371245
readTimestamp,
1246+
precommitToken,
12381247
});
12391248
this.transactions.set(fullTransactionId, transaction);
12401249
this.transactionOptions.set(fullTransactionId, options);

test/spanner.ts

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ import {SessionFactory} from '../src/session-factory';
8181
import {MultiplexedSession} from '../src/multiplexed-session';
8282
import {WriteAtLeastOnceOptions} from '../src/database';
8383
import {MetricsTracerFactory} from '../src/metrics/metrics-tracer-factory';
84+
import {randomUUID} from 'crypto';
8485

8586
const {
8687
AlwaysOnSampler,
@@ -3917,6 +3918,124 @@ describe('Spanner with mock server', () => {
39173918
await database.close();
39183919
});
39193920
});
3921+
3922+
// TODO: enable when mux session support is available in public methods
3923+
describe.skip('when multiplexed session is enabled for R/W', () => {
3924+
before(() => {
3925+
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true';
3926+
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW = 'true';
3927+
});
3928+
3929+
after(() => {
3930+
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'false';
3931+
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW = 'false';
3932+
});
3933+
3934+
it('should select the insertOrUpdate(upsert)/delete(deleteRows) mutation key over insert', async () => {
3935+
const database = newTestDatabase();
3936+
await database.runTransactionAsync(async tx => {
3937+
tx.upsert('foo', [
3938+
{id: 1, name: 'One'},
3939+
{id: 2, name: 'Two'},
3940+
]);
3941+
tx.insert('foo', [{id: 3, name: 'Three'}]);
3942+
tx.insert('foo', [{id: 4, name: 'Four'}]);
3943+
tx.deleteRows('foo', ['3', '4']);
3944+
await tx.commit();
3945+
});
3946+
3947+
const beginTransactionRequest = spannerMock
3948+
.getRequests()
3949+
.filter(val => {
3950+
return (val as v1.BeginTransactionRequest).mutationKey;
3951+
}) as v1.BeginTransactionRequest[];
3952+
3953+
// assert on begin transaction request
3954+
assert.strictEqual(beginTransactionRequest.length, 1);
3955+
3956+
// selected mutation key
3957+
const selectedMutationKey = beginTransactionRequest[0]!.mutationKey;
3958+
3959+
// assert that mutation key have been selected
3960+
assert.ok(
3961+
selectedMutationKey,
3962+
'A mutation key should have been selected',
3963+
);
3964+
3965+
// get the type of mutation key
3966+
const mutationType = Object.keys(selectedMutationKey!)[0];
3967+
3968+
// assert that mutation key is not insert
3969+
assert.notStrictEqual(
3970+
mutationType,
3971+
'insert',
3972+
'The selected mutation key should not be "insert"',
3973+
);
3974+
3975+
// assert that mutation key is either insertOrUpdate or delete
3976+
assert.ok(
3977+
['insertOrUpdate', 'delete'].includes(mutationType),
3978+
"Expected either 'insertOrUpdate' or 'delete' key.",
3979+
);
3980+
3981+
const commitRequest = spannerMock.getRequests().filter(val => {
3982+
return (val as v1.CommitRequest).precommitToken;
3983+
}) as v1.CommitRequest[];
3984+
3985+
// assert on commit request
3986+
assert.strictEqual(commitRequest.length, 1);
3987+
await database.close();
3988+
});
3989+
3990+
it('should select the mutation key with highest number of values when insert key(s) are present', async () => {
3991+
const database = newTestDatabase();
3992+
await database.runTransactionAsync(async tx => {
3993+
tx.insert('foo', [
3994+
{id: randomUUID(), name: 'One'},
3995+
{id: randomUUID(), name: 'Two'},
3996+
{id: randomUUID(), name: 'Three'},
3997+
]);
3998+
tx.insert('foo', {id: randomUUID(), name: 'Four'});
3999+
await tx.commit();
4000+
});
4001+
4002+
const beginTransactionRequest = spannerMock
4003+
.getRequests()
4004+
.filter(val => {
4005+
return (val as v1.BeginTransactionRequest).mutationKey;
4006+
}) as v1.BeginTransactionRequest[];
4007+
4008+
// assert on begin transaction request
4009+
assert.strictEqual(beginTransactionRequest.length, 1);
4010+
4011+
// selected mutation key
4012+
const selectedMutationKey = beginTransactionRequest[0]!.mutationKey;
4013+
4014+
// assert that mutation key have been selected
4015+
assert.ok(
4016+
selectedMutationKey,
4017+
'A mutation key should have been selected',
4018+
);
4019+
4020+
// assert that mutation key is insert
4021+
const mutationType = Object.keys(selectedMutationKey!)[0];
4022+
assert.ok(
4023+
['insert'].includes(mutationType),
4024+
'insert key must have been selected',
4025+
);
4026+
4027+
// assert that insert mutation key with highest number of rows has been selected
4028+
assert.strictEqual(selectedMutationKey.insert?.values?.length, 3);
4029+
4030+
const commitRequest = spannerMock.getRequests().filter(val => {
4031+
return (val as v1.CommitRequest).precommitToken;
4032+
}) as v1.CommitRequest[];
4033+
4034+
// assert on commit request
4035+
assert.strictEqual(commitRequest.length, 1);
4036+
await database.close();
4037+
});
4038+
});
39204039
});
39214040

39224041
describe('hand-crafted transaction', () => {
@@ -5013,6 +5132,68 @@ describe('Spanner with mock server', () => {
50135132

50145133
await database.close();
50155134
});
5135+
5136+
// TODO: enable when mux session support is available in public methods
5137+
describe.skip('when multiplexed session is enabled for R/W', () => {
5138+
before(() => {
5139+
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'true';
5140+
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW = 'true';
5141+
});
5142+
5143+
after(() => {
5144+
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = 'false';
5145+
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW = 'false';
5146+
});
5147+
5148+
it('should pass the mutation key in begin transaction request in case of mutations only transactions', async () => {
5149+
const database = newTestDatabase();
5150+
await database.table('foo').upsert({id: 1, name: randomUUID()});
5151+
await database.table('foo').insert({id: 2, name: randomUUID()});
5152+
await database.table('foo').deleteRows(['2']);
5153+
5154+
const beginTransactionRequest = spannerMock
5155+
.getRequests()
5156+
.filter(val => {
5157+
return (val as v1.BeginTransactionRequest).mutationKey;
5158+
}) as v1.BeginTransactionRequest[];
5159+
5160+
// assert on begin transaction request
5161+
assert.strictEqual(beginTransactionRequest.length, 3);
5162+
5163+
// assert that on first begin transaction request insertOrUpdate is being selected as mutation key
5164+
assert.ok(
5165+
['insertOrUpdate'].includes(
5166+
Object.keys(beginTransactionRequest[0]!.mutationKey!)[0],
5167+
),
5168+
'insertOrUpdate key must have been selected',
5169+
);
5170+
5171+
// assert that on second begin transaction request insert is being selected as mutation key
5172+
assert.ok(
5173+
['insert'].includes(
5174+
Object.keys(beginTransactionRequest[1]!.mutationKey!)[0],
5175+
),
5176+
'insert key must have been selected',
5177+
);
5178+
5179+
// assert that on third begin transaction request delete is being selected as mutation key
5180+
assert.ok(
5181+
['delete'].includes(
5182+
Object.keys(beginTransactionRequest[2]!.mutationKey!)[0],
5183+
),
5184+
'delete key must have been selected',
5185+
);
5186+
5187+
const commitRequest = spannerMock.getRequests().filter(val => {
5188+
return (val as v1.CommitRequest).precommitToken;
5189+
}) as v1.CommitRequest[];
5190+
5191+
// assert on commit request
5192+
assert.strictEqual(commitRequest.length, 3);
5193+
5194+
await database.close();
5195+
});
5196+
});
50165197
});
50175198

50185199
describe('chunking', () => {

0 commit comments

Comments
 (0)