Skip to content

Commit 1852a72

Browse files
feat: source-metronome net_balance refactor + cache sidecar
Source-metronome changes: - Switch to /v1/contracts/customerBalances/getNetBalance endpoint - Remove deprecated credit_grants (legacy /v1/credits/listGrants) and invoices streams - Add singleObject resource pattern for single-response endpoints - Add skipLimit flag for rate_cards (API rejects limit param) - Split webhook events into balance vs entitlement categories - Fix webhook backpressure (await push before HTTP 200) Cache sidecar (apps/cache-sidecar): - Standalone HTTP process for optimistic balance enforcement - Two Redis namespaces: sync:* (checkpoint, read-only) and optimistic:* (sidecar-owned) - GET /v1/balance/:cid — merged checkpoint + optimistic view - POST /v1/events — record pending event, decrement balance instantly - Reconciler polls every 5s, discards acknowledged events by watermark - Deduplication via deterministic sorted set members keyed by event_id - Fire-and-forget Metronome ingest forwarding - Integration test suite (25 tests) Pixel-app rewired to call sidecar instead of Redis/Metronome directly. E2E script updated for net_balance primary key (customer_id + credit_type_id). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Committed-By-Agent: claude
1 parent 965b132 commit 1852a72

18 files changed

Lines changed: 1650 additions & 204 deletions

File tree

apps/cache-sidecar/package.json

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
{
2+
"name": "@stripe/sync-cache-sidecar",
3+
"version": "0.1.0",
4+
"private": true,
5+
"description": "HTTP sidecar for optimistic balance enforcement over synced Metronome data",
6+
"type": "module",
7+
"exports": {
8+
".": {
9+
"bun": "./src/index.ts",
10+
"types": "./dist/index.d.ts",
11+
"import": "./dist/index.js"
12+
}
13+
},
14+
"scripts": {
15+
"build": "tsc",
16+
"dev": "tsx --watch --conditions bun src/index.ts",
17+
"start": "node dist/index.js",
18+
"test": "vitest run",
19+
"test:watch": "vitest"
20+
},
21+
"dependencies": {
22+
"@hono/node-server": "^1",
23+
"hono": "^4",
24+
"ioredis": "^5",
25+
"zod": "^4.3.6"
26+
},
27+
"devDependencies": {
28+
"@types/node": "^24.10.1",
29+
"tsx": "^4",
30+
"vitest": "^3.2.4"
31+
}
32+
}

apps/cache-sidecar/src/config.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { z } from 'zod'
2+
3+
const ConfigSchema = z.object({
4+
REDIS_URL: z.string().default('redis://localhost:56379'),
5+
CHECKPOINT_PREFIX: z.string().default('sync:'),
6+
OPTIMISTIC_PREFIX: z.string().default('optimistic:'),
7+
CREDIT_TYPE_ID: z.string().default('2714e483-4ff1-48e4-9e25-ac732e8f24f2'),
8+
METRONOME_API_TOKEN: z.string().optional(),
9+
METRONOME_BASE_URL: z.string().default('https://api.metronome.com'),
10+
PORT: z.coerce.number().default(4100),
11+
RECONCILE_INTERVAL_MS: z.coerce.number().default(5000),
12+
WATERMARK_BUFFER_MS: z.coerce.number().default(10000),
13+
FIXED_EVENT_COST: z.coerce.number().positive().default(1),
14+
})
15+
16+
export type Config = z.infer<typeof ConfigSchema>
17+
18+
export function loadConfig(): Config {
19+
const result = ConfigSchema.safeParse(process.env)
20+
if (!result.success) {
21+
console.error('Invalid configuration:', result.error.format())
22+
process.exit(1)
23+
}
24+
return result.data
25+
}

