Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Runtime/workflow-engine-app/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ The Altinn-specific command that calls back into Altinn apps via HTTP POST.
- **Type string**: `"app"`
- **Data**: `AppCommandData` — `{ commandKey, payload? }`
- **Context**: `AppWorkflowContext` — `{ actor, lockToken, org, app, instanceOwnerPartyId, instanceGuid }`
- **Endpoint**: Templated URL expanded from context, e.g. `http://host/{Org}/{App}/instances/{InstanceOwnerPartyId}/{InstanceGuid}/process-engine-callbacks`
- **Endpoint**: Templated URL expanded from context, e.g. `http://host/{Org}/{App}/instances/{InstanceOwnerPartyId}/{InstanceGuid}/workflow-engine-callbacks`
- **State passing**: Reads `{ "state": "..." }` from response body, passes forward to next step
- **Validation**: All context fields validated at enqueue time — invalid requests never enter the queue
- **Error classification**: 4xx (except 408/418/429) → critical, 5xx/408/418/429 → retryable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected override CommandValidationResult Validate(
}

/// <inheritdoc/>
protected override async Task<ExecutionResult> ExecuteAsync(
protected override async Task<ExecutionResult> Execute(
CommandExecutionContext context,
CancellationToken cancellationToken
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ internal static class Defaults
{
ApiKey = "injected-at-runtime",
CommandEndpoint =
"http://local.altinn.cloud/{Org}/{App}/instances/{InstanceOwnerPartyId}/{InstanceGuid}/process-engine-callbacks",
"http://local.altinn.cloud/{Org}/{App}/instances/{InstanceOwnerPartyId}/{InstanceGuid}/workflow-engine-callbacks",
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace WorkflowEngine.App.Tests.Commands.AppCommand;

/// <summary>
/// Unit tests for <see cref="App.Commands.AppCommand.AppCommand.ExecuteAsync"/>
/// Unit tests for <see cref="App.Commands.AppCommand.AppCommand.Execute"/>
/// called directly via <see cref="ICommand"/>, backed by a mocked HTTP handler.
/// </summary>
public class AppCommandExecutionTests
Expand All @@ -29,7 +29,7 @@ public async Task Execute_SuccessResponse_ReturnsSuccess()
var workflow = AppCommandTestFixture.CreateWorkflow(step);
var context = AppCommandTestFixture.CreateExecutionContext(workflow, step, data);

var result = await command.ExecuteAsync(context, TestContext.Current.CancellationToken);
var result = await command.Execute(context, TestContext.Current.CancellationToken);

Assert.Equal(ExecutionStatus.Success, result.Status);
Assert.Single(fixture.HttpHandler.Requests);
Expand All @@ -46,7 +46,7 @@ public async Task Execute_SendsCorrectPayload()
var workflow = AppCommandTestFixture.CreateWorkflow(step);
var context = AppCommandTestFixture.CreateExecutionContext(workflow, step, data);

var result = await command.ExecuteAsync(context, TestContext.Current.CancellationToken);
var result = await command.Execute(context, TestContext.Current.CancellationToken);

Assert.Equal(ExecutionStatus.Success, result.Status);
Assert.Single(fixture.HttpHandler.Requests);
Expand All @@ -72,7 +72,7 @@ public async Task Execute_SetsApiKeyHeader()
var workflow = AppCommandTestFixture.CreateWorkflow(step);
var context = AppCommandTestFixture.CreateExecutionContext(workflow, step, data);

await command.ExecuteAsync(context, TestContext.Current.CancellationToken);
await command.Execute(context, TestContext.Current.CancellationToken);

var captured = fixture.HttpHandler.Requests[0];
Assert.True(captured.Headers.ContainsKey("X-Api-Key"));
Expand All @@ -89,7 +89,7 @@ public async Task Execute_UsesCommandKeyAsRelativeUri()
var workflow = AppCommandTestFixture.CreateWorkflow(step);
var context = AppCommandTestFixture.CreateExecutionContext(workflow, step, data);

await command.ExecuteAsync(context, TestContext.Current.CancellationToken);
await command.Execute(context, TestContext.Current.CancellationToken);

var captured = fixture.HttpHandler.Requests[0];
Assert.Contains("my-callback-path", captured.RequestUri.ToString(), StringComparison.Ordinal);
Expand All @@ -105,7 +105,7 @@ public async Task Execute_ExpandsEndpointUrlTemplate()
var workflow = AppCommandTestFixture.CreateWorkflow(step);
var context = AppCommandTestFixture.CreateExecutionContext(workflow, step, data);

await command.ExecuteAsync(context, TestContext.Current.CancellationToken);
await command.Execute(context, TestContext.Current.CancellationToken);

var captured = fixture.HttpHandler.Requests[0];
var url = captured.RequestUri.ToString();
Expand All @@ -130,7 +130,7 @@ public async Task Execute_500_ReturnsRetryableError()
var workflow = AppCommandTestFixture.CreateWorkflow(step);
var context = AppCommandTestFixture.CreateExecutionContext(workflow, step, data);

var result = await command.ExecuteAsync(context, TestContext.Current.CancellationToken);
var result = await command.Execute(context, TestContext.Current.CancellationToken);

Assert.Equal(ExecutionStatus.RetryableError, result.Status);
Assert.Contains("InternalServerError", result.Message, StringComparison.Ordinal);
Expand All @@ -153,7 +153,7 @@ public async Task Execute_RetryableStatusCodes_ReturnsRetryableError(HttpStatusC
var workflow = AppCommandTestFixture.CreateWorkflow(step);
var context = AppCommandTestFixture.CreateExecutionContext(workflow, step, data);

var result = await command.ExecuteAsync(context, TestContext.Current.CancellationToken);
var result = await command.Execute(context, TestContext.Current.CancellationToken);

Assert.Equal(ExecutionStatus.RetryableError, result.Status);
}
Expand All @@ -177,7 +177,7 @@ public async Task Execute_NonRetryable4xx_ReturnsCriticalError(HttpStatusCode st
var workflow = AppCommandTestFixture.CreateWorkflow(step);
var context = AppCommandTestFixture.CreateExecutionContext(workflow, step, data);

var result = await command.ExecuteAsync(context, TestContext.Current.CancellationToken);
var result = await command.Execute(context, TestContext.Current.CancellationToken);

Assert.Equal(ExecutionStatus.CriticalError, result.Status);
Assert.Contains("client error", result.Message, StringComparison.OrdinalIgnoreCase);
Expand All @@ -196,7 +196,7 @@ public async Task Execute_SuccessWithStateInResponse_SetsStateOut()
var workflow = AppCommandTestFixture.CreateWorkflow(step);
var context = AppCommandTestFixture.CreateExecutionContext(workflow, step, data);

await command.ExecuteAsync(context, TestContext.Current.CancellationToken);
await command.Execute(context, TestContext.Current.CancellationToken);

Assert.Equal("next-step-state", step.StateOut);
}
Expand All @@ -212,7 +212,7 @@ public async Task Execute_SuccessWithEmptyBody_DoesNotSetStateOut()
var workflow = AppCommandTestFixture.CreateWorkflow(step);
var context = AppCommandTestFixture.CreateExecutionContext(workflow, step, data);

await command.ExecuteAsync(context, TestContext.Current.CancellationToken);
await command.Execute(context, TestContext.Current.CancellationToken);

Assert.Null(step.StateOut);
}
Expand All @@ -228,7 +228,7 @@ public async Task Execute_SuccessWithNullStateInResponse_DoesNotSetStateOut()
var workflow = AppCommandTestFixture.CreateWorkflow(step);
var context = AppCommandTestFixture.CreateExecutionContext(workflow, step, data);

await command.ExecuteAsync(context, TestContext.Current.CancellationToken);
await command.Execute(context, TestContext.Current.CancellationToken);

Assert.Null(step.StateOut);
}
Expand All @@ -244,7 +244,7 @@ public async Task Execute_SuccessWithInvalidJson_ReturnsCriticalError()
var workflow = AppCommandTestFixture.CreateWorkflow(step);
var context = AppCommandTestFixture.CreateExecutionContext(workflow, step, data);

var result = await command.ExecuteAsync(context, TestContext.Current.CancellationToken);
var result = await command.Execute(context, TestContext.Current.CancellationToken);

Assert.Equal(ExecutionStatus.CriticalError, result.Status);
Assert.Contains("invalid response body", result.Message, StringComparison.OrdinalIgnoreCase);
Expand All @@ -262,7 +262,7 @@ public async Task Execute_IncludesStateInPayload()
var workflow = AppCommandTestFixture.CreateWorkflow(step);
var context = AppCommandTestFixture.CreateExecutionContext(workflow, step, data, stateIn: "previous-state");

var result = await command.ExecuteAsync(context, TestContext.Current.CancellationToken);
var result = await command.Execute(context, TestContext.Current.CancellationToken);

Assert.Equal(ExecutionStatus.Success, result.Status);
Assert.Single(fixture.HttpHandler.Requests);
Expand All @@ -285,7 +285,7 @@ public async Task Execute_SendsCorrectPayload_IncludesWorkflowIdAndState()
var workflow = AppCommandTestFixture.CreateWorkflow(step);
var context = AppCommandTestFixture.CreateExecutionContext(workflow, step, data);

var result = await command.ExecuteAsync(context, TestContext.Current.CancellationToken);
var result = await command.Execute(context, TestContext.Current.CancellationToken);

Assert.Equal(ExecutionStatus.Success, result.Status);
var captured = fixture.HttpHandler.Requests[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void ConfigureAppCommand_NullCommandEndpoint_AppliesDefault()
var options = sp.GetRequiredService<IOptions<AppCommandSettings>>();

Assert.NotNull(options.Value.CommandEndpoint);
Assert.Contains("process-engine-callbacks", options.Value.CommandEndpoint, StringComparison.Ordinal);
Assert.Contains("workflow-engine-callbacks", options.Value.CommandEndpoint, StringComparison.Ordinal);
}

[Fact]
Expand Down
9 changes: 5 additions & 4 deletions src/Runtime/workflow-engine/.k6/lib/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,25 @@ function parseEngineHealth(res) {

/**
* Polls the workflows list endpoint until it returns 204 No Content (no active workflows).
* Uses pageSize=1 to minimise data transfer — only the totalCount matters.
* @param {number} pollIntervalMs - milliseconds between polls
*/
export function waitForQueueDrain(pollIntervalMs = 500) {
console.log('\nWaiting for queue to drain...');

let drained = false;
const start = Date.now();
const pollUrl = `${BASE_URL}?pageSize=1`;

while (!drained) {
try {
const res = http.get(BASE_URL, { tags: { name: 'queue_drain' } });
const res = http.get(pollUrl, { tags: { name: 'queue_drain' } });

if (res.status === 204) {
drained = true;
} else if (res.status === 200) {
const workflows = JSON.parse(res.body);
const count = Array.isArray(workflows) ? workflows.length : '?';
console.log(` Active workflows: ${count}`);
const body = JSON.parse(res.body);
console.log(` Active workflows: ${body.totalCount ?? '?'}`);
} else {
console.warn(` Unexpected status: ${res.status}`);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Runtime/workflow-engine/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Reusable class library for async workflow processing. Provides the core engine,

- `GET /api/v1/namespaces` — list distinct namespaces
- `POST /api/v1/{namespace}/workflows` — enqueue workflows, supports batch with dependency graphs
- `GET /api/v1/{namespace}/workflows` — list active workflows (optional correlationId, label filters)
- `GET /api/v1/{namespace}/workflows` — paginated list of active workflows (optional page, pageSize, correlationId, label filters)
- `GET /api/v1/{namespace}/workflows/{workflowId:guid}` — get single workflow with all steps
- `POST /api/v1/{namespace}/workflows/{workflowId:guid}/cancel` — request cancellation (idempotent)
- `POST /api/v1/{namespace}/workflows/{workflowId:guid}/resume` — resume a terminal workflow for re-processing
Expand Down
2 changes: 1 addition & 1 deletion src/Runtime/workflow-engine/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ See swagger for a [full list](http://localhost:8080/swagger) of endpoints. The m
```
GET /api/v1/namespaces (list distinct namespaces)
POST /api/v1/{namespace}/workflows (enqueue workflows)
GET /api/v1/{namespace}/workflows (list active workflows)
GET /api/v1/{namespace}/workflows (list active workflows, paginated via ?page & ?pageSize)
GET /api/v1/{namespace}/workflows/{id} (get single workflow with steps)
POST /api/v1/{namespace}/workflows/{id}/cancel (request cancellation)
```
Expand Down
2 changes: 1 addition & 1 deletion src/Runtime/workflow-engine/docs/presentation-technical.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ When active workflows exceed the backpressure threshold, the engine returns **HT
### API

- `GET /api/v1/{namespace}/workflows/{id}` &mdash; status, steps, errors, retry counts
- `GET /api/v1/{namespace}/workflows` &mdash; list with filtering
- `GET /api/v1/{namespace}/workflows?page=1&pageSize=25` &mdash; paginated list with filtering
- Health endpoints: `/health`, `/health/ready`, `/health/live`

---
Expand Down
24 changes: 12 additions & 12 deletions src/Runtime/workflow-engine/docs/technical-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public interface ICommand
Type? WorkflowContextType { get; } // typed workflow context

CommandValidationResult Validate(object? data, object? context);
Task<ExecutionResult> ExecuteAsync(CommandExecutionContext context, CancellationToken ct);
Task<ExecutionResult> Execute(CommandExecutionContext context, CancellationToken ct);
}
```

Expand Down Expand Up @@ -249,10 +249,10 @@ When `ActiveWorkflowCount` ≥ `BackpressureThreshold` (default: 500,000), the e

If a worker crashes mid-processing, the `HeartbeatService` enables recovery:

1. Workers update `HeartbeatAt` for all in-flight workflows on a regular interval (default: 3s)
2. The processor detects stale workflows where the heartbeat has expired (default threshold: 15s)
1. Workers update `HeartbeatAt` for all in-flight workflows on a regular interval (default: 10s)
2. The processor detects stale workflows where the heartbeat has expired (default threshold: 30s)
3. Stale workflows are reclaimed — reset to `Enqueued` and retried
4. After `MaxReclaimCount` (default: 3) reclaim attempts, the workflow is marked `Failed`
4. After `MaxReclaimCount` (default: 5) reclaim attempts, the workflow is marked `Failed`

This enables safe horizontal scaling: if Instance A crashes, Instance B reclaims its work.

Expand Down Expand Up @@ -587,16 +587,16 @@ All via `EngineSettings` (bound from `appsettings.json`):
| `MaxWorkflowsPerRequest` | — | Max workflows in a single enqueue call |
| `MaxStepsPerWorkflow` | — | Max steps per workflow |
| `MaxLabels` | — | Max label key-value pairs |
| `DefaultStepCommandTimeout` | 30s | Per-step execution timeout |
| `DefaultStepCommandTimeout` | 100s | Per-step execution timeout |
| `DefaultStepRetryStrategy` | Exponential(1s, 5m, 24h) | Default retry strategy |

### Heartbeat & Recovery

| Setting | Default | Description |
| ----------------------------- | ------- | ------------------------------------------ |
| `HeartbeatInterval` | 3s | Worker liveness proof interval |
| `StaleWorkflowThreshold` | 15s | Time before a workflow is considered stale |
| `MaxReclaimCount` | 3 | Reclaim attempts before marking as failed |
| `HeartbeatInterval` | 10s | Worker liveness proof interval |
| `StaleWorkflowThreshold` | 30s | Time before a workflow is considered stale |
| `MaxReclaimCount` | 5 | Reclaim attempts before marking as failed |
| `CancellationWatcherInterval` | 2s | Cross-pod cancellation poll interval |

### Concurrency
Expand All @@ -613,8 +613,8 @@ All via `EngineSettings` (bound from `appsettings.json`):
| Setting | Default | Description |
| ------------------------------ | ------- | -------------------------- |
| `WriteBuffer.MaxBatchSize` | 100 | Workflows per batch insert |
| `WriteBuffer.MaxQueueSize` | 1,000 | Channel buffer size |
| `WriteBuffer.FlushConcurrency` | 4 | Concurrent batch flushers |
| `WriteBuffer.MaxQueueSize` | 10,000 | Channel buffer size |
| `WriteBuffer.FlushConcurrency` | 10 | Concurrent batch flushers |

## Testing

Expand Down Expand Up @@ -684,7 +684,7 @@ public sealed class MyCommand : Command<MyCommandData>
return CommandValidationResult.Valid();
}

public override async Task<ExecutionResult> ExecuteAsync(
public override async Task<ExecutionResult> Execute(
CommandExecutionContext context, CancellationToken ct)
{
var response = await httpClient.PostAsync(data.Target, content, ct);
Expand Down Expand Up @@ -755,7 +755,7 @@ AppCommand reads `{ "state": "..." }` from the response body and stores it as `s
{
"AppCommandSettings": {
"ApiKey": "your-api-key",
"CommandEndpoint": "http://host/{Org}/{App}/instances/{InstanceOwnerPartyId}/{InstanceGuid}/process-engine-callbacks"
"CommandEndpoint": "http://host/{Org}/{App}/instances/{InstanceOwnerPartyId}/{InstanceGuid}/workflow-engine-callbacks"
}
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected override CommandValidationResult Validate(WebhookCommandData? commandD
}

/// <inheritdoc/>
protected override async Task<ExecutionResult> ExecuteAsync(
protected override async Task<ExecutionResult> Execute(
CommandExecutionContext context,
CancellationToken cancellationToken
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,13 @@ internal static class Defaults
MaxDbOperations = 90,
BackpressureThreshold = 500_000,
},
WriteBuffer = new BufferSettings
WriteBuffer = new WriteBufferSettings
{
FlushConcurrency = 8,
FlushConcurrency = 10,
MaxBatchSize = 100,
MaxQueueSize = 10_000,
},
UpdateBuffer = new BufferSettings
{
FlushConcurrency = 8,
MaxBatchSize = 50,
MaxQueueSize = 5_000,
},
UpdateBuffer = new UpdateBufferSettings { MaxBatchSize = 1000, MaxQueueSize = 5_000 },
Retention = new RetentionSettings
{
RetentionPeriod = TimeSpan.FromDays(60),
Expand Down
Loading
Loading