Skip to content

Commit a23ce51

Browse files
authored
Add GroupRunner to limit concurrent RPCs (#457)
* Add GroupRunner to limit concurrent RPCs * remove unnecessary async * lint * lint * comment typo * Remove type parameter in comment
1 parent 725cffc commit a23ce51

3 files changed

Lines changed: 312 additions & 1 deletion

File tree

src/util/group-runner.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
type Callback<T> = () => Promise<T>
2+
type Resolve<T> = (arg: T | Promise<T>) => void
3+
4+
// Runs tasks in groups of a fixed size.
5+
// The next group won't be started until the previous group is finished.
6+
//
7+
// Example usage:
8+
//
9+
// const fetchBalances(addresses: string[]): Promise<number[]> {
10+
// // addresses can contain thousands of addresses
11+
// const groupRunner = new GroupRunner(10)
12+
// const getBalance = groupRunner.wrapFunction(fetchBalance)
13+
// const balancePromises: Promise<number>[] = []
14+
// for (const address of addresses) {
15+
// // There will be at most 10 concurrent calls to fetchBalance.
16+
// // fetchBalance might do an RPC and we don't want to get rate limited.
17+
// balancePromises.push(getBalance(address)))
18+
// }
19+
// return Promise.all(balancePromises)
20+
// }
21+
//
22+
//
23+
// Implementation note:
24+
// Once the size has been reached, we wait for all previous tasks to finish
25+
// before running the new task.
26+
// Alternatively, we could run more tasks as soon as some (rather than all)
27+
// tasks have finished, to make progress sooner, but the former is what's
28+
// currently used in multiple places in the external-adapters-js repo so we
29+
// chose that behavior.
30+
export class GroupRunner {
31+
private currentGroup: Promise<unknown>[] = []
32+
private previousStartRunning: Promise<void> = Promise.resolve()
33+
34+
constructor(private groupSize: number) {}
35+
36+
// Calls the given callback eventually but makes sure any previous group of
37+
// groupSize size has settled before calling and subsequent callbacks.
38+
run<T>(callback: Callback<T>): Promise<T> {
39+
return new Promise((resolve) => {
40+
// This creates an implicit queue which guarantees that there are no
41+
// concurrent calls into startRunning. This is necessary to avoid having
42+
// currentGroup being cleared concurrently.
43+
this.previousStartRunning = this.previousStartRunning.then(() => {
44+
return this.startRunning(callback, resolve)
45+
})
46+
})
47+
}
48+
49+
// Waits for a previous group to finish, if necessary, and then runs the
50+
// given callback. When this method resolves, the callback has been called
51+
// but not necessarily resolved.
52+
async startRunning<T>(callback: Callback<T>, resolve: Resolve<T>) {
53+
if (this.currentGroup.length >= this.groupSize) {
54+
await Promise.allSettled(this.currentGroup)
55+
this.currentGroup = []
56+
}
57+
const promise = callback()
58+
this.currentGroup.push(promise)
59+
resolve(promise)
60+
}
61+
62+
wrapFunction<Args extends unknown[], Return>(
63+
func: (...args: Args) => Promise<Return>,
64+
): (...args: Args) => Promise<Return> {
65+
return (...args: Args) => {
66+
return this.run(() => func(...args))
67+
}
68+
}
69+
}

src/util/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ export const timeoutPromise = <T>(
115115
]).finally(() => clearTimeout(timer))
116116
}
117117

118-
type DeferredResolve<T> = (value: T) => void
118+
type DeferredResolve<T> = (value: T | PromiseLike<T>) => void
119119
type DeferredReject = (reason?: unknown) => void
120120

