Skip to content

Commit 839d6a5

Browse files
committed
feat(delivery): apply JMESPath payload transformation before delivery (ADR-003 Phase 2)
DeliveryWorker now consults the new IPayloadTransformer abstraction (backed by JmesPath.Net 1.1.0) before signing and POSTing the payload. The transformed payload is what gets signed, so the receiver verifies the signature against the body they actually receive. The delivery pipeline is fail-safe by construction. Transformation only runs when (a) the global TransformationOptions.Enabled flag is true, (b) the endpoint has TransformEnabled set, and (c) TransformExpression is non-empty. Any failure — invalid expression, 100 ms timeout, output exceeding 256 KB UTF-8, or unparseable JSON — returns PayloadTransformResult.FailOpen, the worker logs a warning, and delivery proceeds with the original payload. Two new metrics expose the split: webhookengine.transformations.applied counts successful applications, and webhookengine.transformations.failed_open counts fallbacks. JmesPathPayloadTransformer wraps the library call in Task.Run + Task.Wait(timeout) so a runaway expression cannot stall the delivery thread; output size is checked via Encoding.UTF8.GetByteCount before returning success. The class is stateless and registered as a singleton. The configuration surface lives under WebhookEngine:Transformation in appsettings (Enabled=true, TimeoutMs=100, MaxOutputBytes=262144). Six unit tests under WebhookEngine.Infrastructure.Tests.Services cover the reshape happy path, identity selector, invalid syntax, empty expression, oversized output, and invalid JSON cases. README test count bumped from 136 to 142.
1 parent 5f28c2d commit 839d6a5

10 files changed

