Skip to content

Commit 6163a01

Browse files
authored
feat[charges]: common state machine operations (#4145)
1 parent bc4528f commit 6163a01

9 files changed

Lines changed: 604 additions & 195 deletions

File tree

openmeter/billing/charges/flatfee/charge.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,24 @@ type Charge struct {
7070
Realizations Realizations `json:"realizations"`
7171
}
7272

73+
func (c Charge) GetStatus() Status {
74+
return c.Status
75+
}
76+
77+
func (c Charge) WithStatus(status Status) Charge {
78+
c.Status = status
79+
return c
80+
}
81+
82+
func (c Charge) GetBase() ChargeBase {
83+
return c.ChargeBase
84+
}
85+
86+
func (c Charge) WithBase(base ChargeBase) Charge {
87+
c.ChargeBase = base
88+
return c
89+
}
90+
7391
func (c Charge) Validate() error {
7492
var errs []error
7593

openmeter/billing/charges/flatfee/service/creditsonly.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,5 +122,9 @@ func (s *CreditsOnlyStateMachine) DeleteCharge(ctx context.Context, policy meta.
122122
return fmt.Errorf("delete charge: %w", err)
123123
}
124124

125-
return s.refetchCharge(ctx)
125+
if err := s.RefetchCharge(ctx); err != nil {
126+
return fmt.Errorf("get charge: %w", err)
127+
}
128+
129+
return nil
126130
}

openmeter/billing/charges/flatfee/service/statemachine.go

Lines changed: 18 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,20 @@ import (
55
"errors"
66
"fmt"
77

8-
"github.com/qmuntal/stateless"
9-
108
"github.com/openmeterio/openmeter/openmeter/billing/charges/flatfee"
119
flatfeerealizations "github.com/openmeterio/openmeter/openmeter/billing/charges/flatfee/service/realizations"
1210
"github.com/openmeterio/openmeter/openmeter/billing/charges/meta"
13-
"github.com/openmeterio/openmeter/pkg/models"
11+
chargestatemachine "github.com/openmeterio/openmeter/openmeter/billing/charges/statemachine"
1412
)
1513

1614
type stateMachine struct {
17-
*stateless.StateMachine
18-
19-
Charge flatfee.Charge
15+
*chargestatemachine.Machine[flatfee.Charge, flatfee.ChargeBase, flatfee.Status]
2016

2117
Adapter flatfee.Adapter
2218
Realizations *flatfeerealizations.Service
2319
}
2420

25-
type StateMachine interface {
26-
AdvanceUntilStateStable(ctx context.Context) (*flatfee.Charge, error)
27-
CanFire(ctx context.Context, trigger meta.Trigger) (bool, error)
28-
FireAndActivate(ctx context.Context, trigger meta.Trigger, args ...any) error
29-
GetCharge() flatfee.Charge
30-
}
21+
type StateMachine = chargestatemachine.StateMachine[flatfee.Charge]
3122

3223
type StateMachineConfig struct {
3324
Charge flatfee.Charge
@@ -60,100 +51,29 @@ func newStateMachineBase(config StateMachineConfig) (*stateMachine, error) {
6051
}
6152

6253
out := &stateMachine{
63-
Charge: config.Charge,
6454
Adapter: config.Adapter,
6555
Realizations: config.Realizations,
6656
}
6757

68-
stateMachine := stateless.NewStateMachineWithExternalStorage(
69-
func(ctx context.Context) (stateless.State, error) {
70-
return out.Charge.Status, nil
71-
},
72-
func(ctx context.Context, state stateless.State) error {
73-
newStatus := state.(flatfee.Status)
74-
if err := newStatus.Validate(); err != nil {
75-
return fmt.Errorf("invalid status: %w", err)
76-
}
77-
78-
out.Charge.Status = newStatus
79-
return nil
58+
machine, err := chargestatemachine.New(chargestatemachine.Config[flatfee.Charge, flatfee.ChargeBase, flatfee.Status]{
59+
Charge: config.Charge,
60+
Persistence: chargestatemachine.Persistence[flatfee.Charge, flatfee.ChargeBase]{
61+
UpdateBase: func(ctx context.Context, base flatfee.ChargeBase) (flatfee.ChargeBase, error) {
62+
return out.Adapter.UpdateCharge(ctx, base)
63+
},
64+
Refetch: func(ctx context.Context, chargeID meta.ChargeID) (flatfee.Charge, error) {
65+
return out.Adapter.GetByID(ctx, flatfee.GetByIDInput{
66+
ChargeID: chargeID,
67+
Expands: meta.Expands{meta.ExpandRealizations},
68+
})
69+
},
8070
},
81-
stateless.FiringImmediate,
82-
)
83-
84-
out.StateMachine = stateMachine
85-
86-
return out, nil
87-
}
88-
89-
var ErrUnsupportedOperation = models.NewGenericPreConditionFailedError(fmt.Errorf("unsupported operation"))
90-
91-
func (s *stateMachine) CanFire(ctx context.Context, trigger meta.Trigger) (bool, error) {
92-
return s.StateMachine.CanFireCtx(ctx, trigger)
93-
}
94-
95-
func (s *stateMachine) FireAndActivate(ctx context.Context, trigger meta.Trigger, args ...any) error {
96-
canFire, err := s.CanFire(ctx, trigger)
97-
if err != nil {
98-
return err
99-
}
100-
101-
if !canFire {
102-
return fmt.Errorf("%w: %s [status=%s,id=%s]", ErrUnsupportedOperation, trigger, s.Charge.Status, s.Charge.ID)
103-
}
104-
105-
if err := s.StateMachine.FireCtx(ctx, trigger, args...); err != nil {
106-
return err
107-
}
108-
109-
return s.StateMachine.ActivateCtx(ctx)
110-
}
111-
112-
func (s *stateMachine) AdvanceUntilStateStable(ctx context.Context) (*flatfee.Charge, error) {
113-
var advanced bool
114-
115-
for {
116-
canFire, err := s.StateMachine.CanFireCtx(ctx, meta.TriggerNext)
117-
if err != nil {
118-
return nil, err
119-
}
120-
121-
if !canFire {
122-
if !advanced {
123-
return nil, nil
124-
}
125-
126-
charge := s.Charge
127-
return &charge, nil
128-
}
129-
130-
if err := s.FireAndActivate(ctx, meta.TriggerNext); err != nil {
131-
return nil, fmt.Errorf("cannot transition to the next status [current_status=%s]: %w", s.Charge.Status, err)
132-
}
133-
134-
updatedChargeBase, err := s.Adapter.UpdateCharge(ctx, s.Charge.ChargeBase)
135-
if err != nil {
136-
return nil, fmt.Errorf("persist charge: %w", err)
137-
}
138-
139-
s.Charge.ChargeBase = updatedChargeBase
140-
141-
advanced = true
142-
}
143-
}
144-
145-
func (s *stateMachine) refetchCharge(ctx context.Context) error {
146-
charge, err := s.Adapter.GetByID(ctx, flatfee.GetByIDInput{
147-
ChargeID: s.Charge.GetChargeID(),
14871
})
14972
if err != nil {
150-
return fmt.Errorf("get charge: %w", err)
73+
return nil, fmt.Errorf("new machine: %w", err)
15174
}
15275

153-
s.Charge = charge
154-
return nil
155-
}
76+
out.Machine = machine
15677

157-
func (s *stateMachine) GetCharge() flatfee.Charge {
158-
return s.Charge
78+
return out, nil
15979
}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package statemachine
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
"github.com/qmuntal/stateless"
9+
10+
"github.com/openmeterio/openmeter/openmeter/billing/charges/meta"
11+
"github.com/openmeterio/openmeter/pkg/models"
12+
)
13+
14+
type Status interface {
15+
~string
16+
Validate() error
17+
}
18+
19+
type ChargeLike[CHARGE any, BASE any, STATUS Status] interface {
20+
GetChargeID() meta.ChargeID
21+
GetStatus() STATUS
22+
WithStatus(STATUS) CHARGE
23+
GetBase() BASE
24+
WithBase(BASE) CHARGE
25+
}
26+
27+
type Persistence[CHARGE any, BASE any] struct {
28+
UpdateBase func(ctx context.Context, base BASE) (BASE, error)
29+
Refetch func(ctx context.Context, chargeID meta.ChargeID) (CHARGE, error)
30+
}
31+
32+
type Config[CHARGE ChargeLike[CHARGE, BASE, STATUS], BASE any, STATUS Status] struct {
33+
Charge CHARGE
34+
Persistence Persistence[CHARGE, BASE]
35+
}
36+
37+
type StateMachine[CHARGE any] interface {
38+
AdvanceUntilStateStable(ctx context.Context) (*CHARGE, error)
39+
CanFire(ctx context.Context, trigger meta.Trigger) (bool, error)
40+
FireAndActivate(ctx context.Context, trigger meta.Trigger, args ...any) error
41+
GetCharge() CHARGE
42+
}
43+
44+
func (c Config[CHARGE, BASE, STATUS]) Validate() error {
45+
var errs []error
46+
47+
if c.Persistence.UpdateBase == nil {
48+
errs = append(errs, errors.New("persistence.update base is required"))
49+
}
50+
51+
if c.Persistence.Refetch == nil {
52+
errs = append(errs, errors.New("persistence.refetch is required"))
53+
}
54+
55+
return errors.Join(errs...)
56+
}
57+
58+
type Machine[CHARGE ChargeLike[CHARGE, BASE, STATUS], BASE any, STATUS Status] struct {
59+
Charge CHARGE
60+
stateMachine *stateless.StateMachine
61+
config Config[CHARGE, BASE, STATUS]
62+
}
63+
64+
func New[CHARGE ChargeLike[CHARGE, BASE, STATUS], BASE any, STATUS Status](config Config[CHARGE, BASE, STATUS]) (*Machine[CHARGE, BASE, STATUS], error) {
65+
if err := config.Validate(); err != nil {
66+
return nil, fmt.Errorf("config: %w", err)
67+
}
68+
69+
out := &Machine[CHARGE, BASE, STATUS]{
70+
Charge: config.Charge,
71+
config: config,
72+
}
73+
74+
out.stateMachine = stateless.NewStateMachineWithExternalStorage(
75+
func(ctx context.Context) (stateless.State, error) {
76+
return out.Charge.GetStatus(), nil
77+
},
78+
func(ctx context.Context, state stateless.State) error {
79+
newStatus := state.(STATUS)
80+
if err := newStatus.Validate(); err != nil {
81+
return fmt.Errorf("invalid status: %w", err)
82+
}
83+
84+
out.Charge = out.Charge.WithStatus(newStatus)
85+
86+
return nil
87+
},
88+
stateless.FiringImmediate,
89+
)
90+
91+
return out, nil
92+
}
93+
94+
func (m *Machine[CHARGE, BASE, STATUS]) Configure(state STATUS) *stateless.StateConfiguration {
95+
return m.stateMachine.Configure(state)
96+
}
97+
98+
func (m *Machine[CHARGE, BASE, STATUS]) CanFire(ctx context.Context, trigger meta.Trigger) (bool, error) {
99+
return m.stateMachine.CanFireCtx(ctx, trigger)
100+
}
101+
102+
func (m *Machine[CHARGE, BASE, STATUS]) GetCharge() CHARGE {
103+
return m.Charge
104+
}
105+
106+
var ErrUnsupportedOperation = models.NewGenericPreConditionFailedError(fmt.Errorf("unsupported operation"))
107+
108+
func (m *Machine[CHARGE, BASE, STATUS]) FireAndActivate(ctx context.Context, trigger meta.Trigger, args ...any) error {
109+
canFire, err := m.CanFire(ctx, trigger)
110+
if err != nil {
111+
return err
112+
}
113+
114+
if !canFire {
115+
return fmt.Errorf(
116+
"%w: %s [status=%s,id=%s]",
117+
ErrUnsupportedOperation,
118+
trigger,
119+
m.Charge.GetStatus(),
120+
m.Charge.GetChargeID().ID,
121+
)
122+
}
123+
124+
if err := m.stateMachine.FireCtx(ctx, trigger, args...); err != nil {
125+
return err
126+
}
127+
128+
return m.stateMachine.ActivateCtx(ctx)
129+
}
130+
131+
func (m *Machine[CHARGE, BASE, STATUS]) AdvanceUntilStateStable(ctx context.Context) (*CHARGE, error) {
132+
var advanced bool
133+
134+
for {
135+
canFire, err := m.CanFire(ctx, meta.TriggerNext)
136+
if err != nil {
137+
return nil, err
138+
}
139+
140+
if !canFire {
141+
if !advanced {
142+
return nil, nil
143+
}
144+
145+
charge := m.Charge
146+
return &charge, nil
147+
}
148+
149+
currentStatus := m.Charge.GetStatus()
150+
151+
if err := m.FireAndActivate(ctx, meta.TriggerNext); err != nil {
152+
return nil, fmt.Errorf("cannot transition to the next status [current_status=%s]: %w", currentStatus, err)
153+
}
154+
155+
updatedBase, err := m.config.Persistence.UpdateBase(ctx, m.Charge.GetBase())
156+
if err != nil {
157+
return nil, fmt.Errorf("persist charge: %w", err)
158+
}
159+
160+
m.Charge = m.Charge.WithBase(updatedBase)
161+
162+
advanced = true
163+
}
164+
}
165+
166+
func (m *Machine[CHARGE, BASE, STATUS]) RefetchCharge(ctx context.Context) error {
167+
chargeID := m.Charge.GetChargeID()
168+
169+
charge, err := m.config.Persistence.Refetch(ctx, chargeID)
170+
if err != nil {
171+
return err
172+
}
173+
174+
m.Charge = charge
175+
return nil
176+
}

0 commit comments

Comments
 (0)