Skip to content

Commit f3eec6f

Browse files
authored
feat: detailed line persisted flag (#4255)
1 parent c19dca6 commit f3eec6f

15 files changed

Lines changed: 471 additions & 8 deletions

openmeter/billing/charges/usagebased/adapter/detailedline.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/openmeterio/openmeter/openmeter/billing/models/stddetailedline"
1616
entdb "github.com/openmeterio/openmeter/openmeter/ent/db"
1717
dbchargeusagebasedrundetailedline "github.com/openmeterio/openmeter/openmeter/ent/db/chargeusagebasedrundetailedline"
18+
dbchargeusagebasedruns "github.com/openmeterio/openmeter/openmeter/ent/db/chargeusagebasedruns"
1819
"github.com/openmeterio/openmeter/pkg/clock"
1920
"github.com/openmeterio/openmeter/pkg/framework/entutils"
2021
)
@@ -42,6 +43,22 @@ func (a *adapter) FetchDetailedLines(ctx context.Context, charge usagebased.Char
4243
return usagebased.Charge{}, err
4344
}
4445

46+
dbRuns, err := tx.db.ChargeUsageBasedRuns.Query().
47+
Where(
48+
dbchargeusagebasedruns.NamespaceEQ(charge.Namespace),
49+
dbchargeusagebasedruns.ChargeIDEQ(charge.ID),
50+
dbchargeusagebasedruns.IDIn(runIDs...),
51+
).
52+
All(ctx)
53+
if err != nil {
54+
return usagebased.Charge{}, err
55+
}
56+
57+
detailedLinesPresentByRunID := make(map[string]bool, len(dbRuns))
58+
for _, dbRun := range dbRuns {
59+
detailedLinesPresentByRunID[dbRun.ID] = dbRun.DetailedLinesPresent
60+
}
61+
4562
linesByRunID := make(map[string]usagebased.DetailedLines, len(charge.Realizations))
4663
for _, dbLine := range dbLines {
4764
line, err := mapDetailedLineFromDB(dbLine)
@@ -55,7 +72,22 @@ func (a *adapter) FetchDetailedLines(ctx context.Context, charge usagebased.Char
5572
for idx, run := range charge.Realizations {
5673
lines := linesByRunID[run.ID.ID]
5774
slices.SortStableFunc(lines, stddetailedline.Compare[usagebased.DetailedLine])
58-
charge.Realizations[idx].DetailedLines = mo.Some(lines)
75+
76+
detailedLinesPresent, found := detailedLinesPresentByRunID[run.ID.ID]
77+
if !found {
78+
charge.Realizations[idx].DetailedLines = mo.None[usagebased.DetailedLines]()
79+
continue
80+
}
81+
82+
// Safety measure: only mark detailed lines as expanded when the persisted
83+
// run records that detailed lines were written at least once. Treating
84+
// unknown detailed lines as an empty set can make
85+
// late-event rating overcharge.
86+
if detailedLinesPresent {
87+
charge.Realizations[idx].DetailedLines = mo.Some(lines)
88+
} else {
89+
charge.Realizations[idx].DetailedLines = mo.None[usagebased.DetailedLines]()
90+
}
5991
}
6092

6193
return charge, nil
@@ -118,6 +150,17 @@ func (a *adapter) UpsertRunDetailedLines(ctx context.Context, chargeID chargesme
118150
return err
119151
}
120152

153+
if _, err := tx.db.ChargeUsageBasedRuns.Update().
154+
Where(
155+
dbchargeusagebasedruns.NamespaceEQ(runID.Namespace),
156+
dbchargeusagebasedruns.ChargeIDEQ(chargeID.ID),
157+
dbchargeusagebasedruns.ID(runID.ID),
158+
).
159+
SetDetailedLinesPresent(true).
160+
Save(ctx); err != nil {
161+
return err
162+
}
163+
121164
if len(createBuilders) == 0 {
122165
return nil
123166
}

openmeter/billing/charges/usagebased/adapter/detailedline_test.go

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66
"time"
77

88
"github.com/alpacahq/alpacadecimal"
9+
"github.com/oklog/ulid/v2"
910
"github.com/samber/lo"
11+
"github.com/samber/mo"
1012
"github.com/stretchr/testify/require"
1113
"github.com/stretchr/testify/suite"
1214

@@ -18,6 +20,7 @@ import (
1820
"github.com/openmeterio/openmeter/openmeter/billing/models/totals"
1921
entdb "github.com/openmeterio/openmeter/openmeter/ent/db"
2022
dbchargeusagebasedrundetailedline "github.com/openmeterio/openmeter/openmeter/ent/db/chargeusagebasedrundetailedline"
23+
dbchargeusagebasedruns "github.com/openmeterio/openmeter/openmeter/ent/db/chargeusagebasedruns"
2124
"github.com/openmeterio/openmeter/openmeter/productcatalog"
2225
"github.com/openmeterio/openmeter/openmeter/testutils"
2326
"github.com/openmeterio/openmeter/pkg/currencyx"
@@ -206,6 +209,220 @@ func (s *DetailedLineAdapterSuite) TestUpsertRunDetailedLinesReplacesAndSoftDele
206209
s.NotNil(deletedRow.DeletedAt)
207210
}
208211

212+
func (s *DetailedLineAdapterSuite) TestFetchDetailedLinesUsesDetailedLinesPresentFlag() {
213+
ctx := s.T().Context()
214+
namespace := "usagebased-detailedline-adapter-fetch-flag"
215+
charge, runBase, _ := s.createChargeWithRun(namespace)
216+
217+
fetchedWithoutMaterializedLines, err := s.adapter.GetByID(ctx, usagebased.GetByIDInput{
218+
ChargeID: charge.GetChargeID(),
219+
Expands: chargesmeta.Expands{
220+
chargesmeta.ExpandRealizations,
221+
chargesmeta.ExpandDetailedLines,
222+
},
223+
})
224+
s.Require().NoError(err)
225+
s.Require().Len(fetchedWithoutMaterializedLines.Realizations, 1)
226+
s.False(fetchedWithoutMaterializedLines.Realizations[0].DetailedLines.IsPresent())
227+
228+
s.Require().NoError(s.adapter.UpsertRunDetailedLines(ctx, charge.GetChargeID(), runBase.ID, nil))
229+
230+
fetchedWithMaterializedEmptyLines, err := s.adapter.GetByID(ctx, usagebased.GetByIDInput{
231+
ChargeID: charge.GetChargeID(),
232+
Expands: chargesmeta.Expands{
233+
chargesmeta.ExpandRealizations,
234+
chargesmeta.ExpandDetailedLines,
235+
},
236+
})
237+
s.Require().NoError(err)
238+
s.Require().Len(fetchedWithMaterializedEmptyLines.Realizations, 1)
239+
s.True(fetchedWithMaterializedEmptyLines.Realizations[0].DetailedLines.IsPresent())
240+
s.Empty(fetchedWithMaterializedEmptyLines.Realizations[0].DetailedLines.OrEmpty())
241+
242+
dbRun, err := s.dbClient.ChargeUsageBasedRuns.Query().
243+
Where(
244+
dbchargeusagebasedruns.NamespaceEQ(namespace),
245+
dbchargeusagebasedruns.ID(runBase.ID.ID),
246+
).
247+
Only(ctx)
248+
s.Require().NoError(err)
249+
s.True(dbRun.DetailedLinesPresent)
250+
}
251+
252+
func (s *DetailedLineAdapterSuite) TestFetchDetailedLinesDoesNotRepairDetailedLinesPresentFlagWhenRowsExist() {
253+
ctx := s.T().Context()
254+
namespace := "usagebased-detailedline-adapter-fetch-does-not-repair-flag"
255+
charge, runBase, servicePeriod := s.createChargeWithRun(namespace)
256+
257+
s.Require().NoError(s.adapter.UpsertRunDetailedLines(ctx, charge.GetChargeID(), runBase.ID, usagebased.DetailedLines{
258+
s.newDetailedLine(newDetailedLineInput{
259+
Charge: charge,
260+
RunID: runBase.ID,
261+
ServicePeriod: servicePeriod,
262+
ChildUniqueReferenceID: "existing@[2026-01-01T00:00:00Z..2026-02-01T00:00:00Z]",
263+
Quantity: 1,
264+
}),
265+
}))
266+
267+
_, err := s.dbClient.ChargeUsageBasedRuns.UpdateOneID(runBase.ID.ID).
268+
Where(dbchargeusagebasedruns.NamespaceEQ(namespace)).
269+
SetDetailedLinesPresent(false).
270+
Save(ctx)
271+
s.Require().NoError(err)
272+
273+
fetchedCharge, err := s.adapter.GetByID(ctx, usagebased.GetByIDInput{
274+
ChargeID: charge.GetChargeID(),
275+
Expands: chargesmeta.Expands{
276+
chargesmeta.ExpandRealizations,
277+
chargesmeta.ExpandDetailedLines,
278+
},
279+
})
280+
s.Require().NoError(err)
281+
s.Require().Len(fetchedCharge.Realizations, 1)
282+
s.False(fetchedCharge.Realizations[0].DetailedLines.IsPresent())
283+
284+
dbRun, err := s.dbClient.ChargeUsageBasedRuns.Query().
285+
Where(
286+
dbchargeusagebasedruns.NamespaceEQ(namespace),
287+
dbchargeusagebasedruns.ID(runBase.ID.ID),
288+
).
289+
Only(ctx)
290+
s.Require().NoError(err)
291+
s.False(dbRun.DetailedLinesPresent)
292+
}
293+
294+
func (s *DetailedLineAdapterSuite) TestFetchDetailedLinesUsesPersistedDetailedLinesPresentFlag() {
295+
ctx := s.T().Context()
296+
namespace := "usagebased-detailedline-adapter-fetch-uses-persisted-flag"
297+
charge, runBase, servicePeriod := s.createChargeWithRun(namespace)
298+
299+
s.Require().NoError(s.adapter.UpsertRunDetailedLines(ctx, charge.GetChargeID(), runBase.ID, usagebased.DetailedLines{
300+
s.newDetailedLine(newDetailedLineInput{
301+
Charge: charge,
302+
RunID: runBase.ID,
303+
ServicePeriod: servicePeriod,
304+
ChildUniqueReferenceID: "persisted@[2026-01-01T00:00:00Z..2026-02-01T00:00:00Z]",
305+
Quantity: 1,
306+
}),
307+
}))
308+
309+
_, err := s.dbClient.ChargeUsageBasedRuns.UpdateOneID(runBase.ID.ID).
310+
Where(dbchargeusagebasedruns.NamespaceEQ(namespace)).
311+
SetDetailedLinesPresent(false).
312+
Save(ctx)
313+
s.Require().NoError(err)
314+
315+
staleCharge := charge
316+
staleCharge.Realizations = usagebased.RealizationRuns{
317+
{
318+
RealizationRunBase: runBase,
319+
},
320+
}
321+
staleCharge.Realizations[0].DetailedLines = mo.Some(usagebased.DetailedLines{
322+
s.newDetailedLine(newDetailedLineInput{
323+
Charge: charge,
324+
RunID: runBase.ID,
325+
ServicePeriod: servicePeriod,
326+
ChildUniqueReferenceID: "stale@[2026-01-01T00:00:00Z..2026-02-01T00:00:00Z]",
327+
Quantity: 1,
328+
}),
329+
})
330+
331+
fetchedCharge, err := s.adapter.FetchDetailedLines(ctx, staleCharge)
332+
s.Require().NoError(err)
333+
s.Require().Len(fetchedCharge.Realizations, 1)
334+
s.False(fetchedCharge.Realizations[0].DetailedLines.IsPresent())
335+
}
336+
337+
func (s *DetailedLineAdapterSuite) TestFetchDetailedLinesClearsStaleDetailedLinesWhenRunMetadataIsMissing() {
338+
ctx := s.T().Context()
339+
namespace := "usagebased-detailedline-adapter-fetch-missing-run-metadata"
340+
charge, runBase, servicePeriod := s.createChargeWithRun(namespace)
341+
342+
missingRunBase := runBase
343+
missingRunBase.ID.ID = ulid.Make().String()
344+
345+
staleCharge := charge
346+
staleCharge.Realizations = usagebased.RealizationRuns{
347+
{
348+
RealizationRunBase: missingRunBase,
349+
DetailedLines: mo.Some(usagebased.DetailedLines{
350+
s.newDetailedLine(newDetailedLineInput{
351+
Charge: charge,
352+
RunID: missingRunBase.ID,
353+
ServicePeriod: servicePeriod,
354+
ChildUniqueReferenceID: "stale@[2026-01-01T00:00:00Z..2026-02-01T00:00:00Z]",
355+
Quantity: 1,
356+
}),
357+
}),
358+
},
359+
}
360+
361+
fetchedCharge, err := s.adapter.FetchDetailedLines(ctx, staleCharge)
362+
s.Require().NoError(err)
363+
s.Require().Len(fetchedCharge.Realizations, 1)
364+
s.False(fetchedCharge.Realizations[0].DetailedLines.IsPresent())
365+
}
366+
367+
func (s *DetailedLineAdapterSuite) createChargeWithRun(namespace string) (usagebased.Charge, usagebased.RealizationRunBase, timeutil.ClosedPeriod) {
368+
s.T().Helper()
369+
370+
featureID := ulid.Make().String()
371+
customerID := s.createCustomer(namespace)
372+
s.createFeature(namespace, featureID)
373+
374+
servicePeriod := timeutil.ClosedPeriod{
375+
From: time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC),
376+
To: time.Date(2026, 2, 1, 0, 0, 0, 0, time.UTC),
377+
}
378+
379+
createdCharges, err := s.adapter.CreateCharges(s.T().Context(), usagebased.CreateChargesInput{
380+
Namespace: namespace,
381+
Intents: []usagebased.CreateIntent{
382+
{
383+
Intent: usagebased.Intent{
384+
Intent: chargesmeta.Intent{
385+
Name: "usage-charge",
386+
ManagedBy: billing.SubscriptionManagedLine,
387+
UniqueReferenceID: nil,
388+
CustomerID: customerID,
389+
Currency: currencyx.Code("USD"),
390+
ServicePeriod: servicePeriod,
391+
FullServicePeriod: servicePeriod,
392+
BillingPeriod: servicePeriod,
393+
},
394+
InvoiceAt: servicePeriod.To,
395+
SettlementMode: productcatalog.CreditOnlySettlementMode,
396+
FeatureKey: featureID,
397+
Price: *productcatalog.NewPriceFrom(productcatalog.UnitPrice{
398+
Amount: alpacadecimal.NewFromFloat(0.1),
399+
}),
400+
},
401+
FeatureID: featureID,
402+
},
403+
},
404+
})
405+
s.Require().NoError(err)
406+
s.Require().Len(createdCharges, 1)
407+
408+
charge := createdCharges[0]
409+
runBase, err := s.adapter.CreateRealizationRun(s.T().Context(), charge.GetChargeID(), usagebased.CreateRealizationRunInput{
410+
FeatureID: featureID,
411+
Type: usagebased.RealizationRunTypeFinalRealization,
412+
StoredAtLT: servicePeriod.To,
413+
ServicePeriodTo: servicePeriod.To,
414+
MeteredQuantity: alpacadecimal.NewFromInt(10),
415+
Totals: totals.Totals{
416+
Amount: alpacadecimal.NewFromInt(1),
417+
ChargesTotal: alpacadecimal.NewFromInt(1),
418+
Total: alpacadecimal.NewFromInt(1),
419+
},
420+
})
421+
s.Require().NoError(err)
422+
423+
return charge, runBase, servicePeriod
424+
}
425+
209426
func (s *DetailedLineAdapterSuite) createCustomer(namespace string) string {
210427
s.T().Helper()
211428

openmeter/billing/charges/usagebased/adapter/realizationrun.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ func (a *adapter) CreateRealizationRun(ctx context.Context, chargeID meta.Charge
2929
SetType(input.Type).
3030
SetStoredAtLt(meta.NormalizeTimestamp(input.StoredAtLT)).
3131
SetServicePeriodTo(meta.NormalizeTimestamp(input.ServicePeriodTo)).
32+
SetDetailedLinesPresent(false).
3233
SetNillableBillingInvoiceLineID(input.LineID).
3334
SetMeteredQuantity(input.MeteredQuantity)
3435

openmeter/ent/db/chargeusagebasedruns.go

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

openmeter/ent/db/chargeusagebasedruns/chargeusagebasedruns.go

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)