-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathpubsub_server_example.py
More file actions
37 lines (30 loc) · 980 Bytes
/
pubsub_server_example.py
File metadata and controls
37 lines (30 loc) · 980 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
"""
To run this test.
- 1. run this script for the server
- 2. once the server is up, run pubsub_client_example.py
- 3. send get request to server on: 'http://localhost:8000/trigger'
"""
import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.routing import APIRouter
from fastapi_websocket_pubsub import PubSubEndpoint
app = FastAPI()
router = APIRouter()
endpoint = PubSubEndpoint()
endpoint.register_route(router)
app.include_router(router)
async def events():
await asyncio.sleep(1)
# Publish multiple topics (without data)
await endpoint.publish(["guns", "germs"])
await asyncio.sleep(1)
# Publish single topic (without data)
await endpoint.publish(["germs"])
await asyncio.sleep(1)
# Publish single topic (with data)
await endpoint.publish(["steel"], data={"author": "Jared Diamond"})
@app.get("/trigger")
async def trigger_events():
asyncio.create_task(events())
uvicorn.run(app, host="0.0.0.0", port=8000)