Skip to content

Commit fdeadaa

Browse files
committed
work on notifications
1 parent 8425a72 commit fdeadaa

10 files changed

Lines changed: 337 additions & 53 deletions

File tree

api-server/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ This file documents changes to the Deconf API Server.
55
## next
66

77
- **new** web-push "test" endpoint
8+
- **new** notify command to run notifications process
89
- **fix** align web-push endpoints
910
- **fix** `metadata` columns are preserved when upserting registrations
1011

api-server/source/legacy/calendar.ts

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import { defineRoute, HTTPError } from "gruber";
22
import ics from "ics";
33
import {
4-
ConferenceRecord,
4+
ConferenceInfo,
5+
getConferenceInfo,
6+
getSessionUrl,
57
SessionRecord,
68
useAppConfig,
79
useAuthz,
810
useTokens,
911
} from "../lib/mod.js";
1012
import { LegacyApiError, LegacyRepo } from "./legacy-lib.ts";
11-
import { AppConfig } from "../config.ts";
1213

1314
function getIcsDate(date: Date) {
1415
return [
@@ -38,7 +39,7 @@ function getSessionIcsOptions(
3839
endInputType: "utc",
3940
title: session.title.en,
4041
description: session.summary.en,
41-
url: info.sessionUrl.replace("{session}", session.id.toString()),
42+
url: getSessionUrl(info.sessionUrl, session.id),
4243
calName: info.appName,
4344
location: location.length > 0 ? location.join(", ") : undefined,
4445
geo: info.geo,
@@ -104,29 +105,6 @@ function getSessionGoogleCalUrl(session: SessionRecord) {
104105
return url;
105106
}
106107

107-
interface ConferenceInfo {
108-
appName: string;
109-
sessionUrl: string;
110-
location?: string;
111-
geo?: {
112-
lat: number;
113-
lon: number;
114-
};
115-
}
116-
117-
function getConferenceInfo(
118-
conferece: ConferenceRecord,
119-
appConfig: AppConfig,
120-
): ConferenceInfo {
121-
const { session_url = "", location, lat, lng } = conferece.metadata;
122-
return {
123-
appName: conferece.title.en ?? appConfig.meta.name,
124-
sessionUrl: session_url,
125-
location: location,
126-
geo: lat && lng ? { lat, lon: lng } : undefined,
127-
};
128-
}
129-
130108
// Calendar - getSessionIcs
131109
export const sessionIcsRoute = defineRoute({
132110
method: "GET",

api-server/source/lib/tables.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ export const SessionSaveTable = defineTable<SessionSaveRecord>({
176176
created_at: Structure.date(),
177177
session_id: Structure.number(),
178178
registration_id: Structure.number(),
179+
notified: Structure.array(Structure.string()),
179180
},
180181
});
181182

api-server/source/lib/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ export interface SessionSaveRecord {
114114
created_at: Date;
115115
session_id: number;
116116
registration_id: number;
117+
notified: string[];
117118
}
118119

119120
export interface SessionLabelRecord {

api-server/source/lib/utilities.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import {
77
TokenService,
88
} from "gruber";
99
import { ServiceTokenTable } from "./tables.ts";
10+
import { ConferenceRecord } from "./types.ts";
11+
import { AppConfig } from "../config.ts";
1012

1113
export function trimEmail(input: string) {
1214
return input.trim().toLowerCase();
@@ -64,3 +66,30 @@ export class ServiceTokenService implements TokenService {
6466
throw new Error("Method not implemented.");
6567
}
6668
}
69+
70+
export interface ConferenceInfo {
71+
appName: string;
72+
sessionUrl: string;
73+
location?: string;
74+
geo?: {
75+
lat: number;
76+
lon: number;
77+
};
78+
}
79+
80+
export function getConferenceInfo(
81+
conferece: ConferenceRecord,
82+
appConfig: AppConfig,
83+
): ConferenceInfo {
84+
const { session_url = "", location, lat, lng } = conferece.metadata;
85+
return {
86+
appName: conferece.title.en ?? appConfig.meta.name,
87+
sessionUrl: session_url,
88+
location: location,
89+
geo: lat && lng ? { lat, lon: lng } : undefined,
90+
};
91+
}
92+
93+
export function getSessionUrl(template: string, id: number) {
94+
return template.replace("{session}", id.toString());
95+
}

api-server/source/main.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import yargs from "yargs";
88
import { dumpConfiguration } from "./config.ts";
99
import { runMigrations, useAppConfig, useTokens } from "./lib/mod.ts";
1010
import { runServer } from "./server.ts";
11+
import { notifyCommand } from "./notifications/notify-command.ts";
1112

1213
const cli = yargs(process.argv.slice(2))
1314
.help()
@@ -50,6 +51,18 @@ cli.command(
5051
},
5152
);
5253

54+
cli.command(
55+
"notify",
56+
"start a process that periodically sends notifications in the queue",
57+
(yargs) =>
58+
yargs
59+
.option("interval", { type: "number", default: 30_000 })
60+
.option("grace", { type: "number", default: 1_000 })
61+
.option("dry-run", { type: "boolean", default: false })
62+
.option("forever", { type: "boolean", default: false }),
63+
(args) => notifyCommand(args),
64+
);
65+
5366
try {
5467
await cli.parseAsync();
5568
} catch (error) {
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import { definePostgresMigration } from "gruber";
2+
3+
export default definePostgresMigration({
4+
async up(sql) {
5+
await sql`
6+
ALTER TABLE session_saves
7+
ADD COLUMN notified JSONB DEFAULT '[]'::JSONB
8+
`;
9+
},
10+
async down(sql) {
11+
await sql`
12+
ALTER TABLE session_saves
13+
DROP COLUMN notified
14+
`;
15+
},
16+
});
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
import { SqlDependency } from "gruber";
2+
import { useAppConfig, useDatabase, useStore } from "../lib/globals.ts";
3+
import { WebPushPayload, WebPushRepo } from "./web-push-repo.ts";
4+
import {
5+
ConferenceTable,
6+
RegistrationTable,
7+
SessionSaveTable,
8+
SessionTable,
9+
WebPushDeviceTable,
10+
WebPushMessageTable,
11+
} from "../lib/tables.ts";
12+
import { getConferenceInfo, getSessionUrl } from "../lib/utilities.ts";
13+
import { AppConfig } from "../config.ts";
14+
15+
//
16+
// A context to run various sub-commands with containing relevant dependencies & helpers
17+
//
18+
class NotifyContext {
19+
options: NotifyOptions;
20+
webPush: WebPushRepo;
21+
sql: SqlDependency;
22+
appConfig: AppConfig;
23+
constructor(
24+
options: NotifyOptions,
25+
webPush: WebPushRepo,
26+
sql: SqlDependency,
27+
appConfig: AppConfig,
28+
) {
29+
this.options = options;
30+
this.webPush = webPush;
31+
this.sql = sql;
32+
this.appConfig = appConfig;
33+
}
34+
35+
/** Log a timestamped message */
36+
log(message: string, ...args: any[]) {
37+
console.error(new Date().toISOString() + " " + message, ...args);
38+
}
39+
40+
/** Wait for a duration to elapse (milliseconds) */
41+
pause(ms: number) {
42+
this.log("pause ms=%o", ms);
43+
return new Promise((r) => setTimeout(r, ms));
44+
}
45+
}
46+
47+
export interface NotifyOptions {
48+
dryRun: boolean;
49+
forever: boolean;
50+
interval: number;
51+
grace: number;
52+
}
53+
54+
export async function notifyCommand(options: NotifyOptions) {
55+
// Set up context
56+
const appConfig = useAppConfig();
57+
const store = useStore();
58+
const sql = useDatabase();
59+
const webPush = WebPushRepo.use();
60+
const ctx = new NotifyContext(options, webPush, sql, appConfig);
61+
62+
ctx.log("init");
63+
64+
try {
65+
while (options.forever) {
66+
ctx.log("starting");
67+
68+
await enqueueMySchedule(ctx);
69+
await sendPendingMessages(ctx);
70+
71+
await ctx.pause(options.interval);
72+
}
73+
74+
ctx.log("done");
75+
} catch (error) {
76+
console.log("Fatal error", error);
77+
}
78+
79+
await store.close();
80+
await sql.end();
81+
}
82+
83+
interface PendingMessage {
84+
deviceId: number;
85+
saveId: number;
86+
payload: WebPushPayload;
87+
}
88+
89+
async function enqueueMySchedule(ctx: NotifyContext, date = new Date()) {
90+
ctx.log("enqueue from schedule…");
91+
92+
// Get sessions starting in 15 minutes or started 5 minutes ago
93+
const upcoming = await SessionTable.select(
94+
ctx.sql,
95+
ctx.sql`
96+
start_date IS NOT NULL
97+
AND start_date >= ${date} - INTERVAL '15 minutes'
98+
AND start_date <= ${date} + INTERVAL '5 minutes'
99+
`,
100+
);
101+
102+
// Fetch conferences for those sessions
103+
const conferences = await ConferenceTable.select(
104+
ctx.sql,
105+
ctx.sql`
106+
id IN ${ctx.sql(upcoming.map((r) => r.conference_id))}
107+
`,
108+
);
109+
110+
// Generate portable information about the conferences
111+
const info = new Map(
112+
conferences.map((c) => [c.id, getConferenceInfo(c, ctx.appConfig)]),
113+
);
114+
115+
// Get saved sessions who haven't been notified yet
116+
const saved = await SessionSaveTable.select(
117+
ctx.sql,
118+
ctx.sql`
119+
session_id IN ${ctx.sql(upcoming.map((r) => r.id))}
120+
AND NOT notified ? 'web-push'
121+
`,
122+
);
123+
124+
// Get devices for people who have saved those sessions
125+
// which have opted-in to MySchedule messages and are not expired
126+
const devices = await WebPushDeviceTable.select(
127+
ctx.sql,
128+
ctx.sql`
129+
registration_id IN ${ctx.sql(saved.map((r) => r.registration_id))}
130+
AND categories ? 'MySchedule'
131+
AND (
132+
expires_at IS NULL
133+
OR expires_at >= NOW()
134+
)
135+
`,
136+
);
137+
138+
ctx.log("saves=%o devices=%o", saved.length, devices.length);
139+
140+
const sessions = new Map(upcoming.map((s) => [s.id, s]));
141+
142+
// Generate a list of messages to enqueu
143+
const queue: PendingMessage[] = [];
144+
145+
// Loop through each saved session & fetch info
146+
for (const save of saved) {
147+
const session = sessions.get(save.session_id)!;
148+
const conference = info.get(session.conference_id)!;
149+
const userDevices = devices.filter(
150+
(r) => r.registration_id === save.registration_id,
151+
);
152+
153+
// Loop through each device and generate a message to send
154+
for (const device of userDevices) {
155+
queue.push({
156+
deviceId: device.id,
157+
saveId: save.id,
158+
payload: {
159+
title: "Session starting soon",
160+
body: session.title.en!,
161+
data: {
162+
url: getSessionUrl(conference.sessionUrl, session.id),
163+
},
164+
},
165+
});
166+
}
167+
}
168+
169+
// Exit early and dump information during a dry run
170+
if (ctx.options.dryRun) {
171+
console.log("Enqueue:");
172+
console.log(JSON.stringify(queue));
173+
return;
174+
}
175+
176+
// Process the queue of messages with a transaction for each
177+
// NOTE: could be one big transaction?
178+
for (const item of queue) {
179+
await ctx.sql.begin(async (trx) => {
180+
ctx.log(
181+
"enqueue device=%o save=%o title=%o",
182+
item.deviceId,
183+
item.saveId,
184+
item.payload.title,
185+
);
186+
187+
// Insert a pending web push message
188+
await WebPushMessageTable.insertOne(trx, {
189+
device_id: item.deviceId,
190+
payload: item.payload,
191+
});
192+
193+
// Mark the save as notified
194+
await trx`
195+
UPDATE session_saves
196+
SET notified = notified || '["web-push"]'::jsonb
197+
WHERE id = ${item.saveId}
198+
`;
199+
});
200+
}
201+
}
202+
203+
async function sendPendingMessages(ctx: NotifyContext) {
204+
ctx.log("send pending messages…");
205+
206+
const { messages, devices } = await ctx.webPush.listPending();
207+
208+
ctx.log("messages=%o devices=%o", messages.length, devices.size);
209+
210+
if (ctx.options.dryRun) {
211+
console.log("Pending:");
212+
console.log(JSON.stringify({ messages, devices }));
213+
return;
214+
}
215+
216+
for (const message of messages) {
217+
const device = devices.get(message.device_id);
218+
if (!device) throw new Error("internal error - bad device");
219+
220+
ctx.log("send message=%o device=%o", message.id, device.id);
221+
const success = await ctx.webPush.attemptToSend(message, device);
222+
ctx.log(" success=%o", success);
223+
224+
await ctx.pause(ctx.options.grace);
225+
}
226+
}

0 commit comments

Comments
 (0)