apps/cache-sidecar/src/index.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import { serve } from '@hono/node-server'
2+
import { loadConfig } from './config.js'
3+
import { FixedCostPricing } from './pricing.js'
4+
import { Reconciler } from './reconciler.js'
5+
import { createRedisClient } from './redis.js'
6+
import { createApp } from './server.js'
7+
8+
const config = loadConfig()
9+
const redis = createRedisClient(config)
10+
const pricing = new FixedCostPricing(config.FIXED_EVENT_COST)
11+
const reconciler = new Reconciler(redis, config)
12+
13+
const app = createApp({ redis, config, pricing })
14+
15+
const server = serve({ fetch: app.fetch, port: config.PORT }, (info) => {
16+
console.log(
17+
JSON.stringify({
18+
msg: 'cache-sidecar started',
19+
port: info.port,
20+
redis_url: config.REDIS_URL,
21+
reconcile_interval_ms: config.RECONCILE_INTERVAL_MS,
22+
})
23+
)
24+
})
25+
26+
reconciler.start()
27+
28+
function shutdown() {
29+
console.log(JSON.stringify({ msg: 'shutting down' }))
30+
reconciler.stop()
31+
redis.disconnect()
32+
server.close()
33+
process.exit(0)
34+
}
35+
36+
process.on('SIGTERM', shutdown)
37+
process.on('SIGINT', shutdown)

