Skip to content

Commit b39868d

Browse files
committed
feat: add repository affinity peer sync and r2 failover e2e
1 parent e3dbdb7 commit b39868d

9 files changed

Lines changed: 728 additions & 2 deletions

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,10 @@ optional env:
177177
- `RELAY_PEERS_JSON` (JSON: peer relay URLs, `RELAY_PEERS` より優先)
178178
- `RELAY_PEER_AUTH_TOKEN` (peer pull 時の Bearer token)
179179
- `RELAY_PEER_SYNC_INTERVAL_SEC` (default: `30`)
180+
- `RELAY_PEER_REPO_FF_COMMIT_WINDOW` (default: `30`, peer 間 repo 判定で見る commit 数)
181+
- `RELAY_REPO_ID` (明示 repo ID, 例: `bit-vcs/bit`)
182+
- `RELAY_REPO_ORIGIN_URL` (origin URL の明示上書き)
183+
- `RELAY_REPO_RECENT_COMMITS` (CSV: 最近 commit hash)
180184
- `RELAY_CACHE_PROVIDER` (`memory` or `r2`, default: `memory`)
181185
- `RELAY_CACHE_TTL_SEC` (default: `86400`)
182186
- `RELAY_CACHE_MAX_BYTES` (任意、上限 bytes)
@@ -189,6 +193,9 @@ optional env:
189193
- `RELAY_TRIGGER_REF_PREFIXES` (CSV, default: `refs/relay/incoming/`)
190194
- `RELAY_CONFIG_JSON` (JSON override)
191195

196+
`RELAY_REPO_ID` を指定しない場合は `git remote.origin.url` と直近 commit を参照して、 peer cache
197+
sync の対象 repo を自動推定します(`deno task dev``git` 実行権限付き)。
198+
192199
互換モード(従来の unsigned publish 許可):
193200

