Skip to content

Commit 6f7a13e

Browse files
authored
Improve JoinWaveShortCircuitProtection (#636)
- We add the status command that was missing and show how other protections could add their own commands to Draupnir. - We changed the token system to use a "lazy"[^lazy] version of leaky bucket, which isn't perfect but what was happening before was the protection counted the joins in the elapsed time, and then dropped them all rather than rolling over. - We've given it a general tidy The reason for this is because I want to use this protection to write generic documentation (a tutorial) on how to configure protections. [^lazy]: "lazy" in the sense that we don't bother to remove tokens all the time, only when a token gets added, which is fine because the throughput on room joins will be tiny.
1 parent 86c887b commit 6f7a13e

3 files changed

Lines changed: 190 additions & 64 deletions

File tree

src/commands/DraupnirCommands.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import { DraupnirTopLevelCommands } from "./DraupnirCommandTable";
5151
import { DraupnirSafeModeCommand } from "./SafeModeCommand";
5252
import { DraupnirProtectionsShowCommand } from "./ProtectionsShowCommand";
5353
import { DraupnirProtectionsCapabilityCommand } from "./ProtectionsCapabilitiesCommand";
54+
import { JoinWaveCommandTable } from "../protections/JoinWaveShortCircuit";
5455

5556
// TODO: These commands should all be moved to subdirectories tbh and this
5657
// should be split like an index file for each subdirectory.
@@ -107,3 +108,7 @@ const DraupnirCommands = new StandardCommandTable("draupnir")
107108

108109
DraupnirCommands.importTable(SynapseAdminCommands, []);
109110
DraupnirTopLevelCommands.importTable(DraupnirCommands, ["draupnir"]);
111+
DraupnirTopLevelCommands.importTable(JoinWaveCommandTable, [
112+
"draupnir",
113+
"joinwave",
114+
]);

src/protections/JoinWaveShortCircuit.tsx

Lines changed: 109 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
ActionResult,
1414
CapabilitySet,
1515
EDStatic,
16+
JoinRulesEvent,
1617
Logger,
1718
MembershipChange,
1819
MembershipChangeType,
@@ -26,8 +27,21 @@ import {
2627
import { LogLevel } from "matrix-bot-sdk";
2728
import { Draupnir } from "../Draupnir";
2829
import { DraupnirProtection } from "./Protection";
29-
import { StringRoomID } from "@the-draupnir-project/matrix-basic-types";
30+
import {
31+
MatrixRoomReference,
32+
StringRoomID,
33+
} from "@the-draupnir-project/matrix-basic-types";
3034
import { Type } from "@sinclair/typebox";
35+
import {
36+
DeadDocumentJSX,
37+
StandardCommandTable,
38+
describeCommand,
39+
tuple,
40+
} from "@the-draupnir-project/interface-manager";
41+
import { Result } from "@gnuxie/typescript-result";
42+
import { DraupnirInterfaceAdaptor } from "../commands/DraupnirCommandPrerequisites";
43+
import { LazyLeakyBucket, LeakyBucket } from "../queues/LeakyBucket";
44+
import { renderRoomPill } from "../commands/interface-manager/MatrixHelpRenderer";
3145

3246
const log = new Logger("JoinWaveShortCircuitProtection");
3347

@@ -36,8 +50,16 @@ const DEFAULT_TIMESCALE_MINUTES = 60;
3650
const ONE_MINUTE = 60_000; // 1min in ms
3751

3852
const JoinWaveShortCircuitProtectionSettings = Type.Object({
39-
maxPer: Type.Integer({ default: DEFAULT_MAX_PER_TIMESCALE }),
40-
timescaleMinutes: Type.Integer({ default: DEFAULT_TIMESCALE_MINUTES }),
53+
maxPer: Type.Integer({
54+
default: DEFAULT_MAX_PER_TIMESCALE,
55+
description:
56+
"The maximum number of users that can join a room in the timescaleMinutes timescale before the room is set to invite-only.",
57+
}),
58+
timescaleMinutes: Type.Integer({
59+
default: DEFAULT_TIMESCALE_MINUTES,
60+
description:
61+
"The timescale in minutes over which the maxPer users can join before the room is set to invite-only.",
62+
}),
4163
});
4264

4365
type JoinWaveShortCircuitProtectionSettings = EDStatic<
@@ -53,14 +75,18 @@ type JoinWaveShortCircuitProtectionDescription = ProtectionDescription<
5375
JoinWaveShortCircuitProtectionCapabilities
5476
>;
5577

78+
export const JoinWaveCommandTable = new StandardCommandTable(
79+
"JoinWaveShortCircuitProtection"
80+
);
81+
5682
describeProtection<
5783
JoinWaveShortCircuitProtectionCapabilities,
5884
Draupnir,
5985
typeof JoinWaveShortCircuitProtectionSettings
6086
>({
6187
name: "JoinWaveShortCircuitProtection",
6288
description:
63-
"If X amount of users join in Y time, set the room to invite-only.",
89+
"If a wave of users join in a given time frame, then the protection can set the room to invite-only.",
6490
capabilityInterfaces: {},
6591
defaultCapabilities: {},
6692
configSchema: JoinWaveShortCircuitProtectionSettings,
@@ -91,14 +117,7 @@ export class JoinWaveShortCircuitProtection
91117
extends AbstractProtection<JoinWaveShortCircuitProtectionDescription>
92118
implements DraupnirProtection<JoinWaveShortCircuitProtectionDescription>
93119
{
94-
private joinBuckets: {
95-
[roomID: StringRoomID]:
96-
| {
97-
lastBucketStart: Date;
98-
numberOfJoins: number;
99-
}
100-
| undefined;
101-
} = {};
120+
public joinBuckets: LeakyBucket<StringRoomID>;
102121

103122
constructor(
104123
description: JoinWaveShortCircuitProtectionDescription,
@@ -110,6 +129,10 @@ export class JoinWaveShortCircuitProtection
110129
super(description, capabilities, protectedRoomsSet, {
111130
requiredStatePermissions: ["m.room.join_rules"],
112131
});
132+
this.joinBuckets = new LazyLeakyBucket(
133+
this.settings.maxPer,
134+
this.timescaleMilliseconds()
135+
);
113136
}
114137
public async handleMembershipChange(
115138
revision: RoomMembershipRevision,
@@ -131,23 +154,29 @@ export class JoinWaveShortCircuitProtection
131154
if (change.membershipChangeType !== MembershipChangeType.Joined) {
132155
return;
133156
}
134-
135-
// If either the roomId bucket didn't exist, or the bucket has expired, create a new one
136-
if (
137-
!this.joinBuckets[roomID] ||
138-
this.hasExpired(this.joinBuckets[roomID].lastBucketStart)
139-
) {
140-
this.joinBuckets[roomID] = {
141-
lastBucketStart: new Date(),
142-
numberOfJoins: 0,
143-
};
144-
}
145-
146-
if (++this.joinBuckets[roomID].numberOfJoins >= this.settings.maxPer) {
157+
const numberOfJoins = this.joinBuckets.addToken(roomID);
158+
if (numberOfJoins >= this.settings.maxPer) {
159+
// we should check that we haven't already set the room to invite only
160+
const revision = this.protectedRoomsSet.setRoomState.getRevision(roomID);
161+
if (revision === undefined) {
162+
throw new TypeError(
163+
`Shouldn't be possible to not have the room state revision for a protected room yet`
164+
);
165+
}
166+
const joinRules = revision.getStateEvent<JoinRulesEvent>(
167+
"m.room.join_rules",
168+
""
169+
);
170+
if ((joinRules?.content.join_rule ?? "public") !== "public") {
171+
log.info(
172+
`Room ${roomID} is already invite-only, not changing join rules`
173+
);
174+
return;
175+
}
147176
await this.draupnir.managementRoomOutput.logMessage(
148177
LogLevel.WARN,
149178
"JoinWaveShortCircuit",
150-
`Setting ${roomID} to invite-only as more than ${this.settings.maxPer} users have joined over the last ${this.settings.timescaleMinutes} minutes (since ${this.joinBuckets[roomID].lastBucketStart.toString()})`,
179+
`Setting ${roomID} to invite-only as more than ${this.settings.maxPer} users have joined over the last ${this.settings.timescaleMinutes} minutes.`,
151180
roomID
152181
);
153182

@@ -169,47 +198,63 @@ export class JoinWaveShortCircuitProtection
169198
}
170199
}
171200

172-
private hasExpired(at: Date): boolean {
173-
return new Date().getTime() - at.getTime() > this.timescaleMilliseconds();
174-
}
175-
176201
private timescaleMilliseconds(): number {
177202
return this.settings.timescaleMinutes * ONE_MINUTE;
178203
}
204+
}
179205

180-
/**
181-
* Yeah i know this is evil but
182-
* We need to figure this out once we allow protections to have their own
183-
* command tables somehow.
184-
* which will probably entail doing the symbol case hacks from Utena for camel case etc.
185-
public async status(keywords, subcommands): Promise<DocumentNode> {
186-
const withExpired = subcommand.includes("withExpired");
187-
const withStart = subcommand.includes("withStart");
188-
189-
let html = `<b>Short Circuit join buckets (max ${this.settings.maxPer.value} per ${this.settings.timescaleMinutes.value} minutes}):</b><br/><ul>`;
190-
let text = `Short Circuit join buckets (max ${this.settings.maxPer.value} per ${this.settings.timescaleMinutes.value} minutes):\n`;
191-
192-
for (const roomId of Object.keys(this.joinBuckets)) {
193-
const bucket = this.joinBuckets[roomId];
194-
const isExpired = this.hasExpired(bucket.lastBucketStart);
195-
196-
if (isExpired && !withExpired) {
197-
continue;
198-
}
199-
200-
const startText = withStart ? ` (since ${bucket.lastBucketStart})` : "";
201-
const expiredText = isExpired ? ` (bucket expired since ${new Date(bucket.lastBucketStart.getTime() + this.timescaleMilliseconds())})` : "";
202-
203-
html += `<li><a href="https://matrix.to/#/${roomId}">${roomId}</a>: ${bucket.numberOfJoins} joins${startText}${expiredText}.</li>`;
204-
text += `* ${roomId}: ${bucket.numberOfJoins} joins${startText}${expiredText}.\n`;
205-
}
206-
207-
html += "</ul>";
206+
const JoinWaveStatusCommand = describeCommand({
207+
summary: "Show the current status of the JoinWaveShortCircuitProtection",
208+
parameters: tuple(),
209+
async executor(
210+
draupnir: Draupnir,
211+
_info,
212+
_keywords,
213+
_rest
214+
): Promise<Result<JoinWaveShortCircuitProtection | undefined>> {
215+
return Ok(
216+
draupnir.protectedRoomsSet.protections.findEnabledProtection(
217+
JoinWaveShortCircuitProtection.name
218+
) as JoinWaveShortCircuitProtection | undefined
219+
);
220+
},
221+
});
208222

209-
return {
210-
html,
211-
text,
212-
}
223+
DraupnirInterfaceAdaptor.describeRenderer(JoinWaveStatusCommand, {
224+
JSXRenderer(result) {
225+
if (isError(result)) {
226+
return Ok(undefined);
213227
}
214-
*/
215-
}
228+
if (result.ok === undefined) {
229+
return Ok(
230+
<root>The JoinWaveShortCircuitProtection has not been enabled.</root>
231+
);
232+
}
233+
const joinBuckets = result.ok.joinBuckets.getAllTokens();
234+
return Ok(
235+
<root>
236+
<b>
237+
Recent room joins (max {result.ok.settings.maxPer} per{" "}
238+
{result.ok.settings.timescaleMinutes} minutes):
239+
</b>
240+
{joinBuckets.size === 0 ? (
241+
<p>No rooms have had join events since the protection was enabled.</p>
242+
) : (
243+
<fragment></fragment>
244+
)}
245+
<ul>
246+
{[...joinBuckets.entries()].map(([roomID, joinCount]) => {
247+
return (
248+
<li>
249+
{renderRoomPill(MatrixRoomReference.fromRoomID(roomID, []))}{" "}
250+
{joinCount} joins.
251+
</li>
252+
);
253+
})}
254+
</ul>
255+
</root>
256+
);
257+
},
258+
});
259+
260+
JoinWaveCommandTable.internCommand(JoinWaveStatusCommand, ["status"]);

src/queues/LeakyBucket.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// SPDX-FileCopyrightText: 2024 Gnuxie <Gnuxie@protonmail.com>
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
export interface LeakyBucket<Key> {
6+
/**
7+
* Return the token count.
8+
*/
9+
addToken(key: Key): number;
10+
getTokenCount(key: Key): number;
11+
getAllTokens(): Map<Key, number>;
12+
}
13+
14+
type BucketEntry = {
15+
tokens: number;
16+
lastLeak: Date;
17+
};
18+
19+
/**
20+
* A lazy version of the bucket to be used when the throuhgput is really
21+
* low most of the time, so doesn't warrant constant filling/leaking.
22+
*
23+
* This won't be good to use in a high throughput situation because
24+
* of the way it will spam calling for the current time.
25+
*/
26+
export class LazyLeakyBucket<Key> implements LeakyBucket<Key> {
27+
private readonly buckets: Map<Key, BucketEntry> = new Map();
28+
private readonly leakDelta: number;
29+
30+
public constructor(
31+
private readonly capacity: number,
32+
private readonly timescale: number
33+
) {
34+
this.leakDelta = this.timescale / this.capacity;
35+
}
36+
getAllTokens(): Map<Key, number> {
37+
const map = new Map<Key, number>();
38+
for (const key of this.buckets.keys()) {
39+
map.set(key, this.getTokenCount(key));
40+
}
41+
return map;
42+
}
43+
44+
private leak(now: Date, entry: BucketEntry): void {
45+
const elapsed = now.getTime() - entry.lastLeak.getTime();
46+
const tokensToRemove = Math.floor(elapsed / this.timescale);
47+
entry.tokens = Math.max(entry.tokens - tokensToRemove, 0);
48+
entry.lastLeak = new Date(
49+
entry.lastLeak.getTime() + tokensToRemove * this.leakDelta
50+
);
51+
}
52+
53+
public addToken(key: Key): number {
54+
const now = new Date();
55+
const entry = this.buckets.get(key);
56+
if (entry === undefined) {
57+
this.buckets.set(key, {
58+
tokens: 1,
59+
lastLeak: now,
60+
});
61+
return 1;
62+
}
63+
this.leak(now, entry);
64+
return entry.tokens;
65+
}
66+
67+
public getTokenCount(key: Key): number {
68+
const now = new Date();
69+
const entry = this.buckets.get(key);
70+
if (entry === undefined) {
71+
return 0;
72+
}
73+
this.leak(now, entry);
74+
return entry.tokens;
75+
}
76+
}

0 commit comments

Comments
 (0)