Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 78 additions & 24 deletions backend/internal/service/payment_fulfillment.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"strings"
"time"

"entgo.io/ent/dialect"

dbent "github.com/Wei-Shaw/sub2api/ent"
"github.com/Wei-Shaw/sub2api/ent/paymentauditlog"
"github.com/Wei-Shaw/sub2api/ent/paymentorder"
Expand Down Expand Up @@ -429,16 +431,22 @@ func (s *PaymentService) doSub(ctx context.Context, o *dbent.PaymentOrder) error
if err != nil || g.Status != payment.EntityStatusActive {
return fmt.Errorf("group %d no longer exists or inactive", gid)
}
// Idempotency: check audit log to see if subscription was already assigned.
// Prevents double-extension on retry after markCompleted fails.
if s.hasAuditLog(ctx, o.ID, "SUBSCRIPTION_SUCCESS") {
assigned := s.hasAuditLog(ctx, o.ID, "SUBSCRIPTION_ASSIGNED") || s.hasAuditLog(ctx, o.ID, "SUBSCRIPTION_SUCCESS")
if !assigned {
orderNote := fmt.Sprintf("payment order %d", o.ID)
_, _, err = s.subscriptionSvc.AssignOrExtendSubscription(ctx, &AssignSubscriptionInput{UserID: o.UserID, GroupID: gid, ValidityDays: days, AssignedBy: 0, Notes: orderNote})
if err != nil {
return fmt.Errorf("assign subscription: %w", err)
}
s.writeAuditLog(ctx, o.ID, "SUBSCRIPTION_ASSIGNED", "system", map[string]any{
"groupID": gid,
"validityDays": days,
})
} else {
slog.Info("subscription already assigned for order, skipping", "orderID", o.ID, "groupID", gid)
return s.markCompleted(ctx, o, "SUBSCRIPTION_SUCCESS")
}
orderNote := fmt.Sprintf("payment order %d", o.ID)
_, _, err = s.subscriptionSvc.AssignOrExtendSubscription(ctx, &AssignSubscriptionInput{UserID: o.UserID, GroupID: gid, ValidityDays: days, AssignedBy: 0, Notes: orderNote})
if err != nil {
return fmt.Errorf("assign subscription: %w", err)
if err := s.applyAffiliateRebateForOrder(ctx, o); err != nil {
return err
}
return s.markCompleted(ctx, o, "SUBSCRIPTION_SUCCESS")
}
Expand All @@ -452,7 +460,8 @@ func (s *PaymentService) hasAuditLog(ctx context.Context, orderID int64, action
}

func (s *PaymentService) applyAffiliateRebateForOrder(ctx context.Context, o *dbent.PaymentOrder) error {
if o == nil || o.OrderType != payment.OrderTypeBalance || o.Amount <= 0 {
baseAmount := affiliateRebateBaseAmount(o)
if o == nil || baseAmount <= 0 {
return nil
}
if s.affiliateService == nil {
Expand All @@ -469,7 +478,7 @@ func (s *PaymentService) applyAffiliateRebateForOrder(ctx context.Context, o *db
defer func() { _ = tx.Rollback() }()

txCtx := dbent.NewTxContext(ctx, tx)
claimed, err := s.tryClaimAffiliateRebateAudit(txCtx, tx.Client(), o.ID, o.Amount)
claimed, err := s.tryClaimAffiliateRebateAudit(txCtx, tx.Client(), o.ID, baseAmount)
if err != nil {
s.writeAuditLog(ctx, o.ID, "AFFILIATE_REBATE_FAILED", "system", map[string]any{
"error": err.Error(),
Expand All @@ -481,7 +490,7 @@ func (s *PaymentService) applyAffiliateRebateForOrder(ctx context.Context, o *db
}

sourceOrderID := o.ID
rebateAmount, err := s.affiliateService.AccrueInviteRebateForOrder(txCtx, o.UserID, o.Amount, &sourceOrderID)
rebateAmount, err := s.affiliateService.AccrueInviteRebateForOrder(txCtx, o.UserID, baseAmount, &sourceOrderID)
if err != nil {
s.writeAuditLog(ctx, o.ID, "AFFILIATE_REBATE_FAILED", "system", map[string]any{
"error": err.Error(),
Expand All @@ -491,7 +500,7 @@ func (s *PaymentService) applyAffiliateRebateForOrder(ctx context.Context, o *db

if rebateAmount <= 0 {
if err := s.updateClaimedAffiliateRebateAudit(txCtx, tx.Client(), o.ID, "AFFILIATE_REBATE_SKIPPED", map[string]any{
"baseAmount": o.Amount,
"baseAmount": baseAmount,
"reason": "no inviter bound or rebate amount <= 0",
}); err != nil {
s.writeAuditLog(ctx, o.ID, "AFFILIATE_REBATE_FAILED", "system", map[string]any{
Expand All @@ -509,7 +518,7 @@ func (s *PaymentService) applyAffiliateRebateForOrder(ctx context.Context, o *db
}

if err := s.updateClaimedAffiliateRebateAudit(txCtx, tx.Client(), o.ID, "AFFILIATE_REBATE_APPLIED", map[string]any{
"baseAmount": o.Amount,
"baseAmount": baseAmount,
"rebateAmount": rebateAmount,
}); err != nil {
s.writeAuditLog(ctx, o.ID, "AFFILIATE_REBATE_FAILED", "system", map[string]any{
Expand All @@ -527,6 +536,18 @@ func (s *PaymentService) applyAffiliateRebateForOrder(ctx context.Context, o *db
return nil
}

func affiliateRebateBaseAmount(o *dbent.PaymentOrder) float64 {
if o == nil {
return 0
}
switch o.OrderType {
case payment.OrderTypeBalance, payment.OrderTypeSubscription:
return o.Amount
default:
return 0
}
}

func (s *PaymentService) tryClaimAffiliateRebateAudit(ctx context.Context, client *dbent.Client, orderID int64, baseAmount float64) (bool, error) {
if client == nil {
return false, errors.New("nil payment client")
Expand All @@ -536,17 +557,8 @@ func (s *PaymentService) tryClaimAffiliateRebateAudit(ctx context.Context, clien
"baseAmount": baseAmount,
"status": "reserved",
})
rows, err := client.QueryContext(ctx, `
INSERT INTO payment_audit_logs (order_id, action, detail, operator, created_at)
SELECT $1::text, 'AFFILIATE_REBATE_APPLIED', $2::text, 'system', NOW()
WHERE NOT EXISTS (
SELECT 1
FROM payment_audit_logs
WHERE order_id = $1::text
AND action IN ('AFFILIATE_REBATE_APPLIED', 'AFFILIATE_REBATE_SKIPPED')
)
ON CONFLICT (order_id, action) DO NOTHING
RETURNING id`, oid, string(detail))
query, args := buildAffiliateRebateAuditClaimQuery(client, oid, string(detail))
rows, err := client.QueryContext(ctx, query, args...)
if err != nil {
return false, err
}
Expand All @@ -564,6 +576,48 @@ RETURNING id`, oid, string(detail))
return true, nil
}

func buildAffiliateRebateAuditClaimQuery(client *dbent.Client, orderID, detail string) (string, []any) {
nowExpr := paymentAuditCurrentTimestampExpr(client)
if paymentAuditDialect(client) == dialect.Postgres {
return fmt.Sprintf(`
INSERT INTO payment_audit_logs (order_id, action, detail, operator, created_at)
SELECT $1::text, 'AFFILIATE_REBATE_APPLIED', $2::text, 'system', %s
WHERE NOT EXISTS (
SELECT 1
FROM payment_audit_logs
WHERE order_id = $1::text
AND action IN ('AFFILIATE_REBATE_APPLIED', 'AFFILIATE_REBATE_SKIPPED')
)
ON CONFLICT (order_id, action) DO NOTHING
RETURNING id`, nowExpr), []any{orderID, detail}
}
return fmt.Sprintf(`
INSERT INTO payment_audit_logs (order_id, action, detail, operator, created_at)
SELECT ?, 'AFFILIATE_REBATE_APPLIED', ?, 'system', %s
WHERE NOT EXISTS (
SELECT 1
FROM payment_audit_logs
WHERE order_id = ?
AND action IN ('AFFILIATE_REBATE_APPLIED', 'AFFILIATE_REBATE_SKIPPED')
)
ON CONFLICT (order_id, action) DO NOTHING
RETURNING id`, nowExpr), []any{orderID, detail, orderID}
}

func paymentAuditCurrentTimestampExpr(client *dbent.Client) string {
if paymentAuditDialect(client) == dialect.Postgres {
return "NOW()"
}
return "CURRENT_TIMESTAMP"
}

func paymentAuditDialect(client *dbent.Client) string {
if client == nil || client.Driver() == nil {
return ""
}
return client.Driver().Dialect()
}

func (s *PaymentService) updateClaimedAffiliateRebateAudit(ctx context.Context, client *dbent.Client, orderID int64, action string, detail map[string]any) error {
if client == nil {
return errors.New("nil payment client")
Expand Down
Loading
Loading