Skip to content

Commit 4c9c3fd

Browse files
committed
try rebroadcast on commit
1 parent 6515e0e commit 4c9c3fd

6 files changed

Lines changed: 54 additions & 9 deletions

File tree

packages/contracts/source/contracts/transaction-pool/service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ export interface Service {
55

66
addTransaction(transaction: Transaction): Promise<void>;
77
reAddTransactions(): Promise<void>;
8-
commit(sendersAddresses: string[]): Promise<void>;
8+
commit(sendersAddresses: string[], consumedGas: number): Promise<void>;
99
flush(): Promise<void>;
1010
}

packages/contracts/source/contracts/transaction-pool/worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ export interface WorkerScriptHandler {
99
boot(flags: WorkerFlags): Promise<void>;
1010
getTransactions(): Promise<string[]>;
1111
removeTransaction(address: string, id: string): Promise<void>;
12-
commit(height: number, sendersAddresses: string[]): Promise<void>;
12+
commit(height: number, sendersAddresses: string[], consumedGas: number): Promise<void>;
1313
setPeer(ip: string): Promise<void>;
1414
forgetPeer(ip: string): Promise<void>;
1515
start(height: number): Promise<void>;

packages/transaction-pool-service/source/service.ts

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@ export class Service implements Contracts.TransactionPool.Service {
1616
@inject(Identifiers.State.Store)
1717
private readonly stateStore!: Contracts.State.Store;
1818

19+
@inject(Identifiers.TransactionPool.Broadcaster)
20+
private readonly broadcaster!: Contracts.TransactionPool.Broadcaster;
21+
22+
@inject(Identifiers.Cryptography.Configuration)
23+
private readonly cryptoConfiguration!: Contracts.Crypto.Configuration;
24+
1925
@inject(Identifiers.TransactionPool.Storage)
2026
private readonly storage!: Contracts.TransactionPool.Storage;
2127

@@ -55,7 +61,7 @@ export class Service implements Contracts.TransactionPool.Service {
5561
return this.mempool.getSize();
5662
}
5763

58-
public async commit(sendersAddresses: string[]): Promise<void> {
64+
public async commit(sendersAddresses: string[], consumedGas: number): Promise<void> {
5965
await this.#lock.runExclusive(async () => {
6066
if (this.#disposed) {
6167
return;
@@ -70,6 +76,8 @@ export class Service implements Contracts.TransactionPool.Service {
7076
}
7177

7278
await this.#cleanUp();
79+
80+
await this.#rebroadcastStorageTransactions(consumedGas);
7381
});
7482
}
7583

@@ -242,4 +250,35 @@ export class Service implements Contracts.TransactionPool.Service {
242250

243251
await this.mempool.addTransaction(transaction);
244252
}
253+
254+
async #rebroadcastStorageTransactions(consumedGas: number): Promise<void> {
255+
const blockNumber = this.stateStore.getBlockNumber();
256+
const milestones = this.cryptoConfiguration.getMilestone(blockNumber);
257+
258+
const threshold = this.pluginConfiguration.getRequired<number>(
259+
"maxBlockGasUtilizationTransactionRebroadcastThreshold",
260+
);
261+
262+
// If block is not full rebroadcast local transactions.
263+
if (consumedGas > milestones.block.maxGasLimit * (threshold / 100)) {
264+
return;
265+
}
266+
267+
const limit = this.pluginConfiguration.getRequired<number>("maxTransactionsPerRequest");
268+
const broadcastTransactions: Contracts.Crypto.Transaction[] = [];
269+
270+
// Get old transactions up to current block number.
271+
for (const { serialized } of this.storage.getOldTransactions(blockNumber, limit)) {
272+
const transaction = await this.transactionFactory.fromBytes(serialized);
273+
broadcastTransactions.push(transaction);
274+
}
275+
276+
if (broadcastTransactions.length > 0) {
277+
this.logger.info(`Rebroadcasting ${broadcastTransactions.length} transaction(s) from storage`);
278+
279+
this.broadcaster
280+
.broadcastTransactions(broadcastTransactions)
281+
.catch((error) => this.logger.error(error.stack));
282+
}
283+
}
245284
}

packages/transaction-pool-worker/source/handlers/commit.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ export class CommitHandler {
1515
@inject(Identifiers.Services.Log.Service)
1616
protected readonly logger!: Contracts.Kernel.Logger;
1717

18-
public async handle(blockNumber: number, sendersAddresses: string[]): Promise<void> {
18+
public async handle(blockNumber: number, sendersAddresses: string[], consumedGas: number): Promise<void> {
1919
try {
2020
this.stateStore.setBlockNumber(blockNumber);
2121

2222
if (this.configuration.isNewMilestone()) {
2323
void this.transactionPoolService.reAddTransactions();
2424
} else {
25-
await this.transactionPoolService.commit(sendersAddresses);
25+
await this.transactionPoolService.commit(sendersAddresses, consumedGas);
2626
}
2727
} catch (error) {
2828
throw new Error(`Failed to commit block: ${error.message}`);

packages/transaction-pool-worker/source/worker-handler.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ export class WorkerScriptHandler implements Contracts.TransactionPool.WorkerScri
3131
await this.#app.resolve(StartHandler).handle(height);
3232
}
3333

34-
public async commit(height: number, sendersAddresses: string[]): Promise<void> {
35-
await this.#app.resolve(CommitHandler).handle(height, sendersAddresses);
34+
public async commit(height: number, sendersAddresses: string[], consumedGas: number): Promise<void> {
35+
await this.#app.resolve(CommitHandler).handle(height, sendersAddresses, consumedGas);
3636
}
3737

3838
public async getTransactions(): Promise<string[]> {

packages/transaction-pool-worker/source/worker.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,17 @@ export class Worker implements Contracts.TransactionPool.Worker {
5050
async onCommit(unit: Contracts.Processor.ProcessableUnit): Promise<void> {
5151
const sendersAddresses: Set<string> = new Set();
5252

53-
for (const transaction of unit.getBlock().transactions) {
53+
const block = unit.getBlock();
54+
for (const transaction of block.transactions) {
5455
sendersAddresses.add(transaction.data.from);
5556
}
5657

57-
await this.ipcSubprocess.sendRequest("commit", unit.blockNumber, [...sendersAddresses.keys()]);
58+
await this.ipcSubprocess.sendRequest(
59+
"commit",
60+
unit.blockNumber,
61+
[...sendersAddresses.keys()],
62+
block.header.gasUsed,
63+
);
5864
}
5965

6066
public async start(blockNumber: number): Promise<void> {

0 commit comments

Comments
 (0)