Skip to content

Commit 60c7866

Browse files
committed
feat: payment settlement support refactor
1 parent 06bab4b commit 60c7866

21 files changed

Lines changed: 1622 additions & 79 deletions

File tree

.agents/skills/billing/SKILL.md

Lines changed: 497 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
# Subscription → Billing Sync Algorithm
2+
3+
Primary location: `openmeter/billing/worker/subscriptionsync/`
4+
5+
## Entry Points
6+
7+
The sync is triggered by the billing worker (`worker/worker.go`) on these events:
8+
9+
| Event | Handler |
10+
|---|---|
11+
| `subscription.Created/Updated/Continued` | `SynchronizeSubscriptionAndInvoiceCustomer` |
12+
| `subscription.Cancelled` | `HandleCancelledEvent` |
13+
| `subscription.SubscriptionSyncEvent` | `HandleSubscriptionSyncEvent` (self-loop) |
14+
| `billing.StandardInvoiceCreatedEvent` | `HandleInvoiceCreation` → re-syncs all subscriptions in the new invoice's lines |
15+
16+
The self-loop mechanism (`SubscriptionSyncEvent`) ensures the gathering invoice is refilled with the next period's lines immediately after an invoice is issued, without waiting for the cron.
17+
18+
## Main Algorithm (`service/sync.go`)
19+
20+
```
21+
SynchronizeSubscriptionAndInvoiceCustomer(ctx, SubscriptionView, asOf)
22+
→ SynchronizeSubscription
23+
→ billingService.WithLock(customerID) ← advisory lock, serializes per customer
24+
→ persistedstate.Loader.LoadForSubscription ← fetch current DB state
25+
→ targetstate.Builder.Build ← compute desired state
26+
→ reconciler.Plan ← diff: new / delete / upsert
27+
→ reconciler.Apply ← write patches to DB
28+
→ updateSyncState ← upsert SubscriptionBillingSyncState row
29+
→ invoicePendingLines ← InvoicePendingLines(ProgressiveBillingOverride=false)
30+
```
31+
32+
## Target State Builder (`service/targetstate/`)
33+
34+
`Builder.Build` iterates each phase and calls `PhaseIterator.Generate(ctx, generationLimit)`.
35+
36+
**Generation limit**: the end of the current aligned billing period (`spec.GetAlignedBillingPeriodAt(asOf)`). Lines are never generated past the current period boundary, except for in-advance items whose `InvoiceAt` equals the period start — these are pre-generated so the gathering invoice is immediately ready.
37+
38+
**Phase iterator loop** (`phaseiterator.go`): for each item in the phase, loops by `BillingCadence` until `InvoiceAt > iterationEnd`. Each iteration produces a `SubscriptionItemWithPeriods` with:
39+
- `ServicePeriod`: the actual usage window for this item cadence period
40+
- `BillingPeriod`: the subscription-aligned outer billing period
41+
42+
### `GetInvoiceAt()` — When Does a Line Appear on an Invoice?
43+
44+
```go
45+
// Flat-fee in-advance: bill at the start of the billing period
46+
if price.Type() == FlatPrice && paymentTerm == InAdvance {
47+
return BillingPeriod.Start
48+
}
49+
// Everything else (in-arrears, usage-based): bill at end of service or billing period
50+
return max(ServicePeriod.End, BillingPeriod.End)
51+
```
52+
53+
This is defined in `phaseiterator.go` and is the canonical source of billing timing truth.
54+
55+
## Reconciler (`service/reconciler/`)
56+
57+
### Plan (`reconciler.go`)
58+
59+
Computes symmetric difference on `ChildUniqueReferenceID` keys:
60+
- **Only in persisted**: `LinesToDelete`
61+
- **Only in target**: `NewSubscriptionItems`
62+
- **In both**: `LinesToUpsert`
63+
64+
### ChildUniqueReferenceID Format
65+
66+
```
67+
{subscriptionID}/{phaseKey}/{itemKey}/v[{version}]/period[{periodIndex}]
68+
```
69+
70+
One-time (non-recurring) items omit `/period[N]`. Version increments when the rate card changes within a phase.
71+
72+
### Apply (`apply.go` + `invoiceupdate.go`)
73+
74+
Translates the plan into `Patch` objects and calls `InvoiceUpdater.ApplyPatches`. The updater handles three invoice states:
75+
76+
| Invoice state | Action |
77+
|---|---|
78+
| Gathering invoice | Direct edit via `UpdateGatheringInvoice` |
79+
| Mutable standard invoice | `UpdateStandardInvoice` + `SnapshotLineQuantity` |
80+
| Immutable standard invoice | Emit `ValidationIssue` (no actual mutation — detects drift) |
81+
82+
## Persisted State (`service/persistedstate/`)
83+
84+
`Loader.LoadForSubscription` calls `billingService.GetLinesForSubscription`, which queries `BillingInvoiceLine` rows where:
85+
- `subscription_id = ?`
86+
- `parent_line_id IS NULL` (top-level lines only; split children handled by mapper)
87+
- Either `deleted_at IS NULL` OR (`deleted_at IS NOT NULL` AND `managed_by = manually_managed`)
88+
89+
Returns `[]billing.LineOrHierarchy` — gathering lines and standard lines/split hierarchies mixed.
90+
91+
## SubscriptionBillingSyncState Table
92+
93+
Managed in `adapter/syncstate.go`. Upserted on `(subscription_id, namespace)` after each sync:
94+
95+
```go
96+
.OnConflictColumns(FieldSubscriptionID, FieldNamespace).
97+
UpdateHasBillables().UpdateSyncedAt().UpdateNextSyncAfter().Exec(ctx)
98+
```
99+
100+
`NextSyncAfter` is set to `SubscriptionMaxGenerationTimeLimit` (the end of the last generated billing period). The worker uses this to know when to next sync without processing every event repeatedly.
101+
102+
## Billing Cadence and Alignment
103+
104+
`SubscriptionSpec.BillingCadence` (ISO duration, e.g. `P1M`) is the master cadence.
105+
`BillingAnchor` is the fixed time anchor for period alignment.
106+
107+
`GetAlignedBillingPeriodAt(at)` in `subscription/subscriptionspec.go`:
108+
1. Finds the active phase at `at`
109+
2. Builds `timeutil.Recurrence` from `(BillingCadence, BillingAnchor)`
110+
3. Calls `recurrence.GetPeriodAt(at)` to find the enclosing period
111+
4. Clips boundaries to phase start/end
112+
113+
## Subscription-Level Timing Resolution
114+
115+
`Timing.ResolveForSpec` in `subscription/timing.go` resolves `next_billing_cycle` to `GetAlignedBillingPeriodAt(now).To`. Subscription edits with `next_billing_cycle` timing are deferred to the next billing period boundary.
116+
117+
## Subscription Validator (`validators/subscription/validator.go`)
118+
119+
Implements `subscription.SubscriptionCommandHook`. On `AfterCreate` and `AfterUpdate`, validates:
120+
- Customer has a `CustomerOverride` pointing to a billing profile
121+
- Profile apps have the required capabilities: `CapabilityTypeCalculateTax`, `CapabilityTypeInvoiceCustomers`, `CapabilityTypeCollectPayments`
122+
123+
If the subscription has no priced rate cards, validation is skipped entirely.
124+
125+
## Annotations
126+
127+
On `GatheringLine` and `StandardLine`:
128+
129+
- `AnnotationSubscriptionSyncIgnore` — sync algorithm skips this line (managed externally)
130+
- `AnnotationSubscriptionSyncForceContinuousLines` — forces progressive billing to treat lines as continuous even if the pricing type would normally disallow it (use with care)
131+
132+
Both are defined in `billing/annotations.go`.
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
# Billing Test Patterns
2+
3+
## Suite Hierarchy
4+
5+
```
6+
billingtest.BaseSuite (test/billing/suite.go)
7+
↳ billingtest.SubscriptionMixin (test/billing/subscription_suite.go)
8+
↳ subscriptionsync.SuiteBase (worker/subscriptionsync/service/suitebase_test.go)
9+
```
10+
11+
Embed the lowest suite that gives you what you need. Most billing service tests only need `BaseSuite`. Tests involving subscription creation need `SubscriptionMixin`. Tests for the sync algorithm need `SuiteBase`.
12+
13+
## BaseSuite (`test/billing/suite.go`)
14+
15+
Sets up a full in-process stack with a real PostgreSQL database:
16+
17+
1. `testutils.InitPostgresDB(t)` — real Postgres test DB (requires `POSTGRES_HOST=127.0.0.1`)
18+
2. Atlas migrations unless `TEST_DISABLE_ATLAS` is set (falls back to Ent auto-create)
19+
3. Full billing service + adapter chain
20+
4. `ForegroundAdvancementStrategy` — state machine runs synchronously in tests (no async events)
21+
5. `MockStreamingConnector` for meter queries
22+
6. `invoicecalc.NewMockableCalculator` for overriding pricing calculations
23+
7. Sandbox app factory + customer/subject sync hooks
24+
25+
**Key exposed fields:**
26+
```go
27+
BillingService billing.Service
28+
BillingAdapter billing.Adapter
29+
MockStreamingConnector *streamingtestutils.MockStreamingConnector
30+
MeterAdapter *metermock.MockRepository
31+
CustomerService customer.Service
32+
FeatureService feature.FeatureConnector
33+
```
34+
35+
**Namespace isolation:**
36+
```go
37+
ns := s.GetUniqueNamespace("my-test") // returns "my-test-{ulid}"
38+
```
39+
Always use unique namespaces to isolate test data between test cases.
40+
41+
## SubscriptionMixin (`test/billing/subscription_suite.go`)
42+
43+
Call `mixin.SetupSuite(t, deps)` from your suite's `SetupSuite`. Adds:
44+
- `PlanService`, `SubscriptionService`, `SubscriptionAddonService`, `SubscriptionWorkflowService`
45+
- `EntitlementConnector` (full entitlement/credit/grant stack for metered entitlements)
46+
47+
**Access via:**
48+
```go
49+
s.PlanService
50+
s.SubscriptionService
51+
s.SubscriptionWorkflowService
52+
```
53+
54+
## SuiteBase for Sync Tests (`worker/subscriptionsync/service/suitebase_test.go`)
55+
56+
Embeds both `BaseSuite` and `SubscriptionMixin`. Also provides:
57+
- `SubscriptionSyncService subscriptionsync.Service`
58+
- `SubscriptionSyncAdapter subscriptionsync.Adapter`
59+
60+
**Per-test setup** (`BeforeTest`):
61+
```go
62+
// Creates fresh per-test state:
63+
ns := getUniqueTestNamespace(suiteName, testName)
64+
s.InstallSandboxApp(t, ns)
65+
s.ProvisionBillingProfile(t, ns)
66+
// Creates test meter + feature
67+
// Creates test customer
68+
```
69+
70+
**Per-test teardown** (`AfterTest`):
71+
```go
72+
clock.UnFreeze()
73+
s.MockStreamingConnector.Reset()
74+
// resets feature flags on the service
75+
```
76+
77+
## Provisioning Helpers
78+
79+
### `InstallSandboxApp(t, ns)`
80+
Required before any invoice operations. Installs the sandbox invoicing app in the namespace.
81+
82+
### `ProvisionBillingProfile(t, ns, opts...)`
83+
Creates a billing profile with option functions:
84+
85+
```go
86+
s.ProvisionBillingProfile(t, ns,
87+
billingtest.WithProgressiveBilling(),
88+
billingtest.WithCollectionInterval(isodate.MustParse("P1D")),
89+
billingtest.WithManualApproval(),
90+
billingtest.WithBillingProfileEditFn(func(p *billing.CreateProfileInput) {
91+
p.WorkflowConfig.Tax.Enabled = true
92+
}),
93+
)
94+
```
95+
96+
Default: auto-advance, monthly collection, immediate approval.
97+
98+
### Subscription Creation Helpers (on `SuiteBase`)
99+
100+
```go
101+
// Create from explicit phase definitions
102+
sub, err := s.createSubscriptionFromPlanPhases([]subscriptiontestutils.CreatePhasesInput{...})
103+
104+
// Create from a full plan input
105+
sub, err := s.createSubscriptionFromPlan(plan.CreatePlanInput{...})
106+
```
107+
108+
## Gathering Invoice Helpers (on `SuiteBase`)
109+
110+
```go
111+
// Assert exactly 1 gathering invoice exists and return it with lines expanded
112+
gi := s.gatheringInvoice(ctx, ns, customerID)
113+
114+
// Assert no gathering invoice exists
115+
s.expectNoGatheringInvoice(ctx, ns, customerID)
116+
117+
// Verify lines on an invoice
118+
s.expectLines(invoice, subscriptionID, []expectedLine{
119+
{PhaseKey: "default", ItemKey: "api-calls", ...},
120+
})
121+
```
122+
123+
### `recurringLineMatcher{PhaseKey, ItemKey, Version, PeriodMin, PeriodMax}`
124+
Generates expected `ChildUniqueReferenceID` strings for a range of billing periods:
125+
```go
126+
matcher := recurringLineMatcher{
127+
PhaseKey: "default",
128+
ItemKey: "api-calls",
129+
Version: 0,
130+
PeriodMin: 0,
131+
PeriodMax: 2,
132+
}
133+
// Generates: {subID}/default/api-calls/v[0]/period[0], /period[1], /period[2]
134+
```
135+
136+
## MockStreamingConnector
137+
138+
```go
139+
// Set meter values returned by queries
140+
s.MockStreamingConnector.AddSimpleEvent(meterSlug, value, at)
141+
142+
// Or set a fixed return value for all queries
143+
s.MockStreamingConnector.SetDefaultMeter(meterSlug, value)
144+
145+
// Reset all values (called in AfterTest)
146+
s.MockStreamingConnector.Reset()
147+
```
148+
149+
## MockableInvoiceCalculator
150+
151+
Override the invoice calculator for a single test:
152+
```go
153+
s.BillingService.GetInvoiceCalculator().(*invoicecalc.MockableCalculator).
154+
SetupMock(func(inv *billing.StandardInvoice) {
155+
// modify the invoice directly before totals are calculated
156+
})
157+
defer s.BillingService.GetInvoiceCalculator().(*invoicecalc.MockableCalculator).Reset()
158+
```
159+
160+
## Clock Control
161+
162+
```go
163+
clock.SetTime(t1) // advance clock without freezing
164+
clock.FreezeTime(t) // freeze at specific time (t is *testing.T for cleanup)
165+
clock.UnFreeze() // manual unfreeze (also called in AfterTest)
166+
clock.ResetTime() // reset to wall clock time
167+
```
168+
169+
Always call `clock.UnFreeze()` in `AfterTest` or use `clock.FreezeTime(t)` which registers cleanup automatically.
170+
171+
## Progressive Billing Test Helpers
172+
173+
```go
174+
// Enable progressive billing on the billing profile
175+
s.enableProgressiveBilling()
176+
177+
// Set feature flags on the service directly (bypasses profile)
178+
s.enableProrating()
179+
```
180+
181+
## `RemoveMetaForCompare()`
182+
183+
Use before `require.Equal` comparisons to strip DB-only fields:
184+
```go
185+
expected.RemoveMetaForCompare()
186+
actual.RemoveMetaForCompare()
187+
s.Equal(expected, actual)
188+
```
189+
190+
Available on both `StandardInvoice` and `StandardLine`. Strips: `DBState`, `DetailedLines`, IDs, timestamps.
191+
192+
## Running Billing Tests
193+
194+
```bash
195+
# All billing tests (requires postgres)
196+
POSTGRES_HOST=127.0.0.1 go test -tags=dynamic -v ./openmeter/billing/...
197+
198+
# Just sync algorithm tests
199+
POSTGRES_HOST=127.0.0.1 go test -tags=dynamic -v ./openmeter/billing/worker/subscriptionsync/service/...
200+
201+
# Just charges tests
202+
POSTGRES_HOST=127.0.0.1 go test -tags=dynamic -v ./openmeter/billing/charges/...
203+
204+
# Full billing integration tests (test/ package)
205+
POSTGRES_HOST=127.0.0.1 go test -tags=dynamic -v ./test/billing/...
206+
```
207+
208+
Skip atlas migrations (faster, uses Ent auto-create):
209+
```bash
210+
TEST_DISABLE_ATLAS=true POSTGRES_HOST=127.0.0.1 go test -tags=dynamic -v ./openmeter/billing/...
211+
```