Lines changed: 306 additions & 5 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ and this project follows [Semantic Versioning](https://semver.org/spec/v2.0.0.ht
88
## [Unreleased]
99

1010
### Added
11-
- **Payload transformation schema and API (ADR-003 Phase 1):** endpoints now accept `transformExpression` (JMESPath, max 4096 chars), `transformEnabled` (kill switch, default `false`), and a server-managed `transformValidatedAt` timestamp on create/update. Both the public Bearer-key API (`POST /api/v1/endpoints`, `PUT /api/v1/endpoints/{id}`) and the dashboard endpoints (`POST /api/v1/dashboard/endpoints`, `PUT /api/v1/dashboard/endpoints/{id}`) carry the new fields, and `EndpointResponseDto` exposes them on read. Stored only — pipeline integration (delivery-time application with `JmesPath.Net` + 100 ms timeout + fail-open) and the dashboard editor land in ADR-003 Phase 2 and Phase 3 respectively.
11+
- **Payload transformation delivery integration (ADR-003 Phase 2):** the `DeliveryWorker` now applies the per-endpoint JMESPath expression to the payload before signing and POSTing. Backed by the new `IPayloadTransformer` abstraction and `JmesPathPayloadTransformer` (JmesPath.Net 1.1.0). Hard guardrails enforced at delivery time: 100 ms wall-clock timeout, 256 KB output cap, and a global kill switch via `WebhookEngine:Transformation:Enabled` (defaults to `true`). Every transformation is fail-open — invalid expressions, timeouts, oversized output, or invalid JSON fall back to the original payload with a warning log. New OpenTelemetry counters `webhookengine.transformations.applied` and `webhookengine.transformations.failed_open` track success vs fallback. Six unit tests cover identity, reshape, invalid expression, empty expression, output-size, and invalid-json paths.
12+
- **Payload transformation schema and API (ADR-003 Phase 1):** endpoints now accept `transformExpression` (JMESPath, max 4096 chars), `transformEnabled` (kill switch, default `false`), and a server-managed `transformValidatedAt` timestamp on create/update. Both the public Bearer-key API (`POST /api/v1/endpoints`, `PUT /api/v1/endpoints/{id}`) and the dashboard endpoints (`POST /api/v1/dashboard/endpoints`, `PUT /api/v1/dashboard/endpoints/{id}`) carry the new fields, and `EndpointResponseDto` exposes them on read. The dashboard expression editor and live preview land in ADR-003 Phase 3.
1213
- **Security automations:** CodeQL workflow (csharp + javascript-typescript, push/PR/Mondays at 06:30 UTC), Dependency Review action on PRs (high-severity fail + GPL/LGPL/AGPL/EUPL/SSPL deny-list), and Dependabot config covering NuGet, npm, GitHub Actions, and Docker base images. Five repo labels (`dependencies`, `nuget`, `npm`, `ci`, `docker`) created to support the Dependabot config.
1314

1415
### Changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ All configuration is via `appsettings.json` or environment variables (double-und
272272
# Build
273273
dotnet build WebhookEngine.sln
274274

275-
# Run all tests (136 tests)
275+
# Run all tests (142 tests)
276276
dotnet test WebhookEngine.sln
277277

278278
# Run specific test project

src/WebhookEngine.API/Program.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
builder.Services.Configure<DashboardAuthOptions>(builder.Configuration.GetSection(DashboardAuthOptions.SectionName));
3737
builder.Services.Configure<RetentionOptions>(builder.Configuration.GetSection(RetentionOptions.SectionName));
3838
builder.Services.Configure<RateLimitOptions>(builder.Configuration.GetSection(RateLimitOptions.SectionName));
39+
builder.Services.Configure<TransformationOptions>(builder.Configuration.GetSection(TransformationOptions.SectionName));
3940

4041
// Database
4142
builder.Services.AddDbContext<WebhookDbContext>(options =>
@@ -69,6 +70,7 @@
6970
builder.Services.AddSingleton<IMessageStateMachine, MessageStateMachine>();
7071
builder.Services.AddSingleton<IDeliveryNotifier, SignalRDeliveryNotifier>();
7172
builder.Services.AddSingleton<IDevTrafficGenerator, DevTrafficGenerator>();
73+
builder.Services.AddSingleton<IPayloadTransformer, JmesPathPayloadTransformer>();
7274

7375
// Background Workers
7476
builder.Services.AddHostedService<DeliveryWorker>();
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
namespace WebhookEngine.Core.Interfaces;
2+
3+
/// <summary>
4+
/// Applies a declarative payload transformation (JMESPath expression) to a JSON payload
5+
/// before delivery. Implementations must be fail-safe: any error during transformation
6+
/// returns <see cref="PayloadTransformResult.FailOpen"/> so the caller can fall back to
7+
/// the original payload, never blocking delivery (ADR-003).
8+
/// </summary>
9+
public interface IPayloadTransformer
10+
{
11+
/// <summary>
12+
/// Apply <paramref name="expression"/> to <paramref name="payload"/> with built-in
13+
/// timeout and output-size guards. Always returns a result; check
14+
/// <see cref="PayloadTransformResult.IsSuccess"/> to decide whether to use the
15+
/// transformed payload or fall back to the original.
16+
/// </summary>
17+
PayloadTransformResult Transform(string expression, string payload);
18+
}
19+
20+
public sealed record PayloadTransformResult(bool IsSuccess, string? TransformedPayload, string? Error)
21+
{
22+
public static PayloadTransformResult Success(string transformed) => new(true, transformed, null);
23+
public static PayloadTransformResult FailOpen(string error) => new(false, null, error);
24+
}

src/WebhookEngine.Core/Metrics/WebhookMetrics.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ public sealed class WebhookMetrics
2121
private readonly Counter<long> _staleLockRecovered;
2222
private readonly Histogram<double> _deliveryDurationMs;
2323
private readonly UpDownCounter<long> _queueDepth;
24+
private readonly Counter<long> _transformationsApplied;
25+
private readonly Counter<long> _transformationsFailedOpen;
2426

2527
public WebhookMetrics(IMeterFactory meterFactory)
2628
{
@@ -80,6 +82,16 @@ public WebhookMetrics(IMeterFactory meterFactory)
8082
"webhookengine.queue.depth",
8183
unit: "{message}",
8284
description: "Approximate queue depth (enqueue increments, dequeue decrements)");
85+
86+
_transformationsApplied = meter.CreateCounter<long>(
87+
"webhookengine.transformations.applied",
88+
unit: "{transformation}",
89+
description: "Successful payload transformations applied before delivery");
90+
91+
_transformationsFailedOpen = meter.CreateCounter<long>(
92+
"webhookengine.transformations.failed_open",
93+
unit: "{transformation}",
94+
description: "Payload transformations that failed and fell back to the original payload");
8395
}
8496

8597
public void RecordMessageEnqueued(int count = 1) => _messagesEnqueued.Add(count);
@@ -111,4 +123,8 @@ public void RecordDeliveryFailure(double durationMs)
111123
public void RecordQueueEnqueue(int count = 1) => _queueDepth.Add(count);
112124

113125
public void RecordQueueDequeue(int count) => _queueDepth.Add(-count);
126+
127+
public void RecordTransformationApplied() => _transformationsApplied.Add(1);
128+
129+
public void RecordTransformationFailedOpen() => _transformationsFailedOpen.Add(1);
114130
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
namespace WebhookEngine.Core.Options;
2+
3+
/// <summary>
4+
/// Global configuration for the payload transformation pipeline (ADR-003).
5+
/// All limits are enforced at delivery time before the HTTP POST. Per-endpoint
6+
/// transformation toggles live on the <c>Endpoint</c> entity (TransformEnabled,
7+
/// TransformExpression).
8+
/// </summary>
9+
public class TransformationOptions
10+
{
11+
public const string SectionName = "WebhookEngine:Transformation";
12+
13+
/// <summary>
14+
/// Global kill switch. When false, no transformations are applied regardless
15+
/// of per-endpoint settings — every delivery uses the original payload.
16+
/// </summary>
17+
public bool Enabled { get; set; } = true;
18+
19+
/// <summary>
20+
/// Hard timeout per JMESPath evaluation in milliseconds. Pathological
21+
/// expressions are aborted and the delivery falls back to the original payload.
22+
/// </summary>
23+
public int TimeoutMs { get; set; } = 100;
24+
25+
/// <summary>
26+
/// Maximum size of the transformed payload in bytes (UTF-8). Prevents
27+
/// transformations from inflating output beyond the message-size budget.
28+
/// Matches the input payload limit (256 KB).
29+
/// </summary>
30+
public int MaxOutputBytes { get; set; } = 256 * 1024;
31+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
using System.Text;
2+
using DevLab.JmesPath;
3+
using Microsoft.Extensions.Logging;
4+
using Microsoft.Extensions.Options;
5+
using WebhookEngine.Core.Interfaces;
6+
using WebhookEngine.Core.Options;
7+
8+
namespace WebhookEngine.Infrastructure.Services;
9+
10+
/// <summary>
11+
/// JMESPath-backed payload transformer (ADR-003). Wraps the JmesPath.Net library
12+
/// with a hard timeout, an output-size guard, and a fail-open contract: any
13+
/// error returns <see cref="PayloadTransformResult.FailOpen"/> so callers fall
14+
/// back to the original payload. Stateless and safe to register as a singleton.
15+
/// </summary>
16+
public sealed class JmesPathPayloadTransformer : IPayloadTransformer
17+
{
18+
private readonly TransformationOptions _options;
19+
private readonly ILogger<JmesPathPayloadTransformer> _logger;
20+
private readonly JmesPath _jmesPath = new();
21+
22+
public JmesPathPayloadTransformer(
23+
IOptions<TransformationOptions> options,
24+
ILogger<JmesPathPayloadTransformer> logger)
25+
{
26+
_options = options.Value;
27+
_logger = logger;
28+
}
29+
30+
public PayloadTransformResult Transform(string expression, string payload)
31+
{
32+
if (string.IsNullOrWhiteSpace(expression))
33+
{
34+
return PayloadTransformResult.FailOpen("Expression is empty.");
35+
}
36+
37+
// Run the JMESPath evaluation on a thread-pool task so we can enforce a
38+
// hard wall-clock timeout even if the expression hits a pathological
39+
// pattern that the parser does not detect ahead of time.
40+
var task = Task.Run(() => _jmesPath.Transform(payload, expression));
41+
42+
try
43+
{
44+
if (!task.Wait(TimeSpan.FromMilliseconds(_options.TimeoutMs)))
45+
{
46+
_logger.LogWarning(
47+
"JMESPath transformation timed out after {TimeoutMs}ms for expression {Expression}",
48+
_options.TimeoutMs, expression);
49+
return PayloadTransformResult.FailOpen($"Timeout after {_options.TimeoutMs}ms.");
50+
}
51+
}
52+
catch (AggregateException ex) when (ex.InnerException is not null)
53+
{
54+
_logger.LogWarning(
55+
ex.InnerException,
56+
"JMESPath transformation failed for expression {Expression}",
57+
expression);
58+
return PayloadTransformResult.FailOpen(ex.InnerException.Message);
59+
}
60+
catch (Exception ex)
61+
{
62+
_logger.LogWarning(ex, "JMESPath transformation failed for expression {Expression}", expression);
63+
return PayloadTransformResult.FailOpen(ex.Message);
64+
}
65+
66+
var transformed = task.Result;
67+
if (transformed is null)
68+
{
69+
return PayloadTransformResult.FailOpen("JMESPath returned null result.");
70+
}
71+
72+
// Reject results that exceed the output budget to keep delivery payloads
73+
// bounded. UTF-8 byte length matches what the HTTP body will weigh.
74+
var byteCount = Encoding.UTF8.GetByteCount(transformed);
75+
if (byteCount > _options.MaxOutputBytes)
76+
{
77+
_logger.LogWarning(
78+
"JMESPath transformation output exceeded {MaxOutputBytes} bytes ({ActualBytes}) for expression {Expression}",
79+
_options.MaxOutputBytes, byteCount, expression);
80+
return PayloadTransformResult.FailOpen(
81+
$"Output size {byteCount} exceeds limit {_options.MaxOutputBytes}.");
82+
}
83+
84+
return PayloadTransformResult.Success(transformed);
85+
}
86+
}

src/WebhookEngine.Infrastructure/WebhookEngine.Infrastructure.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
</PropertyGroup>
88

99
<ItemGroup>
10+
<PackageReference Include="JmesPath.Net" Version="1.1.0" />
1011
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="10.0.5" />
1112
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="10.0.1" />
1213
<PackageReference Include="Microsoft.Extensions.Http" Version="10.0.5" />

src/WebhookEngine.Worker/DeliveryWorker.cs

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ public class DeliveryWorker : BackgroundService
1919
private readonly ILogger<DeliveryWorker> _logger;
2020
private readonly DeliveryOptions _deliveryOptions;
2121
private readonly RetryPolicyOptions _retryPolicy;
22+
private readonly TransformationOptions _transformationOptions;
2223
private readonly IEndpointRateLimiter _endpointRateLimiter;
24+
private readonly IPayloadTransformer _payloadTransformer;
2325
private readonly WebhookMetrics? _metrics;
2426
private readonly string _workerId = $"worker_{Environment.MachineName}_{Guid.NewGuid().ToString("N")[..8]}";
2527

@@ -28,14 +30,18 @@ public DeliveryWorker(
2830
ILogger<DeliveryWorker> logger,
2931
IOptions<DeliveryOptions> deliveryOptions,
3032
IOptions<RetryPolicyOptions> retryPolicy,
33+
IOptions<TransformationOptions> transformationOptions,
3134
IEndpointRateLimiter endpointRateLimiter,
35+
IPayloadTransformer payloadTransformer,
3236
WebhookMetrics? metrics = null)
3337
{
3438
_serviceProvider = serviceProvider;
3539
_logger = logger;
3640
_deliveryOptions = deliveryOptions.Value;
3741
_retryPolicy = retryPolicy.Value;
42+
_transformationOptions = transformationOptions.Value;
3843
_endpointRateLimiter = endpointRateLimiter;
44+
_payloadTransformer = payloadTransformer;
3945
_metrics = metrics;
4046
}
4147

@@ -169,9 +175,15 @@ private async Task ProcessMessageAsync(
169175
return;
170176
}
171177

172-
// Sign the payload
178+
// Apply payload transformation (ADR-003) before signing — the receiver
179+
// verifies the signature against the body they actually receive, so the
180+
// transformed payload is what we sign. Fail-open: any error keeps the
181+
// original payload and lets delivery proceed.
182+
var deliveryPayload = ApplyTransformation(message.Payload, endpoint, message.Id);
183+
184+
// Sign the (possibly transformed) payload
173185
var timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
174-
var signedHeaders = signingService.Sign(message.Id.ToString(), timestamp, message.Payload, signingSecret);
186+
var signedHeaders = signingService.Sign(message.Id.ToString(), timestamp, deliveryPayload, signingSecret);
175187

176188
var customHeaders = ParseCustomHeaders(endpoint.CustomHeadersJson);
177189
var requestHeaders = BuildRequestHeaders(signedHeaders, customHeaders);
@@ -181,7 +193,7 @@ private async Task ProcessMessageAsync(
181193
{
182194
MessageId = message.Id.ToString(),
183195
EndpointUrl = endpoint.Url,
184-
Payload = message.Payload,
196+
Payload = deliveryPayload,
185197
SignedHeaders = signedHeaders,
186198
CustomHeaders = customHeaders
187199
};
@@ -280,6 +292,34 @@ private DateTime CalculateNextRetryAt(int currentAttempt)
280292
return DateTime.UtcNow.AddSeconds(backoffSeconds);
281293
}
282294

295+
private string ApplyTransformation(string originalPayload, Core.Entities.Endpoint endpoint, Guid messageId)
296+
{
297+
// Skip when globally disabled or per-endpoint not configured.
298+
if (!_transformationOptions.Enabled
299+
|| !endpoint.TransformEnabled
300+
|| string.IsNullOrWhiteSpace(endpoint.TransformExpression))
301+
{
302+
return originalPayload;
303+
}
304+
305+
var result = _payloadTransformer.Transform(endpoint.TransformExpression, originalPayload);
306+
307+
if (result.IsSuccess && result.TransformedPayload is not null)
308+
{
309+
_metrics?.RecordTransformationApplied();
310+
_logger.LogInformation(
311+
"Applied JMESPath transformation for message {MessageId} on endpoint {EndpointId}",
312+
messageId, endpoint.Id);
313+
return result.TransformedPayload;
314+
}
315+
316+
_metrics?.RecordTransformationFailedOpen();
317+
_logger.LogWarning(
318+
"JMESPath transformation failed for message {MessageId} on endpoint {EndpointId}; falling back to original payload. Error: {Error}",
319+
messageId, endpoint.Id, result.Error);
320+
return originalPayload;
321+
}
322+
283323
private static Dictionary<string, string> ParseCustomHeaders(string? customHeadersJson)
284324
{
285325
if (string.IsNullOrWhiteSpace(customHeadersJson))

0 commit comments

Comments
 (0)