Skip to content

Commit 2d48009

Browse files
committed
feat: route trigger callback room and enforce peer/issue sync policies
1 parent c3384b4 commit 2d48009

14 files changed

Lines changed: 461 additions & 21 deletions

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,11 @@ optional env:
175175
- `RELAY_NODE_ID` (cache exchange ノード ID)
176176
- `RELAY_PEERS` (CSV: peer relay URLs)
177177
- `RELAY_PEERS_JSON` (JSON: peer relay URLs, `RELAY_PEERS` より優先)
178-
- `RELAY_PEER_AUTH_TOKEN` (peer pull 時の Bearer token)
178+
- `RELAY_PEER_AUTH_TOKEN` (relay 間共有 Bearer token。設定時は `/api/v1/cache/exchange/*`
179+
`/api/v1/cache/issues/*` が必須認証)
179180
- `RELAY_PEER_SYNC_INTERVAL_SEC` (default: `30`)
180181
- `RELAY_PEER_REPO_FF_COMMIT_WINDOW` (default: `30`, peer 間 repo 判定で見る commit 数)
182+
- `RELAY_ISSUE_SOURCE_OF_TRUTH` (`last_write` | `github` | `bit`, default: `last_write`)
181183
- `RELAY_REPO_ID` (明示 repo ID, 例: `bit-vcs/bit`)
182184
- `RELAY_REPO_ORIGIN_URL` (origin URL の明示上書き)
183185
- `RELAY_REPO_RECENT_COMMITS` (CSV: 最近 commit hash)