194201
```bash

deno.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
"test:unit": "deno task test:layer unit",
1010
"test:integration": "deno task test:layer integration",
1111
"test:e2e": "deno task test:layer e2e",
12-
"dev": "deno run --allow-net --allow-env --watch src/deno_main.ts",
12+
"dev": "deno run --allow-net --allow-env --allow-run=git --watch src/deno_main.ts",
1313
"mcp": "deno run --allow-run --allow-net --allow-env --allow-read src/mcp_server.ts",
1414
"fmt": "deno fmt",
1515
"lint": "deno lint"

src/deno_main.ts

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@ import {
1616
safeWriteGitCache,
1717
} from './git_cache_layer.ts';
1818
import { createWebhookTriggerDispatcher } from './trigger_dispatcher.ts';
19+
import {
20+
decideRepositoryCompatibility,
21+
parseRepositoryFromDiscoveryBody,
22+
type RelayRepositoryIdentity,
23+
resolveLocalRepositoryIdentity,
24+
} from './repository_affinity.ts';
1925

2026
function parsePositiveInt(raw: string | undefined, fallback: number): number {
2127
const value = Number.parseInt(raw ?? '', 10);
@@ -25,6 +31,17 @@ function parsePositiveInt(raw: string | undefined, fallback: number): number {
2531
return Math.trunc(value);
2632
}
2733

34+
function parseCsvList(raw: string | undefined): string[] {
35+
if (typeof raw !== 'string' || raw.trim().length === 0) return [];
36+
const dedupe = new Set<string>();
37+
for (const part of raw.split(',')) {
38+
const value = part.trim();
39+
if (value.length === 0) continue;
40+
dedupe.add(value);
41+
}
42+
return [...dedupe];
43+
}
44+
2845
const SESSION_ID_PATTERN = /^[A-Za-z0-9]{6,16}$/;
2946
const NAMED_SESSION_PATTERN = /^[A-Za-z0-9][A-Za-z0-9._-]{0,38}\/[A-Za-z0-9][A-Za-z0-9._-]{0,63}$/;
3047

@@ -46,6 +63,16 @@ function generateSessionId(): string {
4663
const host = Deno.env.get('HOST') ?? '127.0.0.1';
4764
const port = parsePositiveInt(Deno.env.get('PORT') ?? undefined, 8788);
4865
const runtimeConfig = parseRelayRuntimeConfigFromEnv((key) => Deno.env.get(key) ?? undefined);
66+
const repoFfCommitWindow = parsePositiveInt(
67+
Deno.env.get('RELAY_PEER_REPO_FF_COMMIT_WINDOW') ?? undefined,
68+
30,
69+
);
70+
const localRepositoryIdentity = await resolveLocalRepositoryIdentity({
71+
explicitRepoId: Deno.env.get('RELAY_REPO_ID') ?? undefined,
72+
explicitOriginUrl: Deno.env.get('RELAY_REPO_ORIGIN_URL') ?? undefined,
73+
explicitRecentCommits: parseCsvList(Deno.env.get('RELAY_REPO_RECENT_COMMITS') ?? undefined),
74+
commitWindow: repoFfCommitWindow,
75+
});
4976
const relayCacheStore = createMemoryCacheStore({
5077
ttlSec: runtimeConfig.cache.ttlSec,
5178
maxBytes: runtimeConfig.cache.maxBytes,
@@ -55,6 +82,10 @@ const service = createMemoryRelayService({
5582
peerRelayUrls: runtimeConfig.peers.urls.length > 0
5683
? runtimeConfig.peers.urls
5784
: runtimeConfig.relay.peerRelayUrls,
85+
repositoryId: localRepositoryIdentity.repoId ?? undefined,
86+
repositoryOwner: localRepositoryIdentity.owner ?? undefined,
87+
repositoryName: localRepositoryIdentity.name ?? undefined,
88+
repositoryRecentCommits: localRepositoryIdentity.recentCommits,
5889
cacheStore: relayCacheStore,
5990
githubWebhookSecret: runtimeConfig.github.webhookSecret ?? undefined,
6091
});
@@ -117,6 +148,11 @@ function parseCacheExchangePullBody(body: Record<string, unknown>): {
117148
return { entries, nextCursor };
118149
}
119150

151+
function parsePeerNodeIdFromDiscoveryBody(body: Record<string, unknown>): string | null {
152+
const raw = typeof body.node_id === 'string' ? body.node_id.trim() : '';
153+
return raw.length > 0 ? raw : null;
154+
}
155+
120156
function nowEpochSec(): number {
121157
return Math.floor(Date.now() / 1000);
122158
}
@@ -173,10 +209,84 @@ async function startCacheSyncWorker(): Promise<void> {
173209
return;
174210
}
175211

212+
const repositoryScopedSyncEnabled = localRepositoryIdentity.name !== null;
213+
if (!repositoryScopedSyncEnabled) {
214+
console.warn(
215+
'[bit-relay] repository affinity disabled: set RELAY_REPO_ID or enable git origin detection',
216+
);
217+
}
218+
219+
const discoveryCacheTtlMs = Math.max(5, runtimeConfig.peers.syncIntervalSec) * 1000;
220+
const loggedRepositoryMismatchPeers = new Set<string>();
221+
const peerDiscoveryCache = new Map<
222+
string,
223+
{ expiresAt: number; nodeId: string | null; repository: RelayRepositoryIdentity | null }
224+
>();
225+
226+
async function fetchPeerDiscovery(
227+
peer: string,
228+
): Promise<{ nodeId: string | null; repository: RelayRepositoryIdentity | null }> {
229+
const url = new URL('/api/v1/cache/exchange/discovery', peer);
230+
const headers = new Headers();
231+
if (peerSyncAuthToken.length > 0) {
232+
headers.set('authorization', `Bearer ${peerSyncAuthToken}`);
233+
}
234+
const response = await fetch(url.toString(), {
235+
method: 'GET',
236+
headers,
237+
});
238+
if (response.status !== 200) {
239+
const text = await response.text();
240+
throw new Error(
241+
`peer discovery failed: peer=${peer}, status=${response.status}, body=${text}`,
242+
);
243+
}
244+
const body = await response.json() as Record<string, unknown>;
245+
return {
246+
nodeId: parsePeerNodeIdFromDiscoveryBody(body),
247+
repository: parseRepositoryFromDiscoveryBody(body, repoFfCommitWindow),
248+
};
249+
}
250+
251+
async function resolvePeerDiscovery(
252+
peer: string,
253+
): Promise<{ nodeId: string | null; repository: RelayRepositoryIdentity | null }> {
254+
const cached = peerDiscoveryCache.get(peer);
255+
if (cached && Date.now() < cached.expiresAt) {
256+
return { nodeId: cached.nodeId, repository: cached.repository };
257+
}
258+
const discovered = await fetchPeerDiscovery(peer);
259+
peerDiscoveryCache.set(peer, {
260+
expiresAt: Date.now() + discoveryCacheTtlMs,
261+
nodeId: discovered.nodeId,
262+
repository: discovered.repository,
263+
});
264+
return discovered;
265+
}
266+
176267
const worker = createCacheSyncWorker({
177268
peers,
178269
limit: 200,
179270
async pullFromPeer({ peer, after, limit }) {
271+
if (repositoryScopedSyncEnabled) {
272+
const peerDiscovery = await resolvePeerDiscovery(peer);
273+
const decision = decideRepositoryCompatibility(
274+
localRepositoryIdentity,
275+
peerDiscovery.repository,
276+
);
277+
if (!decision.compatible) {
278+
if (!loggedRepositoryMismatchPeers.has(peer)) {
279+
const peerNode = peerDiscovery.nodeId ?? peer;
280+
console.warn(
281+
`[bit-relay] cache-sync skipped peer=${peerNode} reason=${decision.reason}`,
282+
);
283+
loggedRepositoryMismatchPeers.add(peer);
284+
}
285+
return { entries: [], nextCursor: after };
286+
}
287+
loggedRepositoryMismatchPeers.delete(peer);
288+
}
289+
180290
const url = new URL('/api/v1/cache/exchange/pull', peer);
181291
url.searchParams.set('after', String(after));
182292
url.searchParams.set('limit', String(limit));

src/memory_handler.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,10 @@ export interface MemoryRelayOptions {
197197
wsIdleTimeoutMs?: number;
198198
relayNodeId?: string;
199199
peerRelayUrls?: string[];
200+
repositoryId?: string;
201+
repositoryOwner?: string;
202+
repositoryName?: string;
203+
repositoryRecentCommits?: string[];
200204
cacheExchangeMaxHops?: number;
201205
cacheExchangeMaxRecords?: number;
202206
cacheStore?: CacheStore;
@@ -238,6 +242,7 @@ const MAX_GITHUB_WEBHOOK_DLQ_ENTRIES = 10_000;
238242
const GITHUB_WEBHOOK_RETRY_BASE_SEC = 30;
239243
const INCOMING_TRIGGER_REF_PREFIX = 'refs/relay/incoming/';
240244
const ROOM_NAME_PATTERN = /^[A-Za-z0-9][A-Za-z0-9._-]{0,63}$/;
245+
const COMMIT_HASH_PATTERN = /^[0-9a-f]{7,40}$/i;
241246
const TOPIC_PATTERN = /^[a-z][a-z0-9._-]{0,63}$/;
242247
const PR_ID_PATTERN = /^[A-Za-z0-9][A-Za-z0-9._-]{0,127}$/;
243248
const NODE_ID_PATTERN = /^[A-Za-z0-9][A-Za-z0-9._-]{0,63}$/;
@@ -330,6 +335,66 @@ function normalizePeerRelayUrls(raw: string[] | undefined): string[] {
330335
return [...dedupe];
331336
}
332337

338+
interface NormalizedRepositoryMetadata {
339+
repoId: string | null;
340+
owner: string | null;
341+
name: string | null;
342+
recentCommits: string[];
343+
}
344+
345+
function normalizeRepositoryToken(raw: string | undefined): string | null {
346+
const value = (raw ?? '').trim().toLowerCase();
347+
if (value.length === 0) return null;
348+
if (!/^[a-z0-9._-]+$/.test(value)) return null;
349+
return value;
350+
}
351+
352+
function normalizeRepositoryCommits(raw: string[] | undefined): string[] {
353+
if (!Array.isArray(raw)) return [];
354+
const dedupe = new Set<string>();
355+
for (const value of raw) {
356+
if (typeof value !== 'string') continue;
357+
const normalized = value.trim().toLowerCase();
358+
if (!COMMIT_HASH_PATTERN.test(normalized)) continue;
359+
dedupe.add(normalized);
360+
if (dedupe.size >= 30) break;
361+
}
362+
return [...dedupe];
363+
}
364+
365+
function parseOwnerNameFromRepositoryId(
366+
repoId: string,
367+
): { owner: string | null; name: string | null } {
368+
const trimmed = repoId.trim().replace(/^github:/i, '');
369+
const cleaned = trimmed.endsWith('.git') ? trimmed.slice(0, -4) : trimmed;
370+
const segments = cleaned.split('/').map((entry) => entry.trim()).filter((entry) =>
371+
entry.length > 0
372+
);
373+
if (segments.length === 0) {
374+
return { owner: null, name: null };
375+
}
376+
const name = normalizeRepositoryToken(segments[segments.length - 1]) ?? null;
377+
const owner = segments.length >= 2
378+
? normalizeRepositoryToken(segments[segments.length - 2])
379+
: null;
380+
return { owner, name };
381+
}
382+
383+
function normalizeRepositoryMetadata(options: MemoryRelayOptions): NormalizedRepositoryMetadata {
384+
const repoIdRaw = (options.repositoryId ?? '').trim();
385+
const repoId = repoIdRaw.length > 0 ? repoIdRaw.replace(/^github:/i, '').toLowerCase() : null;
386+
const fromRepoId = repoId ? parseOwnerNameFromRepositoryId(repoId) : { owner: null, name: null };
387+
const owner = normalizeRepositoryToken(options.repositoryOwner) ?? fromRepoId.owner;
388+
const name = normalizeRepositoryToken(options.repositoryName) ?? fromRepoId.name;
389+
const recentCommits = normalizeRepositoryCommits(options.repositoryRecentCommits);
390+
return {
391+
repoId,
392+
owner,
393+
name,
394+
recentCommits,
395+
};
396+
}
397+
333398
function isValidRoomName(room: string): boolean {
334399
return ROOM_NAME_PATTERN.test(room);
335400
}
@@ -1088,6 +1153,7 @@ export function createMemoryRelayService(options: MemoryRelayOptions = {}): Memo
10881153
);
10891154
const relayNodeId = normalizeRelayNodeId(options.relayNodeId);
10901155
const peerRelayUrls = normalizePeerRelayUrls(options.peerRelayUrls);
1156+
const repositoryMetadata = normalizeRepositoryMetadata(options);
10911157
const cacheExchangeMaxHops = Math.max(
10921158
1,
10931159
Math.trunc(options.cacheExchangeMaxHops ?? DEFAULT_CACHE_EXCHANGE_MAX_HOPS),
@@ -1330,6 +1396,12 @@ export function createMemoryRelayService(options: MemoryRelayOptions = {}): Memo
13301396
node_id: relayNodeId,
13311397
peers: peerRelayUrls,
13321398
max_hops: cacheExchangeMaxHops,
1399+
repository: {
1400+
repo_id: repositoryMetadata.repoId,
1401+
owner: repositoryMetadata.owner,
1402+
name: repositoryMetadata.name,
1403+
recent_commits: repositoryMetadata.recentCommits,
1404+
},
13331405
}, { status: 200 });
13341406
}
13351407

0 commit comments

Comments
 (0)