openmeter/billing/adapter/invoice.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -850,8 +850,15 @@ func (a *adapter) IsAppUsed(ctx context.Context, appID app.AppID) error {
850850
billing.StandardInvoiceStatusIssuingChargeBookingFailed,
851851
billing.StandardInvoiceStatusIssued,
852852
billing.StandardInvoiceStatusPaymentProcessingPending,
853+
billing.StandardInvoiceStatusPaymentProcessingBookingAuthorized,
854+
billing.StandardInvoiceStatusPaymentProcessingBookingAuthorizedFailed,
855+
billing.StandardInvoiceStatusPaymentProcessingBookingAuthorizedAndSettled,
856+
billing.StandardInvoiceStatusPaymentProcessingBookingAuthorizedAndSettledFailed,
857+
billing.StandardInvoiceStatusPaymentProcessingAuthorized,
853858
billing.StandardInvoiceStatusPaymentProcessingFailed,
854859
billing.StandardInvoiceStatusPaymentProcessingActionRequired,
860+
billing.StandardInvoiceStatusPaymentProcessingBookingSettled,
861+
billing.StandardInvoiceStatusPaymentProcessingBookingSettledFailed,
855862
billing.StandardInvoiceStatusOverdue,
856863
),
857864
billinginvoice.DeletedAtIsNil(),

