-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
104 lines (93 loc) · 3.09 KB
/
app.py
File metadata and controls
104 lines (93 loc) · 3.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# pip install fastapi[standard] webpush
import datetime
import uuid
from pathlib import Path
from typing import Annotated

import httpx
from fastapi import BackgroundTasks, Cookie, FastAPI, HTTPException, Request, Response
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from webpush import WebPush, WebPushSubscription
class Message(BaseModel):
    """Notification payload.

    Instances are serialized with ``model_dump_json()`` and handed to the
    Web Push helper as the message body by the notification endpoints.
    """

    # Headline of the notification.
    title: str
    # Main text of the notification.
    body: str
    require_interaction: bool = False
    # URL of the icon to show with the notification.
    icon: str
    # NOTE: a ``tag: str`` field exists in the source only as a disabled line.
    renotify: bool = False
    # Filled from ``datetime.timestamp()`` by the senders in this file.
    timestamp: float
# Template loader; templates are looked up in the local "templates" directory.
templates = Jinja2Templates(directory="templates")

# Web Push helper built from the key pair files next to the application.
wp = WebPush(
    private_key=Path("./private_key.pem"),
    public_key=Path("./public_key.pem"),
    subscriber="eskildsf@gmail.com",
    ttl=30,
)

# In-memory registry mapping the "session" cookie value to the subscription
# that was posted to /subscribe. Contents are lost when the process exits.
subscriptions: dict[str, WebPushSubscription] = {}

app = FastAPI()
@app.get("/")
async def root(request: Request):
    """Render ``index.html`` with the application server key in its context.

    Args:
        request: The incoming request, forwarded to the template response.

    Returns:
        The rendered template response.
    """
    # Request-first keyword form: passing the template name positionally with
    # "request" inside the context is the deprecated calling convention.
    return templates.TemplateResponse(
        request=request,
        name="index.html",
        context={
            # Presumably the public half of the key pair loaded by `wp`;
            # TODO confirm it matches public_key.pem.
            "application_server_key": "BNaVxIyIyiTJrnjkeA_58YU-9nEgu3fCvBYFl44VA3wTTgygalXyXJvmGgwU4etN1nTQwTZiCNmqdCRy4B8H3j0",
        },
    )
@app.post("/subscribe")
async def subscribe_user(
    subscription: WebPushSubscription,
    background_tasks: BackgroundTasks,
    response: Response,
):
    """Register a push subscription and hand the client a session cookie.

    A fresh UUID4 string is generated, stored in the ``session`` cookie on
    the response, and used as the key under which the subscription is kept
    in the module-level ``subscriptions`` registry.

    Args:
        subscription: The subscription posted by the client.
        background_tasks: Not used by this handler; kept so the signature
            stays unchanged.
        response: Response object on which the ``session`` cookie is set.
    """
    # Named `session_id` rather than `id` so the builtin is not shadowed.
    session_id = str(uuid.uuid4())
    response.set_cookie(key="session", value=session_id)
    subscriptions[session_id] = subscription
def _queue_alert(
    background_tasks: BackgroundTasks,
    request: Request,
    subscription: WebPushSubscription,
) -> None:
    """Encrypt the watch alert for *subscription* and schedule its delivery."""
    # Sample the clock once so the body text and the timestamp field agree.
    now = datetime.datetime.now()
    payload = Message(
        title="TFF410A",
        body=f"Watch activated: T > 30 degC\n{now:%y-%m-%d %H:%M:%S}",
        icon=str(request.url_for("static", path="favicon.ico")),
        timestamp=now.timestamp(),
    ).model_dump_json()
    message = wp.get(message=payload, subscription=subscription)
    # The POST to the push endpoint runs as a background task, after the
    # handler has produced its response.
    background_tasks.add_task(
        httpx.post,
        url=str(subscription.endpoint),
        # Raw request bodies go through `content=`; `data=` is for form fields.
        content=message.encrypted,
        headers=message.headers,
    )


@app.get("/notify")
async def notify(
    session: Annotated[str, Cookie()],
    background_tasks: BackgroundTasks,
    request: Request,
):
    """Send the watch alert to the subscription tied to the caller's session.

    Args:
        session: Value of the ``session`` cookie set by ``/subscribe``.
        background_tasks: Collector for the deferred push delivery.
        request: The incoming request, used to build the icon URL.

    Raises:
        HTTPException: 404 when the session cookie does not match any
            registered subscription.
    """
    try:
        subscription = subscriptions[session]
    except KeyError:
        # The cookie is client-controlled input: report it as a client error
        # instead of relying on `assert`, which is stripped under `-O`.
        raise HTTPException(status_code=404, detail="Unknown session") from None
    _queue_alert(background_tasks, request, subscription)


@app.get("/notify_all")
async def notify_all(
    background_tasks: BackgroundTasks,
    request: Request,
):
    """Send the watch alert to every registered subscription.

    Args:
        background_tasks: Collector for the deferred push deliveries.
        request: The incoming request, used to build the icon URL.
    """
    for subscription in subscriptions.values():
        _queue_alert(background_tasks, request, subscription)
@app.get("/app.webmanifest")
async def pwa():
    """Return the web app manifest fields as a dictionary."""
    manifest = {
        "name": "Open Pectus",
        "id": "12345",
        "display": "standalone",
    }
    return manifest
# Serve the local "static" directory from the site root. The mount name
# "static" is the one the notification code passes to `request.url_for`.
app.mount(
    "/",
    StaticFiles(directory="static"),
    name="static",
)