Skip to content

Commit 1e60d8e

Browse files
authored
feat: implement realtime expand (#4155)
1 parent 7b40d30 commit 1e60d8e

8 files changed

Lines changed: 343 additions & 14 deletions

File tree

api/v3/handlers/customers/charges/convert.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,15 @@ func convertChargeToAPI(charge billingcharges.Charge) (api.BillingCharge, error)
157157

158158
// convertUsageBasedChargeTotals aggregates booked totals from persisted realization runs.
159159
func convertUsageBasedChargeTotals(charge usagebased.Charge) api.BillingChargeTotals {
160-
return api.BillingChargeTotals{
160+
out := api.BillingChargeTotals{
161161
Booked: toAPIBillingTotals(charge.Realizations.Sum()),
162162
}
163+
164+
if charge.Expands.RealtimeUsage != nil {
165+
out.Realtime = lo.ToPtr(toAPIBillingTotals(*charge.Expands.RealtimeUsage))
166+
}
167+
168+
return out
163169
}
164170

165171
// toAPIBillingTotals maps a domain totals.Totals to the API BillingTotals type.

api/v3/handlers/customers/charges/list.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"net/http"
7+
"slices"
78
"strings"
89

910
"github.com/samber/lo"
@@ -56,14 +57,19 @@ func (h *handler) ListCustomerCharges() ListCustomerChargesHandler {
5657
})
5758
}
5859

60+
// Realization runs are always required to compute booked totals.
61+
expands := meta.Expands{meta.ExpandRealizations}
62+
if args.Params.Expand != nil && slices.Contains(*args.Params.Expand, api.BillingChargesExpandRealTimeUsage) {
63+
expands = expands.With(meta.ExpandRealtimeUsage)
64+
}
65+
5966
req := ListCustomerChargesRequest{
6067
Page: page,
6168
Namespace: ns,
6269
CustomerIDs: []string{args.CustomerID},
6370
// Credit purchases are served by the credit grants API; exclude them here.
6471
ChargeTypes: []meta.ChargeType{meta.ChargeTypeFlatFee, meta.ChargeTypeUsageBased},
65-
// Realization runs are always required to compute booked totals.
66-
Expands: meta.Expands{meta.ExpandRealizations},
72+
Expands: expands,
6773
}
6874

6975
// Parse sort. When omitted, the service defaults to created_at ascending