121121
/**

test/util/group-runner.test.ts

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
import { deferredPromise, sleep } from '../../src/util'
2+
import { GroupRunner } from '../../src/util/group-runner'
3+
import test from 'ava'
4+
5+
type Task<T> = {
6+
promise: Promise<T>
7+
resolve: (arg: T) => void
8+
reject: (arg: unknown) => void
9+
getCallCount: () => number
10+
callback: () => Promise<T>
11+
}
12+
13+
const createTask = <T>(): Task<T> => {
14+
const [promise, resolve, reject] = deferredPromise<T>()
15+
let callCount = 0
16+
return {
17+
promise,
18+
resolve,
19+
reject,
20+
getCallCount: () => callCount,
21+
callback: () => {
22+
callCount++
23+
return promise
24+
},
25+
}
26+
}
27+
28+
test('should run tasks', async (t) => {
29+
const runner = new GroupRunner(2)
30+
const result = 'result'
31+
const callback = () => Promise.resolve(result)
32+
t.is(await runner.run(callback), result)
33+
})
34+
35+
test('should wait after group size is reached', async (t) => {
36+
const runner = new GroupRunner(2)
37+
38+
const tasks: Task<void>[] = []
39+
for (let i = 0; i < 3; i++) {
40+
tasks.push(createTask())
41+
}
42+
43+
runner.run(tasks[0].callback)
44+
runner.run(tasks[1].callback)
45+
runner.run(tasks[2].callback)
46+
await sleep(0)
47+
t.is(tasks[0].getCallCount(), 1)
48+
t.is(tasks[1].getCallCount(), 1)
49+
t.is(tasks[2].getCallCount(), 0)
50+
})
51+
52+
test('should continue after group has finished', async (t) => {
53+
const runner = new GroupRunner(2)
54+
55+
const tasks: Task<void>[] = []
56+
for (let i = 0; i < 3; i++) {
57+
tasks.push(createTask())
58+
}
59+
60+
runner.run(tasks[0].callback)
61+
runner.run(tasks[1].callback)
62+
runner.run(tasks[2].callback)
63+
64+
await sleep(0)
65+
t.is(tasks[2].getCallCount(), 0)
66+
67+
tasks[0].resolve(undefined)
68+
69+
await sleep(0)
70+
t.is(tasks[2].getCallCount(), 0)
71+
72+
tasks[1].resolve(undefined)
73+
74+
await sleep(0)
75+
t.is(tasks[2].getCallCount(), 1)
76+
})
77+
78+
test('should not clear current group concurrently', async (t) => {
79+
const runner = new GroupRunner(2)
80+
81+
const tasks: Task<void>[] = []
82+
for (let i = 0; i < 4; i++) {
83+
tasks.push(createTask())
84+
runner.run(tasks[i].callback)
85+
}
86+
// Tasks 3 and 4 will both be waiting for tasks 1 and 2 to finish.
87+
// When they do, they should not both clear the current group resulting in a
88+
// group with only task 4.
89+
tasks[0].resolve(undefined)
90+
tasks[1].resolve(undefined)
91+
92+
// If we did end up with a group of 1 after concurrent clearing, then task 5
93+
// will run immediately, which it shouldn't because task 3 and 4 are not
94+
// finished yet.
95+
const task5: Task<void> = createTask()
96+
runner.run(task5.callback)
97+
98+
await sleep(0)
99+
t.is(task5.getCallCount(), 0)
100+
})
101+
102+
test('multiple groups', async (t) => {
103+
const runner = new GroupRunner(3)
104+
105+
const tasks: Task<void>[] = []
106+
for (let i = 0; i < 5; i++) {
107+
tasks.push(createTask())
108+
runner.run(tasks[i].callback)
109+
}
110+
111+
await sleep(0)
112+
t.is(tasks[0].getCallCount(), 1)
113+
t.is(tasks[1].getCallCount(), 1)
114+
t.is(tasks[2].getCallCount(), 1)
115+
t.is(tasks[3].getCallCount(), 0)
116+
t.is(tasks[4].getCallCount(), 0)
117+
118+
tasks[0].resolve(undefined)
119+
tasks[1].resolve(undefined)
120+
tasks[2].resolve(undefined)
121+
122+
await sleep(0)
123+
t.is(tasks[3].getCallCount(), 1)
124+
t.is(tasks[4].getCallCount(), 1)
125+
126+
// 5 more tasks for a total of 10:
127+
for (let i = 5; i < 10; i++) {
128+
tasks.push(createTask())
129+
runner.run(tasks[i].callback)
130+
}
131+
132+
await sleep(0)
133+
t.is(tasks[5].getCallCount(), 1)
134+
t.is(tasks[6].getCallCount(), 0)
135+
t.is(tasks[7].getCallCount(), 0)
136+
t.is(tasks[8].getCallCount(), 0)
137+
t.is(tasks[9].getCallCount(), 0)
138+
139+
tasks[3].resolve(undefined)
140+
tasks[4].resolve(undefined)
141+
tasks[5].resolve(undefined)
142+
143+
await sleep(0)
144+
t.is(tasks[6].getCallCount(), 1)
145+
t.is(tasks[7].getCallCount(), 1)
146+
t.is(tasks[8].getCallCount(), 1)
147+
t.is(tasks[9].getCallCount(), 0)
148+
149+
tasks[6].resolve(undefined)
150+
tasks[7].resolve(undefined)
151+
tasks[8].resolve(undefined)
152+
153+
await sleep(0)
154+
t.is(tasks[9].getCallCount(), 1)
155+
})
156+
157+
test('multiple return values', async (t) => {
158+
const runner = new GroupRunner(3)
159+
160+
const tasks: Task<number>[] = []
161+
const promises: Promise<number>[] = []
162+
for (let i = 0; i < 10; i++) {
163+
tasks.push(createTask())
164+
promises.push(runner.run(tasks[i].callback))
165+
}
166+
167+
await sleep(0)
168+
for (let i = 0; i < 10; i++) {
169+
tasks[i].resolve(i)
170+
}
171+
172+
await sleep(0)
173+
for (let i = 0; i < 10; i++) {
174+
t.is(await promises[i], i)
175+
}
176+
})
177+
178+
test('rejecting promises', async (t) => {
179+
const runner = new GroupRunner(2)
180+
181+
const tasks: Task<void>[] = []
182+
for (let i = 0; i < 3; i++) {
183+
tasks.push(createTask())
184+
}
185+
186+
runner.run(tasks[0].callback).catch(() => {
187+
/* Ignore */
188+
})
189+
runner.run(tasks[1].callback).catch(() => {
190+
/* Ignore */
191+
})
192+
runner.run(tasks[2].callback)
193+
194+
await sleep(0)
195+
t.is(tasks[2].getCallCount(), 0)
196+
197+
tasks[0].reject(undefined)
198+
199+
await sleep(0)
200+
t.is(tasks[2].getCallCount(), 0)
201+
202+
await sleep(0)
203+
tasks[1].reject(undefined)
204+
205+
await sleep(0)
206+
t.is(tasks[2].getCallCount(), 1)
207+
})
208+
209+
test('wrap function', async (t) => {
210+
const runner = new GroupRunner(2)
211+
212+
const tasks: Task<number>[] = []
213+
for (let i = 0; i < 3; i++) {
214+
tasks.push(createTask())
215+
}
216+
217+
const f = runner.wrapFunction((i: number) => tasks[i].callback())
218+
219+
const promise0 = f(0)
220+
const promise1 = f(1)
221+
const promise2 = f(2)
222+
223+
await sleep(0)
224+
t.is(tasks[2].getCallCount(), 0)
225+
226+
tasks[0].resolve(0)
227+
228+
await sleep(0)
229+
t.is(tasks[2].getCallCount(), 0)
230+
231+
await sleep(0)
232+
tasks[1].resolve(1)
233+
234+
await sleep(0)
235+
t.is(tasks[2].getCallCount(), 1)
236+
237+
tasks[2].resolve(2)
238+
239+
t.is(await promise0, 0)
240+
t.is(await promise1, 1)
241+
t.is(await promise2, 2)
242+
})

0 commit comments

Comments
 (0)