|
import asyncio
import logging
import os
import time

import asyncpg
from redis.asyncio import Redis
from redis.exceptions import ResponseError
| 4 | + |
# Consumer-group name and consumer name passed to XGROUP CREATE / XREADGROUP.
GROUP = os.environ.get("GROUP", "g_fuzz")
CONSUMER = os.environ.get("CONSUMER", "c_sync_1")

# Comma-separated "label=redis_url" pairs; each pair is consumed by its own task.
_DEFAULT_STREAMS = "redis1=redis://redis1:6379,redis2=redis://redis2:6379"
STREAMS = os.environ.get("STREAMS", _DEFAULT_STREAMS).split(",")

# Stream key read on every configured Redis instance.
STREAM_NAME = "stream:fuzz:updates"

# Postgres connection string.
PG_DSN = os.environ.get("PG_DSN", "postgres://fuzzuser:pass@pg:5432/main")

# NOTE(review): not referenced anywhere in the visible code; presumably the
# replies treated as success when creating the consumer group -- confirm.
CREATE_GROUP_OK = {"OK", "BUSYGROUP Consumer Group name already exists"}
| 12 | + |
# Insert-or-update one fuzz_data row keyed by `key`.
# Parameters: $1 = key, $2 = val, $3 = origin, $4 = vclock.
# On a key conflict the incoming val/origin replace the stored ones (and
# updated_at is refreshed) only when the incoming vclock is >= the stored
# vclock, so ties go to the incoming row; otherwise the stored values are kept.
# The stored vclock always becomes the larger of the two.
UPSERT_SQL = """
INSERT INTO fuzz_data (key, val, origin, vclock, updated_at)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (key) DO UPDATE SET
 val = CASE WHEN EXCLUDED.vclock >= fuzz_data.vclock THEN EXCLUDED.val ELSE fuzz_data.val END,
 origin = CASE WHEN EXCLUDED.vclock >= fuzz_data.vclock THEN EXCLUDED.origin ELSE fuzz_data.origin END,
 vclock = GREATEST(fuzz_data.vclock, EXCLUDED.vclock),
 updated_at = CASE WHEN EXCLUDED.vclock >= fuzz_data.vclock THEN NOW() ELSE fuzz_data.updated_at END;
"""
| 22 | + |
async def ensure_group(r: Redis, stream: str):
    """Create consumer group ``GROUP`` on *stream* unless it already exists.

    The group is created at ``$`` (only entries added after creation are
    delivered) and the stream itself is created if missing (``mkstream``).

    Args:
        r: Async Redis client for the instance that holds the stream.
        stream: Stream key to attach the group to.

    Raises:
        redis.exceptions.ResponseError: Redis rejected the command for any
            reason other than the group already existing.
    """
    try:
        await r.xgroup_create(stream, GROUP, id="$", mkstream=True)
    except ResponseError as exc:
        # Redis replies with a BUSYGROUP error when the group is already
        # there; that is the only failure that is safe to ignore. Connection
        # and other errors are not ResponseError and propagate untouched.
        if "BUSYGROUP" not in str(exc):
            raise
| 29 | + |
def _parse_entry(data):
    """Decode one stream entry into ``(op, key, origin, vclock, val)``.

    Raises ValueError (UnicodeDecodeError is a subclass) when a field cannot
    be decoded, ``vclock`` is not an integer, or the entry carries no key.
    """
    op = data.get(b"op", b"").decode()
    key = data.get(b"key", b"").decode()
    origin = data.get(b"origin", b"").decode()
    vclock = int(data.get(b"vclock", b"0").decode() or 0)
    if not key:
        raise ValueError("stream entry has no key")
    # val is handed to Postgres as raw bytes, exactly as received.
    return op, key, origin, vclock, data.get(b"val", b"")