openmeter/billing/charges/adapter/search.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ func (a *adapter) ListCharges(ctx context.Context, input charges.ListChargesInpu
7070

7171
if len(input.StatusIn) > 0 {
7272
query = query.Where(dbchargessearchv1.StatusIn(input.StatusIn...))
73-
} else if len(input.StatusNotIn) > 0 {
73+
}
74+
75+
if len(input.StatusNotIn) > 0 {
7476
query = query.Where(dbchargessearchv1.StatusNotIn(input.StatusNotIn...))
7577
}
7678

openmeter/billing/charges/meta/charge.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,14 @@ func (t ChargeType) Validate() error {
6767
type Expand string
6868

6969
const (
70-
ExpandRealizations Expand = "realizations"
70+
ExpandRealizations Expand = "realizations"
71+
ExpandRealtimeUsage Expand = "realtime_usage"
7172
)
7273

7374
func (e Expand) Values() []Expand {
7475
return []Expand{
7576
ExpandRealizations,
77+
ExpandRealtimeUsage,
7678
}
7779
}
7880

openmeter/billing/charges/service.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,9 @@ type ListChargesInput struct {
144144
CustomerIDs []string
145145
SubscriptionIDs []string
146146
ChargeTypes []meta.ChargeType
147-
// StatusIn filters to only charges with one of the given statuses.
148-
// Takes precedence over StatusNotIn when both are set.
149-
// When empty and StatusNotIn is also empty, deleted charges are still
150-
// excluded via the IncludeDeleted flag.
151-
StatusIn []meta.ChargeStatus
152-
StatusNotIn []meta.ChargeStatus
153-
IncludeDeleted bool
147+
StatusIn []meta.ChargeStatus
148+
StatusNotIn []meta.ChargeStatus
149+
IncludeDeleted bool
154150

155151
// OrderBy is the field to sort by. Supported values: id, created_at,
156152
// service_period.from, billing_period.from.
@@ -198,6 +194,10 @@ func (i ListChargesInput) Validate() error {
198194
}
199195
}
200196

197+
if len(i.StatusIn) > 0 && len(i.StatusNotIn) > 0 {
198+
errs = append(errs, errors.New("status_in and status_not_in cannot be set at the same time"))
199+
}
200+
201201
if err := i.Expands.Validate(); err != nil {
202202
errs = append(errs, fmt.Errorf("expands: %w", err))
203203
}

openmeter/billing/charges/usagebased/charge.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"github.com/samber/lo"
99

1010
"github.com/openmeterio/openmeter/openmeter/billing/charges/meta"
11+
"github.com/openmeterio/openmeter/openmeter/billing/models/totals"
12+
"github.com/openmeterio/openmeter/openmeter/customer"
1113
"github.com/openmeterio/openmeter/openmeter/productcatalog"
1214
"github.com/openmeterio/openmeter/openmeter/productcatalog/feature"
1315
"github.com/openmeterio/openmeter/pkg/models"
@@ -68,6 +70,7 @@ type Charge struct {
6870
ChargeBase
6971

7072
Realizations RealizationRuns `json:"realizations"`
73+
Expands Expands `json:"expands"`
7174
}
7275

7376
type Charges []Charge
@@ -79,6 +82,10 @@ func (c Charge) Validate() error {
7982
errs = append(errs, fmt.Errorf("charge base: %w", err))
8083
}
8184

85+
if err := c.Expands.Validate(); err != nil {
86+
errs = append(errs, fmt.Errorf("expands: %w", err))
87+
}
88+
8289
return models.NewNillableGenericValidationError(errors.Join(errs...))
8390
}
8491

@@ -90,6 +97,13 @@ func (c Charge) GetCurrentRealizationRun() (RealizationRun, error) {
9097
return c.Realizations.GetByID(*c.State.CurrentRealizationRunID)
9198
}
9299

100+
func (c Charge) GetCustomerID() customer.CustomerID {
101+
return customer.CustomerID{
102+
Namespace: c.Namespace,
103+
ID: c.Intent.CustomerID,
104+
}
105+
}
106+
93107
func (c Charge) GetFeatureKeyOrID() ref.IDOrKey {
94108
switch c.Status {
95109
case StatusCreated:
@@ -213,3 +227,19 @@ func (s State) Validate() error {
213227

214228
return models.NewNillableGenericValidationError(errors.Join(errs...))
215229
}
230+
231+
type Expands struct {
232+
RealtimeUsage *totals.Totals `json:"realtimeUsage,omitempty"`
233+
}
234+
235+
func (e Expands) Validate() error {
236+
var errs []error
237+
238+
if e.RealtimeUsage != nil {
239+
if err := e.RealtimeUsage.Validate(); err != nil {
240+
errs = append(errs, fmt.Errorf("realtime usage: %w", err))
241+
}
242+
}
243+
244+
return models.NewNillableGenericValidationError(errors.Join(errs...))
245+
}

openmeter/billing/charges/usagebased/service/get.go

Lines changed: 164 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,27 @@ package service
22

33
import (
44
"context"
5+
"errors"
6+
"fmt"
7+
"sync"
58

9+
"github.com/samber/lo"
10+
"golang.org/x/sync/semaphore"
11+
12+
"github.com/openmeterio/openmeter/openmeter/billing"
13+
"github.com/openmeterio/openmeter/openmeter/billing/charges/meta"
614
"github.com/openmeterio/openmeter/openmeter/billing/charges/usagebased"
15+
usagebasedrating "github.com/openmeterio/openmeter/openmeter/billing/charges/usagebased/service/rating"
16+
"github.com/openmeterio/openmeter/openmeter/customer"
17+
"github.com/openmeterio/openmeter/pkg/clock"
718
"github.com/openmeterio/openmeter/pkg/framework/transaction"
19+
"github.com/openmeterio/openmeter/pkg/ref"
20+
"github.com/openmeterio/openmeter/pkg/slicesx"
21+
)
22+
23+
const (
24+
// defaultMaxParallelRatingsPerRequest is the number of workers to use for the rating (fetching from CH).
25+
defaultMaxParallelRatingsPerRequest = 5
826
)
927

1028
func (s *service) GetByIDs(ctx context.Context, input usagebased.GetByIDsInput) ([]usagebased.Charge, error) {
@@ -13,7 +31,19 @@ func (s *service) GetByIDs(ctx context.Context, input usagebased.GetByIDsInput)
1331
}
1432

1533
return transaction.Run(ctx, s.adapter, func(ctx context.Context) ([]usagebased.Charge, error) {
16-
return s.adapter.GetByIDs(ctx, input)
34+
charges, err := s.adapter.GetByIDs(ctx, input)
35+
if err != nil {
36+
return nil, err
37+
}
38+
39+
if input.Expands.Has(meta.ExpandRealtimeUsage) {
40+
charges, err = s.expandChargesUsage(ctx, input.Namespace, charges)
41+
if err != nil {
42+
return nil, err
43+
}
44+
}
45+
46+
return charges, nil
1747
})
1848
}
1949

@@ -23,6 +53,138 @@ func (s *service) GetByID(ctx context.Context, input usagebased.GetByIDInput) (u
2353
}
2454

2555
return transaction.Run(ctx, s.adapter, func(ctx context.Context) (usagebased.Charge, error) {
26-
return s.adapter.GetByID(ctx, input)
56+
charge, err := s.adapter.GetByID(ctx, input)
57+
if err != nil {
58+
return usagebased.Charge{}, err
59+
}
60+
61+
if input.Expands.Has(meta.ExpandRealtimeUsage) {
62+
totals, err := s.GetCurrentTotals(ctx, usagebased.GetCurrentTotalsInput{
63+
ChargeID: charge.GetChargeID(),
64+
})
65+
if err != nil {
66+
return usagebased.Charge{}, err
67+
}
68+
69+
charge.Expands.RealtimeUsage = &totals.DueTotals
70+
}
71+
72+
return charge, nil
73+
})
74+
}
75+
76+
func (s *service) expandChargesUsage(ctx context.Context, namespace string, charges usagebased.Charges) (usagebased.Charges, error) {
77+
// Fetch unique customers from the charges to avoid duplicate calls to the customer override service.
78+
uniqueCustomers := lo.Uniq(lo.Map(charges, func(charge usagebased.Charge, _ int) customer.CustomerID {
79+
return charge.GetCustomerID()
80+
}))
81+
82+
customerOverridesById := make(map[customer.CustomerID]billing.CustomerOverrideWithDetails)
83+
for _, customerID := range uniqueCustomers {
84+
customerOverride, err := s.customerOverrideService.GetCustomerOverride(ctx, billing.GetCustomerOverrideInput{
85+
Customer: customerID,
86+
Expand: billing.CustomerOverrideExpand{
87+
Customer: true,
88+
},
89+
})
90+
if err != nil {
91+
return nil, err
92+
}
93+
customerOverridesById[customerID] = customerOverride
94+
}
95+
96+
// Fetch all references featureMeters in bulk
97+
referencedFeatureMeters := lo.Uniq(lo.Map(charges, func(charge usagebased.Charge, _ int) ref.IDOrKey {
98+
return charge.GetFeatureKeyOrID()
99+
}))
100+
101+
featureMeters, err := s.featureService.ResolveFeatureMeters(ctx, namespace, referencedFeatureMeters...)
102+
if err != nil {
103+
return nil, err
104+
}
105+
106+
// Let's do the rating for each charge
107+
sem := semaphore.NewWeighted(int64(defaultMaxParallelRatingsPerRequest))
108+
storedAt := clock.Now()
109+
110+
errCh := make(chan error, len(charges))
111+
ratingResults := sync.Map{}
112+
113+
var wg sync.WaitGroup
114+
115+
for _, charge := range charges {
116+
featureMeter, err := charge.ResolveFeatureMeter(featureMeters)
117+
if err != nil {
118+
errCh <- fmt.Errorf("resolving feature meter: %w", err)
119+
break
120+
}
121+
122+
err = sem.Acquire(ctx, 1)
123+
if err != nil {
124+
// Clean up and stop the loop
125+
errCh <- fmt.Errorf("acquiring worker slot: %w", err)
126+
break
127+
}
128+
129+
wg.Go(func() {
130+
defer sem.Release(1)
131+
var err error
132+
defer func() {
133+
if err != nil {
134+
errCh <- err
135+
}
136+
}()
137+
138+
defer func() {
139+
if r := recover(); r != nil {
140+
err = fmt.Errorf("rating charge %s: %v", charge.ID, r)
141+
}
142+
}()
143+
144+
var ratingResult usagebasedrating.GetRatingForUsageResult
145+
ratingResult, err = s.rater.GetRatingForUsage(ctx, usagebasedrating.GetRatingForUsageInput{
146+
Charge: charge,
147+
Customer: customerOverridesById[charge.GetCustomerID()],
148+
FeatureMeter: featureMeter,
149+
StoredAtOffset: storedAt,
150+
})
151+
if err != nil {
152+
err = fmt.Errorf("rating charge %s: %w", charge.ID, err)
153+
return
154+
}
155+
156+
ratingResults.Store(charge.GetChargeID(), ratingResult)
157+
})
158+
}
159+
160+
wg.Wait()
161+
162+
close(errCh)
163+
164+
var errs []error
165+
166+
for err := range errCh {
167+
if err != nil {
168+
errs = append(errs, err)
169+
}
170+
}
171+
172+
if len(errs) > 0 {
173+
return nil, errors.Join(errs...)
174+
}
175+
176+
return slicesx.MapWithErr(charges, func(charge usagebased.Charge) (usagebased.Charge, error) {
177+
ratingResultAny, ok := ratingResults.Load(charge.GetChargeID())
178+
if !ok {
179+
return charge, fmt.Errorf("rating result not found for charge %s", charge.ID)
180+
}
181+
182+
ratingResult, ok := ratingResultAny.(usagebasedrating.GetRatingForUsageResult)
183+
if !ok {
184+
return charge, fmt.Errorf("rating result not found for charge %s", charge.ID)
185+
}
186+
187+
charge.Expands.RealtimeUsage = &ratingResult.Totals
188+
return charge, nil
27189
})
28190
}

0 commit comments

Comments
 (0)