openmeter/billing/charges/creditpurchase/lineengine/engine.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,14 @@ func (e *Engine) OnInvoiceIssued(_ context.Context, _ billing.OnInvoiceIssuedInp
8787
return nil
8888
}
8989

90+
func (e *Engine) OnPaymentAuthorized(_ context.Context, _ billing.OnPaymentAuthorizedInput) error {
91+
return nil
92+
}
93+
94+
func (e *Engine) OnPaymentSettled(_ context.Context, _ billing.OnPaymentSettledInput) error {
95+
return nil
96+
}
97+
9098
func (e *Engine) CalculateLines(input billing.CalculateLinesInput) (billing.StandardLines, error) {
9199
if input.Invoice.ID == "" {
92100
return nil, fmt.Errorf("invoice id is required")

openmeter/billing/charges/flatfee/lineengine/engine.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,14 @@ func (e *Engine) OnInvoiceIssued(_ context.Context, _ billing.OnInvoiceIssuedInp
130130
return nil
131131
}
132132

133+
func (e *Engine) OnPaymentAuthorized(_ context.Context, _ billing.OnPaymentAuthorizedInput) error {
134+
return nil
135+
}
136+
137+
func (e *Engine) OnPaymentSettled(_ context.Context, _ billing.OnPaymentSettledInput) error {
138+
return nil
139+
}
140+
133141
func (e *Engine) CalculateLines(input billing.CalculateLinesInput) (billing.StandardLines, error) {
134142
if input.Invoice.ID == "" {
135143
return nil, fmt.Errorf("invoice id is required")

0 commit comments

Comments
 (0)