Skip to content

Commit aea93e5

Browse files
committed
feat(EAV-929): add sequential-persistent execution type
1 parent a476996 commit aea93e5

10 files changed

Lines changed: 231 additions & 129 deletions

File tree

packages/timeline-state-resolver/src/service/__tests__/commandExecutor.spec.ts

Lines changed: 16 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { waitTime } from '../../__tests__/lib'
22
import { CommandExecutor } from '../commandExecutor'
3+
import { SalvoStrategy } from '../executor/SalvoStrategy'
4+
import { SequentialStrategy } from '../executor/SequentialStrategy'
35

46
describe('CommandExecutor', () => {
57
const FUDGE_TIME = 50 // ms
@@ -24,7 +26,7 @@ describe('CommandExecutor', () => {
2426
})
2527
test('salvo commands', async () => {
2628
timeToExecuteCommand = 100 // ms
27-
const commandExecutor = new CommandExecutor(logger, 'salvo', sendCommand)
29+
const commandExecutor = new CommandExecutor(new SalvoStrategy(logger, sendCommand))
2830

2931
startTime = Date.now()
3032
await commandExecutor.executeCommands([
@@ -44,7 +46,7 @@ describe('CommandExecutor', () => {
4446
})
4547
test('salvo commands, with preliminary', async () => {
4648
timeToExecuteCommand = 100 // ms
47-
const commandExecutor = new CommandExecutor(logger, 'salvo', sendCommand)
49+
const commandExecutor = new CommandExecutor(new SalvoStrategy(logger, sendCommand))
4850

4951
startTime = Date.now()
5052
await commandExecutor.executeCommands([
@@ -62,7 +64,7 @@ describe('CommandExecutor', () => {
6264
})
6365
test('sequential commands', async () => {
6466
timeToExecuteCommand = 100 // ms
65-
const commandExecutor = new CommandExecutor(logger, 'sequential', sendCommand)
67+
const commandExecutor = new CommandExecutor(new SequentialStrategy(logger, sendCommand))
6668

6769
startTime = Date.now()
6870
await commandExecutor.executeCommands([
@@ -82,7 +84,7 @@ describe('CommandExecutor', () => {
8284
})
8385
test('sequential commands, multiple queues', async () => {
8486
timeToExecuteCommand = 100 // ms
85-
const commandExecutor = new CommandExecutor(logger, 'sequential', sendCommand)
87+
const commandExecutor = new CommandExecutor(new SequentialStrategy(logger, sendCommand))
8688

8789
startTime = Date.now()
8890
await commandExecutor.executeCommands([
@@ -112,7 +114,7 @@ describe('CommandExecutor', () => {
112114
})
113115
test('sequential commands, with preliminary', async () => {
114116
timeToExecuteCommand = 100 // ms
115-
const commandExecutor = new CommandExecutor(logger, 'sequential', sendCommand)
117+
const commandExecutor = new CommandExecutor(new SequentialStrategy(logger, sendCommand))
116118

117119
startTime = Date.now()
118120
await commandExecutor.executeCommands([
@@ -134,50 +136,20 @@ describe('CommandExecutor', () => {
134136
expect(Math.abs(receivedCommandTimes['A3'] - 600)).toBeLessThan(FUDGE_TIME)
135137
})
136138
test('sequential with preliminary, in multiple queues', async () => {
137-
const commandExecutor = new CommandExecutor(logger, 'sequential', sendCommand)
139+
const commandExecutor = new CommandExecutor(new SequentialStrategy(logger, sendCommand))
138140

139141
startTime = Date.now()
140142
await commandExecutor.executeCommands([
141143
// Commands are 2 queues, A and B
142144
// in random order, with preliminary times
143-
{
144-
command: 'A-0',
145-
queueId: 'queueA',
146-
},
147-
{
148-
command: 'A-300',
149-
queueId: 'queueA',
150-
preliminary: 300,
151-
},
152-
{
153-
command: 'B-300',
154-
queueId: 'queueB',
155-
preliminary: 300,
156-
},
157-
{
158-
command: 'A-1000',
159-
queueId: 'queueA',
160-
preliminary: 1000,
161-
},
162-
{
163-
command: 'B-1000',
164-
queueId: 'queueB',
165-
preliminary: 1000,
166-
},
167-
{
168-
command: 'B-500',
169-
queueId: 'queueB',
170-
preliminary: 500,
171-
},
172-
{
173-
command: 'A-500',
174-
queueId: 'queueA',
175-
preliminary: 500,
176-
},
177-
{
178-
command: 'B-0',
179-
queueId: 'queueB',
180-
},
145+
{ command: 'A-0', queueId: 'queueA' },
146+
{ command: 'A-300', queueId: 'queueA', preliminary: 300 },
147+
{ command: 'B-300', queueId: 'queueB', preliminary: 300 },
148+
{ command: 'A-1000', queueId: 'queueA', preliminary: 1000 },
149+
{ command: 'B-1000', queueId: 'queueB', preliminary: 1000 },
150+
{ command: 'B-500', queueId: 'queueB', preliminary: 500 },
151+
{ command: 'A-500', queueId: 'queueA', preliminary: 500 },
152+
{ command: 'B-0', queueId: 'queueB' },
181153
])
182154

183155
expect(sendCommand).toHaveBeenCalledTimes(8)
Lines changed: 5 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
1-
import * as _ from 'underscore'
2-
import type { BaseDeviceAPI, CommandWithContext } from 'timeline-state-resolver-api'
1+
import type { CommandWithContext } from 'timeline-state-resolver-api'
32
import { Measurement } from './measure'
4-
import { StateHandlerContext } from './stateHandler'
3+
import { ExecutionStrategy } from './executor/ExecutionStrategy'
54

6-
export class CommandExecutor<DeviceState, Command extends CommandWithContext<any, any>> {
7-
constructor(
8-
private logger: StateHandlerContext['logger'],
9-
private mode: 'salvo' | 'sequential',
10-
private sendCommand: BaseDeviceAPI<DeviceState, void, Command>['sendCommand']
11-
) {}
5+
export class CommandExecutor<Command extends CommandWithContext<unknown, unknown>> {
6+
constructor(private strategy: ExecutionStrategy<Command>) {}
127

138
async executeCommands(commands: Command[], measurement?: Measurement): Promise<void> {
149
if (commands.length === 0) return
@@ -18,77 +13,6 @@ export class CommandExecutor<DeviceState, Command extends CommandWithContext<any
1813

1914
const totalTime = commands[0].preliminary ?? 0
2015

21-
if (this.mode === 'salvo') {
22-
return this._executeCommandsSalvo(totalTime, commands, measurement)
23-
} else {
24-
return this._executeCommandsSequential(totalTime, commands, measurement)
25-
}
16+
return this.strategy.execute(totalTime, commands, measurement)
2617
}
27-
28-
private async _executeCommandsSalvo(
29-
totalTime: number,
30-
commands: Command[],
31-
measurement?: Measurement
32-
): Promise<void> {
33-
const start = Date.now() // note - would be better to use monotonic time here but BigInt's are annoying
34-
35-
await Promise.allSettled(
36-
commands.map(async (command) => {
37-
const targetTime = start + totalTime - (command.preliminary ?? 0)
38-
39-
const timeToWait = targetTime - Date.now()
40-
if (timeToWait > 0) {
41-
await sleep(timeToWait)
42-
}
43-
44-
measurement?.executeCommand(command)
45-
try {
46-
await this.sendCommand(command)
47-
} catch (e) {
48-
this.logger.error('Error while executing command', e as any)
49-
} finally {
50-
measurement?.finishedCommandExecution(command)
51-
}
52-
})
53-
)
54-
}
55-
56-
private async _executeCommandsSequential(
57-
totalTime: number,
58-
commands: Command[],
59-
measurement?: Measurement
60-
): Promise<void> {
61-
const start = Date.now() // note - would be better to use monotonic time here but BigInt's are annoying
62-
63-
const commandQueues = _.groupBy(commands || [], (command) => command.queueId ?? '$$default')
64-
65-
await Promise.allSettled(
66-
Object.values<Command[]>(commandQueues).map(async (commandsInQueue): Promise<void> => {
67-
try {
68-
for (const command of commandsInQueue) {
69-
const targetTime = start + totalTime - (command.preliminary ?? 0)
70-
71-
const timeToWait = targetTime - Date.now()
72-
if (timeToWait > 0) {
73-
await sleep(timeToWait)
74-
}
75-
76-
measurement?.executeCommand(command)
77-
try {
78-
await this.sendCommand(command)
79-
} catch (e) {
80-
this.logger.error('Error while executing command', e as any)
81-
} finally {
82-
measurement?.finishedCommandExecution(command)
83-
}
84-
}
85-
} catch (e) {
86-
this.logger.error('CommandExecutor', new Error('Error in _executeCommandsSequential'))
87-
}
88-
})
89-
)
90-
}
91-
}
92-
async function sleep(duration: number): Promise<void> {
93-
return new Promise((resolve) => setTimeout(resolve, duration))
9418
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { CommandWithContext } from 'timeline-state-resolver-api'
2+
import { Measurement } from '../measure'
3+
4+
/**
5+
* Defines how a batch of commands is dispatched to the device.
6+
*/
7+
export interface ExecutionStrategy<Command extends CommandWithContext<unknown, unknown>> {
8+
/**
9+
* Executes the given commands according to the strategy's ordering rules.
10+
*
11+
* @param totalTime - The maximum preliminary offset (ms) among all commands in the
12+
* batch. Commands use this to calculate how long to wait before being sent.
13+
* @param commands - The commands to execute, pre-sorted by descending preliminary value.
14+
* @param measurement - Optional measurement tracker for profiling command execution.
15+
*/
16+
execute(totalTime: number, commands: Command[], measurement?: Measurement): Promise<void>
17+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/**
2+
* Holds per-queue promise chains that persist across state-change batches.
3+
* Commands with the same queueId will always be executed one after another,
4+
* even when they originate from different (consecutive) timeline states.
5+
*
6+
* Create one instance per device and pass it to CommandExecutor when using
7+
* the 'sequential-persistent' execution mode.
8+
*/
9+
export class PersistentCommandQueue {
10+
private queues = new Map<string, Promise<void>>()
11+
12+
/**
13+
* Enqueues a task onto the queue identified by queueId.
14+
* Returns a promise that resolves when the task has completed.
15+
*/
16+
async enqueue(queueId: string, task: () => Promise<void>): Promise<void> {
17+
const previous = this.queues.get(queueId) ?? Promise.resolve()
18+
const next = previous.then(task, task) // always advance even on failure
19+
this.queues.set(queueId, next)
20+
return next
21+
}
22+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import { ExecutionStrategy } from './ExecutionStrategy'
2+
import { Measurement } from '../measure'
3+
import { sleep } from './helpers'
4+
import { BaseDeviceAPI, CommandWithContext } from 'timeline-state-resolver-api'
5+
import { StateHandlerContext } from '../stateHandler'
6+
7+
export class SalvoStrategy<DeviceState, Command extends CommandWithContext<unknown, unknown>>
8+
implements ExecutionStrategy<Command>
9+
{
10+
constructor(
11+
private logger: StateHandlerContext['logger'],
12+
private sendCommand: BaseDeviceAPI<DeviceState, void, Command>['sendCommand']
13+
) {}
14+
15+
async execute(totalTime: number, commands: Command[], measurement?: Measurement): Promise<void> {
16+
const start = Date.now()
17+
await Promise.allSettled(
18+
commands.map(async (command) => {
19+
const targetTime = start + totalTime - (command.preliminary ?? 0)
20+
21+
const timeToWait = targetTime - Date.now()
22+
if (timeToWait > 0) {
23+
await sleep(timeToWait)
24+
}
25+
26+
measurement?.executeCommand(command)
27+
try {
28+
await this.sendCommand(command)
29+
} catch (e) {
30+
this.logger.error('Error while executing command', e as any)
31+
} finally {
32+
measurement?.finishedCommandExecution(command)
33+
}
34+
})
35+
)
36+
}
37+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import _ from 'underscore'
2+
import { sleep } from './helpers'
3+
import { ExecutionStrategy } from './ExecutionStrategy'
4+
import { BaseDeviceAPI, CommandWithContext } from 'timeline-state-resolver-api'
5+
import { Measurement } from '../measure'
6+
import { StateHandlerContext } from '../stateHandler'
7+
import { PersistentCommandQueue } from './PersistentCommandQueue'
8+
9+
/**
10+
* Like SequentialStrategy, but queue chains persist across state-change batches via a
11+
* PersistentCommandQueue, so commands with the same queueId from consecutive states are
12+
* never interleaved.
13+
*/
14+
export class SequentialPersistentStrategy<DeviceState, Command extends CommandWithContext<unknown, unknown>>
15+
implements ExecutionStrategy<Command>
16+
{
17+
constructor(
18+
private logger: StateHandlerContext['logger'],
19+
private sendCommand: BaseDeviceAPI<DeviceState, void, Command>['sendCommand'],
20+
private persistentQueue: PersistentCommandQueue
21+
) {}
22+
23+
async execute(totalTime: number, commands: Command[], measurement?: Measurement): Promise<void> {
24+
const start = Date.now()
25+
const commandQueues = _.groupBy(commands || [], (command) => command.queueId ?? '$$default')
26+
27+
await Promise.allSettled(
28+
Object.entries<Command[]>(commandQueues).map(async ([queueId, commandsInQueue]) =>
29+
this.persistentQueue.enqueue(queueId, async () => {
30+
try {
31+
for (const command of commandsInQueue) {
32+
const targetTime = start + totalTime - (command.preliminary ?? 0)
33+
34+
const timeToWait = targetTime - Date.now()
35+
if (timeToWait > 0) {
36+
await sleep(timeToWait)
37+
}
38+
39+
measurement?.executeCommand(command)
40+
try {
41+
await this.sendCommand(command)
42+
} catch (e) {
43+
this.logger.error('Error while executing command', e as any)
44+
} finally {
45+
measurement?.finishedCommandExecution(command)
46+
}
47+
}
48+
} catch (e) {
49+
this.logger.error('CommandExecutor', new Error('Error in _executeCommandsSequential'))
50+
}
51+
})
52+
)
53+
)
54+
}
55+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import _ from 'underscore'
2+
import { ExecutionStrategy } from './ExecutionStrategy'
3+
import { Measurement } from '../measure'
4+
import { StateHandlerContext } from '../stateHandler'
5+
import { sleep } from './helpers'
6+
import { BaseDeviceAPI, CommandWithContext } from 'timeline-state-resolver-api'
7+
8+
export class SequentialStrategy<DeviceState, Command extends CommandWithContext<unknown, unknown>>
9+
implements ExecutionStrategy<Command>
10+
{
11+
constructor(
12+
private logger: StateHandlerContext['logger'],
13+
private sendCommand: BaseDeviceAPI<DeviceState, void, Command>['sendCommand']
14+
) {}
15+
16+
async execute(totalTime: number, commands: Command[], measurement?: Measurement): Promise<void> {
17+
const start = Date.now() // note - would be better to use monotonic time here but BigInt's are annoying
18+
19+
const commandQueues = _.groupBy(commands || [], (command) => command.queueId ?? '$$default')
20+
21+
await Promise.allSettled(
22+
Object.values<Command[]>(commandQueues).map(async (commandsInQueue): Promise<void> => {
23+
try {
24+
for (const command of commandsInQueue) {
25+
const targetTime = start + totalTime - (command.preliminary ?? 0)
26+
27+
const timeToWait = targetTime - Date.now()
28+
if (timeToWait > 0) {
29+
await sleep(timeToWait)
30+
}
31+
32+
measurement?.executeCommand(command)
33+
try {
34+
await this.sendCommand(command)
35+
} catch (e) {
36+
this.logger.error('Error while executing command', e as any)
37+
} finally {
38+
measurement?.finishedCommandExecution(command)
39+
}
40+
}
41+
} catch (e) {
42+
this.logger.error('CommandExecutor', new Error('Error in _executeCommandsSequential'))
43+
}
44+
})
45+
)
46+
}
47+
}

0 commit comments

Comments
 (0)