apps/cache-sidecar/src/pricing.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
export interface PricingStrategy {
2+
estimateCost(eventType: string, properties?: Record<string, unknown>): number
3+
}
4+
5+
/**
6+
* Fixed-cost pricing: every event costs the same amount of credits.
7+
* Good enough for MVP — swap in a lookup-based strategy later.
8+
*/
9+
export class FixedCostPricing implements PricingStrategy {
10+
constructor(private readonly cost: number) {}
11+
12+
estimateCost(_eventType: string, _properties?: Record<string, unknown>): number {
13+
return this.cost
14+
}
15+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import type { Config } from './config.js'
2+
import type { Redis } from './redis.js'
3+
import { pendingSetKey, writeOptimisticBalance } from './redis.js'
4+
import type { CheckpointData, PendingEvent } from './redis.js'
5+
6+
export class Reconciler {
7+
private timer: ReturnType<typeof setInterval> | null = null
8+
9+
constructor(
10+
private readonly redis: Redis,
11+
private readonly config: Config
12+
) {}
13+
14+
start(): void {
15+
if (this.timer) return
16+
console.log(
17+
JSON.stringify({
18+
msg: 'reconciler started',
19+
interval_ms: this.config.RECONCILE_INTERVAL_MS,
20+
})
21+
)
22+
this.timer = setInterval(() => this.reconcile(), this.config.RECONCILE_INTERVAL_MS)
23+
// Run immediately on start
24+
void this.reconcile()
25+
}
26+
27+
stop(): void {
28+
if (this.timer) {
29+
clearInterval(this.timer)
30+
this.timer = null
31+
console.log(JSON.stringify({ msg: 'reconciler stopped' }))
32+
}
33+
}
34+
35+
async reconcile(): Promise<void> {
36+
try {
37+
const pattern = `${this.config.CHECKPOINT_PREFIX}net_balance:*`
38+
let cursor = '0'
39+
let processed = 0
40+
41+
do {
42+
const [nextCursor, keys] = await this.redis.scan(
43+
cursor,
44+
'MATCH',
45+
pattern,
46+
'COUNT',
47+
100
48+
)
49+
cursor = nextCursor
50+
51+
for (const key of keys) {
52+
await this.reconcileKey(key)
53+
processed++
54+
}
55+
} while (cursor !== '0')
56+
57+
if (processed > 0) {
58+
console.log(JSON.stringify({ msg: 'reconcile cycle', keys_processed: processed }))
59+
}
60+
} catch (err) {
61+
console.error(
62+
JSON.stringify({
63+
msg: 'reconcile error',
64+
error: err instanceof Error ? err.message : String(err),
65+
})
66+
)
67+
}
68+
}
69+
70+
private async reconcileKey(key: string): Promise<void> {
71+
const raw = await this.redis.get(key)
72+
if (!raw) return
73+
74+
const checkpoint = JSON.parse(raw) as CheckpointData
75+
const customerId = checkpoint.customer_id
76+
77+
const syncedAtMs = Number(checkpoint._synced_at) * 1000
78+
const cutoff = syncedAtMs - this.config.WATERMARK_BUFFER_MS
79+
80+
const pendingKey = pendingSetKey(this.config, customerId)
81+
82+
// Remove events older than cutoff
83+
await this.redis.zremrangebyscore(pendingKey, '-inf', cutoff)
84+
85+
// Get remaining events and sum costs
86+
const members = await this.redis.zrange(pendingKey, 0, -1)
87+
let totalCost = 0
88+
for (const member of members) {
89+
const event = JSON.parse(member) as PendingEvent
90+
totalCost += event.estimated_cost
91+
}
92+
93+
// Write optimistic balance
94+
const optimisticBalance = checkpoint.balance - totalCost
95+
await writeOptimisticBalance(
96+
this.redis,
97+
this.config,
98+
customerId,
99+
optimisticBalance,
100+
members.length
101+
)
102+
}
103+
}

apps/cache-sidecar/src/redis.ts

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import { Redis } from 'ioredis'
2+
import type { Config } from './config.js'
3+
4+
export interface CheckpointData {
5+
balance: number
6+
customer_id: string
7+
credit_type_id: string
8+
_synced_at: number
9+
}
10+
11+
export interface PendingEvent {
12+
event_id: string
13+
event_type: string
14+
estimated_cost: number
15+
timestamp: number
16+
properties?: Record<string, unknown>
17+
}
18+
19+
export interface OptimisticBalanceHash {
20+
value: string
21+
computed_at: string
22+
event_count: string
23+
}
24+
25+
export type { Redis }
26+
27+
export function createRedisClient(config: Config): Redis {
28+
const client = new Redis(config.REDIS_URL, { maxRetriesPerRequest: 3 })
29+
client.on('error', (err) => {
30+
console.error(JSON.stringify({ msg: 'redis error', error: err.message }))
31+
})
32+
return client
33+
}
34+
35+
export function checkpointKey(config: Config, customerId: string): string {
36+
return `${config.CHECKPOINT_PREFIX}net_balance:${customerId}:${config.CREDIT_TYPE_ID}`
37+
}
38+
39+
export function pendingSetKey(config: Config, customerId: string): string {
40+
return `${config.OPTIMISTIC_PREFIX}pending:${customerId}`
41+
}
42+
43+
export function optimisticBalanceKey(config: Config, customerId: string): string {
44+
return `${config.OPTIMISTIC_PREFIX}balance:${customerId}`
45+
}
46+
47+
export async function getCheckpoint(
48+
redis: Redis,
49+
config: Config,
50+
customerId: string
51+
): Promise<CheckpointData | null> {
52+
const raw = await redis.get(checkpointKey(config, customerId))
53+
if (!raw) return null
54+
return JSON.parse(raw) as CheckpointData
55+
}
56+
57+
export async function getPendingEvents(
58+
redis: Redis,
59+
config: Config,
60+
customerId: string
61+
): Promise<PendingEvent[]> {
62+
const members = await redis.zrange(pendingSetKey(config, customerId), 0, -1)
63+
return members.map((m) => JSON.parse(m) as PendingEvent)
64+
}
65+
66+
export async function sumPendingCosts(
67+
redis: Redis,
68+
config: Config,
69+
customerId: string
70+
): Promise<{ total: number; count: number }> {
71+
const events = await getPendingEvents(redis, config, customerId)
72+
const total = events.reduce((sum, e) => sum + e.estimated_cost, 0)
73+
return { total, count: events.length }
74+
}
75+
76+
export async function writeOptimisticBalance(
77+
redis: Redis,
78+
config: Config,
79+
customerId: string,
80+
value: number,
81+
eventCount: number
82+
): Promise<void> {
83+
const key = optimisticBalanceKey(config, customerId)
84+
await redis.hset(key, {
85+
value: String(value),
86+
computed_at: String(Date.now()),
87+
event_count: String(eventCount),
88+
})
89+
}
90+
91+
export async function getOptimisticBalance(
92+
redis: Redis,
93+
config: Config,
94+
customerId: string
95+
): Promise<OptimisticBalanceHash | null> {
96+
const data = await redis.hgetall(optimisticBalanceKey(config, customerId))
97+
if (!data || !data.value) return null
98+
return data as unknown as OptimisticBalanceHash
99+
}

0 commit comments

Comments
 (0)