docs/host-bit-relay.md

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,10 @@ All configuration is done via environment variables (set in the Cloudflare dashb
6666

6767
### Authentication
6868

69-
| Variable | Description | Default |
70-
| ---------------------- | ------------------------------------------------------------------------------------------------------------------------------------- | ------------ |
71-
| `BIT_RELAY_AUTH_TOKEN` | Bearer token for API authentication. When set, all `/api/v1/*` and `/ws` requests require this token. Leave unset for a public relay. | (none, open) |
69+
| Variable | Description | Default |
70+
| ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------- | ------------ |
71+
| `BIT_RELAY_AUTH_TOKEN` | Bearer token for API authentication. When set, all `/api/v1/*` and `/ws` requests require this token. Leave unset for a public relay. | (none, open) |
72+
| `RELAY_PEER_AUTH_TOKEN` | Shared bearer token for relay-to-relay cache endpoints (`/api/v1/cache/exchange/*`, `/api/v1/cache/issues/*`). | (none) |
7273

7374
### Signature Verification
7475

@@ -91,12 +92,13 @@ All configuration is done via environment variables (set in the Cloudflare dashb
9192

9293
### Rooms and Sessions
9394

94-
| Variable | Description | Default |
95-
| ----------------------------- | ---------------------------------------- | ------------------ |
96-
| `RELAY_MAX_MESSAGES_PER_ROOM` | Max stored messages per room | (built-in default) |
97-
| `RELAY_ROOM_TOKENS` | JSON object mapping room names to tokens | `{}` |
98-
| `RELAY_PRESENCE_TTL_SEC` | Presence heartbeat TTL | (built-in default) |
99-
| `GIT_SERVE_SESSION_TTL_SEC` | Git serve session TTL (0 = no expiry) | `0` |
95+
| Variable | Description | Default |
96+
| ----------------------------- | -------------------------------------------------------------- | ------------------ |
97+
| `RELAY_MAX_MESSAGES_PER_ROOM` | Max stored messages per room | (built-in default) |
98+
| `RELAY_ROOM_TOKENS` | JSON object mapping room names to tokens | `{}` |
99+
| `RELAY_PRESENCE_TTL_SEC` | Presence heartbeat TTL | (built-in default) |
100+
| `GIT_SERVE_SESSION_TTL_SEC` | Git serve session TTL (0 = no expiry) | `0` |
101+
| `RELAY_ISSUE_SOURCE_OF_TRUTH` | Issue snapshot conflict policy (`last_write`, `github`, `bit`) | `last_write` |
100102

101103
### WebSocket
102104

@@ -119,6 +121,10 @@ wrangler secret put RELAY_REQUIRE_SIGNATURE
119121
# Set a room token
120122
wrangler secret put RELAY_ROOM_TOKENS
121123
# Enter: {"my-room":"secret-token"}
124+
125+
# Set relay-to-relay shared token for cache exchange
126+
wrangler secret put RELAY_PEER_AUTH_TOKEN
127+
# Enter: <shared-token>
122128
```
123129

124130
## Verify Deployment

src/cloudflare_worker.ts

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ export interface RelayWorkerEnv {
4646
RELAY_MAX_CLOCK_SKEW_SEC?: string;
4747
RELAY_NONCE_TTL_SEC?: string;
4848
RELAY_MAX_NONCES_PER_SENDER?: string;
49+
RELAY_PEER_AUTH_TOKEN?: string;
4950
RELAY_PRESENCE_TTL_SEC?: string;
5051
RELAY_IP_PUBLISH_LIMIT_PER_WINDOW?: string;
5152
RELAY_ROOM_PUBLISH_LIMIT_PER_WINDOW?: string;
@@ -59,6 +60,7 @@ export interface RelayWorkerEnv {
5960
}
6061

6162
const IDENTITY_KEY = 'relay_identity_v1';
63+
const INCOMING_TRIGGER_REF_PREFIX = 'refs/relay/incoming/';
6264
let fallbackService: MemoryRelayService | null = null;
6365
let adminGitHubApi: ReturnType<typeof createAdminGitHubApi> | null = null;
6466
const requestMetrics = createRelayRequestMetricRecorder();
@@ -289,6 +291,48 @@ function invalidRoomResponse(): Response {
289291
return Response.json({ ok: false, error: 'invalid room' }, { status: 400 });
290292
}
291293

294+
function isObjectRecord(value: unknown): value is Record<string, unknown> {
295+
return typeof value === 'object' && value !== null && !Array.isArray(value);
296+
}
297+
298+
function deriveRoomFromIncomingRef(ref: string): string {
299+
const trimmed = ref.trim();
300+
if (!trimmed.startsWith(INCOMING_TRIGGER_REF_PREFIX)) return DEFAULT_ROOM;
301+
const suffix = trimmed.slice(INCOMING_TRIGGER_REF_PREFIX.length).trim();
302+
if (suffix.length === 0) return DEFAULT_ROOM;
303+
const first = suffix.split('/')[0].trim();
304+
if (!isValidRoomName(first)) return DEFAULT_ROOM;
305+
return first;
306+
}
307+
308+
async function resolveRelayRouteRoom(url: URL, request: Request): Promise<string> {
309+
const roomFromQuery = (url.searchParams.get('room') ?? '').trim();
310+
if (roomFromQuery.length > 0) {
311+
return roomFromQuery;
312+
}
313+
314+
if (url.pathname === '/api/v1/trigger/callback' && request.method === 'POST') {
315+
try {
316+
const bodyText = await request.clone().text();
317+
const parsed = JSON.parse(bodyText);
318+
if (isObjectRecord(parsed)) {
319+
const roomFromBody = (typeof parsed.room === 'string' ? parsed.room : '').trim();
320+
if (roomFromBody.length > 0) {
321+
return roomFromBody;
322+
}
323+
const ref = (typeof parsed.ref === 'string' ? parsed.ref : '').trim();
324+
if (ref.length > 0) {
325+
return deriveRoomFromIncomingRef(ref);
326+
}
327+
}
328+
} catch {
329+
// keep default room fallback
330+
}
331+
}
332+
333+
return DEFAULT_ROOM;
334+
}
335+
292336
export class RelayRoom {
293337
private readonly state: DurableObjectStateLike;
294338
private readonly service: MemoryRelayService;
@@ -393,7 +437,7 @@ async function handleWorkerRequest(request: Request, env: RelayWorkerEnv): Promi
393437
return Response.json({ ok: false, error: 'not found' }, { status: 404 });
394438
}
395439

396-
const room = (url.searchParams.get('room') ?? DEFAULT_ROOM).trim();
440+
const room = (await resolveRelayRouteRoom(url, request)).trim();
397441
if (!isValidRoomName(room)) {
398442
return invalidRoomResponse();
399443
}

src/deno_main.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ const relayCacheStore = createMemoryCacheStore({
7979
});
8080
const service = createMemoryRelayService({
8181
...runtimeConfig.relay,
82+
peerAuthToken: runtimeConfig.peers.authToken ?? runtimeConfig.relay.peerAuthToken,
8283
peerRelayUrls: runtimeConfig.peers.urls.length > 0
8384
? runtimeConfig.peers.urls
8485
: runtimeConfig.relay.peerRelayUrls,

src/memory_handler.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { createRelayCacheAdapter } from './relay_cache_adapter.ts';
2020

2121
export type JsonPrimitive = string | number | boolean | null;
2222
export type JsonValue = JsonPrimitive | JsonValue[] | { [key: string]: JsonValue };
23+
export type IssueSourceOfTruth = 'last_write' | 'github' | 'bit';
2324

2425
type JsonObject = { [key: string]: JsonValue };
2526
type KeyStatus = 'active' | 'revoked';
@@ -180,6 +181,8 @@ export interface IdentitySnapshot {
180181

181182
export interface MemoryRelayOptions {
182183
authToken?: string;
184+
peerAuthToken?: string;
185+
issueSourceOfTruth?: IssueSourceOfTruth;
183186
maxMessagesPerRoom?: number;
184187
publishPayloadMaxBytes?: number;
185188
publishLimitPerWindow?: number;
@@ -237,6 +240,7 @@ const DEFAULT_WS_PING_INTERVAL_MS = 30_000;
237240
const DEFAULT_WS_IDLE_TIMEOUT_MS = 90_000;
238241
const DEFAULT_CACHE_EXCHANGE_MAX_HOPS = 3;
239242
const DEFAULT_CACHE_EXCHANGE_MAX_RECORDS = 10_000;
243+
const DEFAULT_ISSUE_SOURCE_OF_TRUTH: IssueSourceOfTruth = 'last_write';
240244
const MAX_GITHUB_WEBHOOK_DELIVERY_IDS = 10_000;
241245
const MAX_GITHUB_WEBHOOK_DLQ_ENTRIES = 10_000;
242246
const GITHUB_WEBHOOK_RETRY_BASE_SEC = 30;
@@ -1100,6 +1104,7 @@ function parseBoolOrDefault(raw: boolean | undefined, fallback: boolean): boolea
11001104

11011105
export function createMemoryRelayService(options: MemoryRelayOptions = {}): MemoryRelayService {
11021106
const authToken = normalizeAuthToken(options.authToken);
1107+
const peerAuthToken = normalizeAuthToken(options.peerAuthToken);
11031108
const roomTokens = parseRoomTokens(options.roomTokens);
11041109
const maxMessagesPerRoom = Math.max(
11051110
1,
@@ -1165,6 +1170,10 @@ export function createMemoryRelayService(options: MemoryRelayOptions = {}): Memo
11651170
const cacheStore = options.cacheStore ?? null;
11661171
const githubWebhookSecret = (options.githubWebhookSecret ?? '').trim();
11671172
const fetchFn = options.fetchFn ?? globalThis.fetch;
1173+
const issueSourceOfTruth = options.issueSourceOfTruth === 'github' ||
1174+
options.issueSourceOfTruth === 'bit'
1175+
? options.issueSourceOfTruth
1176+
: DEFAULT_ISSUE_SOURCE_OF_TRUTH;
11681177
const cacheAdapter = createRelayCacheAdapter({
11691178
cacheStore,
11701179
isValidRoomName,
@@ -1173,6 +1182,7 @@ export function createMemoryRelayService(options: MemoryRelayOptions = {}): Memo
11731182
cachePersistMaxRetries: options.cachePersistMaxRetries,
11741183
cachePersistRetryBaseDelayMs: options.cachePersistRetryBaseDelayMs,
11751184
cachePersistRetryMaxDelayMs: options.cachePersistRetryMaxDelayMs,
1185+
issueSourceOfTruth,
11761186
});
11771187

11781188
const rooms = new Map<string, RoomState>();
@@ -1386,10 +1396,21 @@ export function createMemoryRelayService(options: MemoryRelayOptions = {}): Memo
13861396
return null;
13871397
}
13881398

1399+
function checkPeerAuth(request: Request): Response | null {
1400+
if (peerAuthToken.length === 0) return null;
1401+
const presented = extractPresentedToken(request);
1402+
if (presented.length === 0 || !timingSafeEqual(presented, peerAuthToken)) {
1403+
return unauthorizedResponse();
1404+
}
1405+
return null;
1406+
}
1407+
13891408
function handleCacheExchangeDiscovery(request: Request): Response {
13901409
if (request.method !== 'GET') {
13911410
return methodNotAllowedResponse();
13921411
}
1412+
const peerAuthError = checkPeerAuth(request);
1413+
if (peerAuthError) return peerAuthError;
13931414
return Response.json({
13941415
ok: true,
13951416
protocol: 'cache.exchange.v1',
@@ -1409,6 +1430,8 @@ export function createMemoryRelayService(options: MemoryRelayOptions = {}): Memo
14091430
if (request.method !== 'GET') {
14101431
return methodNotAllowedResponse();
14111432
}
1433+
const peerAuthError = checkPeerAuth(request);
1434+
if (peerAuthError) return peerAuthError;
14121435
const after = normalizeAfter(url.searchParams.get('after'), 0);
14131436
const limit = normalizeLimit(url.searchParams.get('limit'), 100);
14141437
const peerRaw = (url.searchParams.get('peer') ?? '').trim();
@@ -1447,6 +1470,8 @@ export function createMemoryRelayService(options: MemoryRelayOptions = {}): Memo
14471470
if (request.method !== 'POST') {
14481471
return methodNotAllowedResponse();
14491472
}
1473+
const peerAuthError = checkPeerAuth(request);
1474+
if (peerAuthError) return peerAuthError;
14501475

14511476
let parsed: unknown;
14521477
try {
@@ -1569,6 +1594,8 @@ export function createMemoryRelayService(options: MemoryRelayOptions = {}): Memo
15691594
if (request.method !== 'GET') {
15701595
return methodNotAllowedResponse();
15711596
}
1597+
const peerAuthError = checkPeerAuth(request);
1598+
if (peerAuthError) return peerAuthError;
15721599
const room = normalizeRoom(url.searchParams.get('room'));
15731600
if (!isValidRoomName(room)) {
15741601
return invalidRoomResponse();
@@ -1607,6 +1634,8 @@ export function createMemoryRelayService(options: MemoryRelayOptions = {}): Memo
16071634
if (request.method !== 'GET') {
16081635
return methodNotAllowedResponse();
16091636
}
1637+
const peerAuthError = checkPeerAuth(request);
1638+
if (peerAuthError) return peerAuthError;
16101639
const room = normalizeRoom(url.searchParams.get('room'));
16111640
if (!isValidRoomName(room)) {
16121641
return invalidRoomResponse();

src/relay_cache_adapter.ts

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { CacheStore, CacheStoreListResult, CacheStoreObject } from './cache_store.ts';
2+
import type { IssueSourceOfTruth } from './memory_handler.ts';
23
import { createCachePersistenceQueue } from './cache_persistence_queue.ts';
34
import { canonicalizeJson, sha256Hex } from './signing.ts';
45
import {
@@ -42,6 +43,7 @@ export interface RelayCacheAdapterOptions {
4243
cacheStore: CacheStore | null;
4344
isValidRoomName: (room: string) => boolean;
4445
isValidTopic: (topic: string) => boolean;
46+
issueSourceOfTruth?: IssueSourceOfTruth;
4547
nowSec?: () => number;
4648
cachePersistMaxRetries?: number;
4749
cachePersistRetryBaseDelayMs?: number;
@@ -76,6 +78,9 @@ export interface RelayCacheAdapter {
7678
const DEFAULT_CACHE_PERSIST_MAX_RETRIES = 2;
7779
const DEFAULT_CACHE_PERSIST_RETRY_BASE_DELAY_MS = 20;
7880
const DEFAULT_CACHE_PERSIST_RETRY_MAX_DELAY_MS = 500;
81+
const DEFAULT_ISSUE_SOURCE_OF_TRUTH: IssueSourceOfTruth = 'last_write';
82+
83+
type NormalizedIssueSource = 'github' | 'bit' | null;
7984

8085
function nowEpochSec(): number {
8186
return Math.floor(Date.now() / 1000);
@@ -114,11 +119,58 @@ function describeError(error: unknown): string {
114119
return String(error);
115120
}
116121

122+
function normalizeIssueSourceOfTruth(raw: IssueSourceOfTruth | undefined): IssueSourceOfTruth {
123+
return raw === 'github' || raw === 'bit' ? raw : DEFAULT_ISSUE_SOURCE_OF_TRUTH;
124+
}
125+
126+
function parseIssueSource(payload: JsonValue): NormalizedIssueSource {
127+
if (!isObjectRecord(payload)) return null;
128+
if (typeof payload.source !== 'string') return null;
129+
const source = payload.source.trim().toLowerCase();
130+
if (source === 'github') return 'github';
131+
if (source === 'bit') return 'bit';
132+
return null;
133+
}
134+
135+
function isOlderByTimestamp(
136+
incomingSourceUpdatedAtMs: number | null,
137+
existingSourceUpdatedAtMs: number | null,
138+
): boolean {
139+
return incomingSourceUpdatedAtMs !== null &&
140+
existingSourceUpdatedAtMs !== null &&
141+
incomingSourceUpdatedAtMs < existingSourceUpdatedAtMs;
142+
}
143+
144+
function shouldUpdateSnapshotBySourcePolicy(args: {
145+
policy: IssueSourceOfTruth;
146+
incomingSource: NormalizedIssueSource;
147+
existingSource: NormalizedIssueSource;
148+
incomingSourceUpdatedAtMs: number | null;
149+
existingSourceUpdatedAtMs: number | null;
150+
}): boolean {
151+
if (isOlderByTimestamp(args.incomingSourceUpdatedAtMs, args.existingSourceUpdatedAtMs)) {
152+
return false;
153+
}
154+
if (args.policy === 'last_write') {
155+
return true;
156+
}
157+
158+
const preferred = args.policy;
159+
if (args.existingSource === preferred && args.incomingSource !== preferred) {
160+
return false;
161+
}
162+
if (args.existingSource !== preferred && args.incomingSource === preferred) {
163+
return true;
164+
}
165+
return true;
166+
}
167+
117168
export function createRelayCacheAdapter(options: RelayCacheAdapterOptions): RelayCacheAdapter {
118169
const cacheStore = options.cacheStore;
119170
const isValidRoomName = options.isValidRoomName;
120171
const isValidTopic = options.isValidTopic;
121172
const nowSec = options.nowSec ?? nowEpochSec;
173+
const issueSourceOfTruth = normalizeIssueSourceOfTruth(options.issueSourceOfTruth);
122174
const issueProjectionValidators: IssueProjectionValidators = {
123175
isValidRoomName,
124176
isValidTopic,
@@ -263,13 +315,16 @@ export function createRelayCacheAdapter(options: RelayCacheAdapterOptions): Rela
263315
);
264316
if (existing) {
265317
const parsed = parseIssueCacheSnapshotRecord(existing, issueProjectionValidators);
266-
if (
267-
parsed &&
268-
parsed.source_updated_at_ms !== null &&
269-
sourceUpdatedAtMs !== null &&
270-
sourceUpdatedAtMs < parsed.source_updated_at_ms
271-
) {
272-
shouldUpdateSnapshot = false;
318+
if (parsed) {
319+
const existingSource = parseIssueSource(parsed.envelope.payload);
320+
const incomingSource = parseIssueSource(envelope.payload);
321+
shouldUpdateSnapshot = shouldUpdateSnapshotBySourcePolicy({
322+
policy: issueSourceOfTruth,
323+
incomingSource,
324+
existingSource,
325+
incomingSourceUpdatedAtMs: sourceUpdatedAtMs,
326+
existingSourceUpdatedAtMs: parsed.source_updated_at_ms,
327+
});
273328
}
274329
}
275330
} catch {

src/runtime_config.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,14 @@ function parseOptionalString(raw: string | undefined): string | null {
8888
return value.length > 0 ? value : null;
8989
}
9090

91+
function parseIssueSourceOfTruth(
92+
raw: string | undefined,
93+
): 'last_write' | 'github' | 'bit' {
94+
const value = (raw ?? '').trim().toLowerCase();
95+
if (value === 'github' || value === 'bit') return value;
96+
return 'last_write';
97+
}
98+
9199
function parseRoomTokens(raw: string | undefined): Record<string, string> {
92100
if (typeof raw !== 'string' || raw.trim().length === 0) return {};
93101
try {
@@ -294,6 +302,8 @@ export function parseMemoryRelayOptionsFromEnv(getEnv: EnvGetter): MemoryRelayOp
294302

295303
return {
296304
authToken: getEnv('BIT_RELAY_AUTH_TOKEN') ?? undefined,
305+
peerAuthToken: getEnv('RELAY_PEER_AUTH_TOKEN') ?? undefined,
306+
issueSourceOfTruth: parseIssueSourceOfTruth(getEnv('RELAY_ISSUE_SOURCE_OF_TRUTH')),
297307
maxMessagesPerRoom: parsePositiveInt(
298308
getEnv('RELAY_MAX_MESSAGES_PER_ROOM'),
299309
DEFAULT_MAX_MESSAGES_PER_ROOM,

0 commit comments

Comments
 (0)