async def consume_stream(label: str, redis_url: str, pg):
    """Copy entries from one Redis stream into Postgres until cancelled.

    Entries are read through consumer group ``GROUP`` as ``CONSUMER`` and
    acknowledged only after the matching Postgres statement succeeded. The
    loop first replays this consumer's pending (delivered but unacknowledged)
    entries and only then asks for new ones; after any failure it goes back to
    replaying, so an entry that failed half-way is retried instead of being
    left in the pending list forever.

    Entries that cannot be parsed are logged, acknowledged and dropped, since
    retrying cannot fix them. Failures from Redis or Postgres are logged and
    retried after a one-second pause, which means an entry Postgres keeps
    rejecting will block this stream (loudly) until it is dealt with.

    Args:
        label: Human-readable name of the Redis instance, used in log lines.
        redis_url: URL passed to ``Redis.from_url``.
        pg: Object with an awaitable ``execute(query, *args)`` (for example an
            asyncpg connection or pool).
    """
    log = logging.getLogger(__name__)
    r = Redis.from_url(redis_url)
    try:
        await ensure_group(r, STREAM_NAME)
        # "0" returns entries already delivered to this consumer but never
        # acked; ">" returns only entries never delivered to any consumer.
        cursor = "0"
        while True:
            try:
                resp = await r.xreadgroup(
                    GROUP, CONSUMER, {STREAM_NAME: cursor}, count=100, block=5000
                )
                # resp = [(b'stream:fuzz:updates', [(id, {b'k': b'v', ...}), ...])]
                entries = [entry for _, batch in (resp or []) for entry in batch]
                if not entries:
                    # Pending backlog drained, or the blocking read timed out.
                    cursor = ">"
                    continue
                for msg_id, data in entries:
                    try:
                        op, key, origin, vclock, val = _parse_entry(data)
                    except ValueError:
                        log.exception(
                            "[%s] dropping malformed entry %r", label, msg_id
                        )
                        await r.xack(STREAM_NAME, GROUP, msg_id)
                        continue
                    if op == "del":
                        # Only delete rows that are not newer than this delete.
                        await pg.execute(
                            "DELETE FROM fuzz_data WHERE key=$1 AND vclock <= $2",
                            key,
                            vclock,
                        )
                    else:
                        await pg.execute(UPSERT_SQL, key, val, origin, vclock)
                    await r.xack(STREAM_NAME, GROUP, msg_id)
            except Exception:
                log.exception(
                    "[%s] sync failed; retrying pending entries in 1s", label
                )
                # Whatever was delivered but not acked must be re-read.
                cursor = "0"
                await asyncio.sleep(1)
    finally:
        # Newer redis-py releases name the async close method aclose(); older
        # ones only provide close().
        closer = getattr(r, "aclose", None) or r.close
        await closer()
| 58 | + |
async def main():
    """Start one consumer task per ``STREAMS`` entry and run them until one fails.

    Each entry must look like ``label=redis_url``; only the first ``=`` splits
    the pair, so URLs with query parameters are accepted. Blank entries (for
    example from a trailing comma) are skipped.

    The consumers share an asyncpg pool rather than a single connection,
    because one asyncpg connection cannot run statements from several tasks
    at the same time.

    Raises:
        ValueError: An entry is not of the form ``label=redis_url`` or no
            entries are configured.
    """
    endpoints = []
    for pair in STREAMS:
        pair = pair.strip()
        if not pair:
            continue
        label, sep, url = pair.partition("=")
        if not sep or not url:
            raise ValueError(
                f"invalid STREAMS entry {pair!r}; expected 'label=redis_url'"
            )
        endpoints.append((label, url))
    if not endpoints:
        raise ValueError("STREAMS does not contain any 'label=redis_url' entry")

    # Every consumer runs one statement at a time, so one pooled connection
    # per consumer is enough.
    async with asyncpg.create_pool(
        PG_DSN, min_size=1, max_size=len(endpoints)
    ) as pool:
        tasks = [
            asyncio.create_task(consume_stream(label, url, pool))
            for label, url in endpoints
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            # Stop the remaining consumers before the pool is closed.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
| 66 | + |
# Script entry point: run the sync worker on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
0 commit comments