Skip to content

Commit 96a63c5

Browse files
committed
refactor(provisioning): TypeScript style improvements from audit
- Import types from @kubernetesjs/ops (ServingKnativeDevV1Service, Secret, Namespace, InterwebClient) instead of hand-rolling interfaces - Define typed DB row interfaces (NamespaceRow, SecretRow, FunctionDefinitionRow) — use pool.query<T>() eliminating unsafe casts - Generic ProvisioningHandler<P, R> type (typed payload/result per handler) - Pass ComputeModuleLoader through ProvisioningContext from compute-worker (shared TTL-cached instance, stop creating per-call) - Extract mergeAndReplace() into k8s-ops.ts (proper module boundary) - buildKnativeServiceSpec() accepts KnativeBuilderInput (Pick<FunctionDefinitionRow>) - DI pattern for K8s client: createK8sClient(url) pure factory + getK8sClient() env-reading convenience at the edge - Decompose seed into provisionNamespaces/syncNamespaceSecrets/provisionFunctions - K8s error type guards with proper type predicates - Update tests to match new handler signatures
1 parent 18a048b commit 96a63c5

11 files changed

Lines changed: 473 additions & 382 deletions

File tree

job/compute-worker/src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ export default class ComputeWorker {
341341

342342
private async doWorkProvisioning(
343343
job: ComputeJobRow,
344-
handler: (payload: Record<string, unknown>, context: { pool: import('pg').Pool; databaseId: string }) => Promise<Record<string, unknown>>,
344+
handler: (payload: Record<string, unknown>, context: import('@constructive-io/provisioning-handlers').ProvisioningContext) => Promise<Record<string, unknown>>,
345345
payload: Record<string, unknown>
346346
): Promise<void> {
347347
const { task_identifier } = job;
@@ -364,6 +364,7 @@ export default class ComputeWorker {
364364
const result = await handler(payload, {
365365
pool: this.pgPool,
366366
databaseId,
367+
loader: this.loader,
367368
});
368369

369370
const elapsed = process.hrtime(reqStart);
Lines changed: 48 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,93 +1,81 @@
11
/**
2-
* function:sync-resources — updates an existing Knative Service spec
3-
* when a function definition's resource config changes.
4-
* Idempotent: replaces the Service spec in-place.
2+
* Handler: function:sync-resources
53
*
6-
* Queue handler — triggered by DB trigger on function_definitions UPDATE.
7-
* Assumes the Knative Service was already created by the seed script.
4+
* Queue handler — triggered by DB events on function_definitions UPDATE
5+
* (scaling changes, image updates, resource adjustments).
6+
*
7+
* Reads the updated function definition, rebuilds the Knative Service spec,
8+
* and replaces it via the merge-and-replace workflow.
9+
*
10+
* Assumes the Knative Service already exists (created by the seed).
811
*/
912

10-
import { ComputeModuleLoader } from '@constructive-io/module-loader';
1113
import { Logger } from '@pgpmjs/logger';
1214

13-
import { getK8sClient, isNotFound } from '../k8s-client';
15+
import { getK8sClient } from '../k8s-client';
16+
import { mergeAndReplace } from '../k8s-ops';
1417
import { buildKnativeServiceSpec, resolveNamespaceName } from '../knative';
15-
import { mergeAndReplace } from '../seed';
16-
import type { ProvisioningContext, ProvisioningHandler } from '../types';
18+
import type {
19+
FunctionDefinitionRow,
20+
ProvisioningHandler,
21+
SyncResourcesPayload,
22+
SyncResourcesResult,
23+
} from '../types';
1724

18-
const log = new Logger('provisioning:function-sync');
25+
const log = new Logger('provisioning:function-sync-resources');
1926

20-
export const handleFunctionSyncResources: ProvisioningHandler = async (
21-
payload: Record<string, unknown>,
22-
context: ProvisioningContext
23-
): Promise<Record<string, unknown>> => {
24-
const { pool, databaseId } = context;
25-
const functionId = payload.id as string;
27+
export const functionSyncResources: ProvisioningHandler<SyncResourcesPayload, SyncResourcesResult> = async (
28+
payload,
29+
{ pool, databaseId, loader }
30+
) => {
31+
const functionId = payload.id;
2632

27-
if (!functionId) {
28-
throw new Error('function:sync-resources — missing "id" in payload');
33+
const client = getK8sClient();
34+
if (!client) {
35+
log.info('[dev-mode] skipping function:sync-resources (no K8S_API_URL)');
36+
return { skipped: true, reason: 'no-k8s' };
2937
}
3038

31-
const loader = new ComputeModuleLoader(pool);
39+
// Resolve table names via shared module loader (TTL-cached instance)
3240
const config = await loader.load(databaseId);
3341
const publicSchema = config.functionModule?.publicSchema ?? 'constructive_compute_public';
3442
const definitionsTable = config.functionModule?.definitionsTable ?? 'platform_function_definitions';
3543

36-
const { rows } = await pool.query(
44+
const { rows } = await pool.query<FunctionDefinitionRow>(
3745
`SELECT id, name, task_identifier, service_url, runtime, image,
38-
concurrency, scale_min, scale_max, timeout_seconds, resources,
46+
concurrency, scale_min, scale_max, scale_target, timeout_seconds, resources,
3947
namespace_id
4048
FROM "${publicSchema}"."${definitionsTable}"
4149
WHERE id = $1`,
4250
[functionId]
4351
);
4452

4553
if (rows.length === 0) {
46-
throw new Error(`function:sync-resources — function_definition id=${functionId} not found`);
47-
}
48-
49-
const fnRow = rows[0] as Record<string, unknown>;
50-
51-
if (fnRow.runtime === 'inline' || !fnRow.image) {
52-
log.info(
53-
`skipping sync for "${fnRow.name}" — ${fnRow.runtime === 'inline' ? 'inline runtime' : 'no image'}`
54-
);
55-
return { skipped: true, reason: fnRow.runtime === 'inline' ? 'inline-runtime' : 'no-image' };
54+
log.warn(`function definition not found: ${functionId}`);
55+
return { skipped: true, reason: 'not-found' };
5656
}
5757

58-
const client = getK8sClient();
59-
if (!client) {
60-
log.info(
61-
`[dev-mode] would sync Knative Service resources for "${fnRow.name}" — skipping (no K8S_API_URL)`
62-
);
63-
return { skipped: true, reason: 'no-k8s' };
58+
const fnRow = rows[0];
59+
if (!fnRow.image || fnRow.runtime === 'inline') {
60+
log.info(`skipping inline/image-less function: ${fnRow.name}`);
61+
return { skipped: true, reason: 'inline-or-no-image' };
6462
}
6563

66-
const namespaceName = await resolveNamespaceName(pool, fnRow.namespace_id as string | null);
67-
const fnName = fnRow.name as string;
64+
const namespaceName = await resolveNamespaceName(pool, fnRow.namespace_id);
6865
const serviceSpec = buildKnativeServiceSpec(fnRow, namespaceName);
6966

70-
try {
71-
const svc = await mergeAndReplace(client, serviceSpec, fnName, namespaceName);
72-
const serviceUrl = svc?.status?.url ?? svc?.status?.address?.url ?? null;
73-
log.info(`updated Knative Service "${fnName}" in namespace "${namespaceName}"`);
74-
75-
if (serviceUrl && serviceUrl !== fnRow.service_url) {
76-
await pool.query(
77-
`UPDATE "${publicSchema}"."${definitionsTable}" SET service_url = $1 WHERE id = $2`,
78-
[serviceUrl, functionId]
79-
);
80-
log.info(`updated service_url for "${fnName}" → ${serviceUrl}`);
81-
}
67+
const svc = await mergeAndReplace(client, serviceSpec, fnRow.name, namespaceName);
68+
const serviceUrl = (svc?.status as any)?.url ?? (svc?.status as any)?.address?.url ?? null;
8269

83-
return { synced: true, name: fnName, serviceUrl };
84-
} catch (err: unknown) {
85-
if (isNotFound(err)) {
86-
log.warn(
87-
`Knative Service "${fnName}" not found in "${namespaceName}" — run the provision seed first`
88-
);
89-
return { skipped: true, reason: 'service-not-found' };
90-
}
91-
throw err;
70+
// Write back updated service_url if available
71+
if (serviceUrl && serviceUrl !== fnRow.service_url) {
72+
await pool.query(
73+
`UPDATE "${publicSchema}"."${definitionsTable}" SET service_url = $1 WHERE id = $2`,
74+
[serviceUrl, fnRow.id]
75+
);
76+
log.info(`updated service_url for "${fnRow.name}" → ${serviceUrl}`);
9277
}
78+
79+
log.info(`synced Knative Service "${fnRow.name}" in "${namespaceName}"`);
80+
return { synced: true, name: fnRow.name, serviceUrl };
9381
};

packages/provisioning-handlers/src/handlers/namespace-sync-secrets.ts

Lines changed: 58 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,69 @@
11
/**
2-
* namespace:sync-secrets — reads secrets for a namespace from DB,
3-
* decrypts them via pgp_sym_decrypt, and syncs to a K8s Secret.
4-
* Idempotent: creates or replaces the K8s Secret resource.
2+
* Handler: namespace:sync-secrets
53
*
6-
* Queue handler — triggered by DB trigger on namespace_secret changes.
4+
* Queue handler — triggered by DB events on namespace_secret INSERT/UPDATE/DELETE.
5+
* Reads all secrets for a namespace, decrypts them, and creates/replaces
6+
* the aggregate K8s Secret in the namespace.
7+
*
8+
* Assumes the K8s namespace already exists (created by the seed).
79
*/
810

11+
import type { Secret } from '@kubernetesjs/ops';
912
import { Logger } from '@pgpmjs/logger';
1013

1114
import { getK8sClient, isConflict } from '../k8s-client';
12-
import type { ProvisioningContext, ProvisioningHandler } from '../types';
15+
import type {
16+
NamespaceRow,
17+
ProvisioningHandler,
18+
SecretRow,
19+
SyncSecretsPayload,
20+
SyncSecretsResult,
21+
} from '../types';
1322

14-
const log = new Logger('provisioning:namespace-secrets');
23+
const log = new Logger('provisioning:namespace-sync-secrets');
1524

16-
export const handleNamespaceSyncSecrets: ProvisioningHandler = async (
17-
payload: Record<string, unknown>,
18-
context: ProvisioningContext
19-
): Promise<Record<string, unknown>> => {
20-
const { pool } = context;
21-
const namespaceId = payload.id as string | undefined;
22-
const namespaceName = payload.namespace_name as string | undefined;
25+
export const namespaceSyncSecrets: ProvisioningHandler<SyncSecretsPayload, SyncSecretsResult> = async (
26+
payload,
27+
{ pool }
28+
) => {
29+
const namespaceId = payload.id;
30+
const namespaceName = payload.namespace_name;
2331

24-
if (!namespaceId && !namespaceName) {
25-
throw new Error('namespace:sync-secrets — missing "id" or "namespace_name" in payload');
32+
const client = getK8sClient();
33+
if (!client) {
34+
log.info('[dev-mode] skipping namespace:sync-secrets (no K8S_API_URL)');
35+
return { skipped: true, reason: 'no-k8s' };
2636
}
2737

28-
// Resolve namespace name from id if not provided directly
29-
let resolvedName = namespaceName;
30-
let resolvedId = namespaceId;
31-
if (!resolvedName && resolvedId) {
32-
const { rows } = await pool.query(
33-
`SELECT name FROM metaschema_public.namespace WHERE id = $1`,
34-
[resolvedId]
35-
);
36-
if (rows.length === 0) {
37-
throw new Error(`namespace:sync-secrets — namespace id=${resolvedId} not found`);
38+
// Resolve namespace name from ID if not provided directly
39+
let resolvedName: string;
40+
let resolvedId: string | undefined;
41+
if (namespaceName) {
42+
resolvedName = namespaceName;
43+
if (!namespaceId) {
44+
const { rows } = await pool.query<NamespaceRow>(
45+
`SELECT id, name FROM metaschema_public.namespace WHERE name = $1`,
46+
[namespaceName]
47+
);
48+
if (rows.length === 0) return { skipped: true, reason: 'namespace-not-found' };
49+
resolvedId = rows[0].id;
50+
} else {
51+
resolvedId = namespaceId;
3852
}
39-
resolvedName = rows[0].name as string;
40-
}
41-
if (!resolvedId && resolvedName) {
42-
const { rows } = await pool.query(
43-
`SELECT id FROM metaschema_public.namespace WHERE name = $1`,
44-
[resolvedName]
53+
} else if (namespaceId) {
54+
resolvedId = namespaceId;
55+
const { rows } = await pool.query<NamespaceRow>(
56+
`SELECT id, name FROM metaschema_public.namespace WHERE id = $1`,
57+
[namespaceId]
4558
);
46-
if (rows.length > 0) {
47-
resolvedId = rows[0].id as string;
48-
}
59+
if (rows.length === 0) return { skipped: true, reason: 'namespace-not-found' };
60+
resolvedName = rows[0].name;
61+
} else {
62+
return { skipped: true, reason: 'missing-id-and-name' };
4963
}
5064

51-
// Read and decrypt secrets for this namespace using pgp_sym_decrypt
52-
const { rows: secretRows } = await pool.query(
65+
// Fetch and decrypt secrets
66+
const { rows: secretRows } = await pool.query<SecretRow>(
5367
`SELECT key, pgp_sym_decrypt(value, key_id::text) AS decrypted_value
5468
FROM metaschema_public.namespace_secret
5569
WHERE namespace_id = $1`,
@@ -58,21 +72,13 @@ export const handleNamespaceSyncSecrets: ProvisioningHandler = async (
5872

5973
const secretData: Record<string, string> = {};
6074
for (const row of secretRows) {
61-
secretData[row.key as string] = Buffer.from(row.decrypted_value as string).toString('base64');
62-
}
63-
64-
const client = getK8sClient();
65-
if (!client) {
66-
log.info(
67-
`[dev-mode] would sync ${secretRows.length} secret(s) for namespace "${resolvedName}" — skipping (no K8S_API_URL)`
68-
);
69-
return { skipped: true, reason: 'no-k8s' };
75+
secretData[row.key] = Buffer.from(row.decrypted_value).toString('base64');
7076
}
7177

7278
const secretName = `${resolvedName}-secrets`;
73-
const secretBody = {
74-
apiVersion: 'v1' as const,
75-
kind: 'Secret' as const,
79+
const secretBody: Secret = {
80+
apiVersion: 'v1',
81+
kind: 'Secret',
7682
metadata: { name: secretName },
7783
data: secretData,
7884
type: 'Opaque',
@@ -81,19 +87,18 @@ export const handleNamespaceSyncSecrets: ProvisioningHandler = async (
8187
try {
8288
await client.createCoreV1NamespacedSecret({
8389
query: {},
84-
path: { namespace: resolvedName! },
90+
path: { namespace: resolvedName },
8591
body: secretBody,
8692
});
87-
log.info(`created K8s secret "${secretName}" in namespace "${resolvedName}" with ${secretRows.length} key(s)`);
93+
log.info(`created K8s secret "${secretName}" with ${secretRows.length} key(s)`);
8894
} catch (err: unknown) {
8995
if (isConflict(err)) {
90-
log.info(`K8s secret "${secretName}" already exists — replacing`);
9196
await client.replaceCoreV1NamespacedSecret({
9297
query: {},
93-
path: { name: secretName, namespace: resolvedName! },
98+
path: { name: secretName, namespace: resolvedName },
9499
body: secretBody,
95100
});
96-
log.info(`replaced K8s secret "${secretName}" in namespace "${resolvedName}"`);
101+
log.info(`replaced K8s secret "${secretName}" with ${secretRows.length} key(s)`);
97102
} else {
98103
throw err;
99104
}
Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,30 @@
1-
export type { ProvisioningContext, ProvisioningHandler } from './types';
1+
// Types
2+
export type {
3+
FunctionDefinitionRow,
4+
NamespaceRow,
5+
ProvisioningContext,
6+
ProvisioningHandler,
7+
SecretRow,
8+
SyncResourcesPayload,
9+
SyncResourcesResult,
10+
SyncSecretsPayload,
11+
SyncSecretsResult,
12+
} from './types';
13+
14+
// Registry
215
export {
316
getProvisioningHandler,
417
registerProvisioningHandler,
518
} from './registry';
6-
export { getK8sClient, isConflict, isNotFound } from './k8s-client';
7-
export type { KnativeServiceSpec } from './knative';
19+
20+
// K8s client + utilities
21+
export { createK8sClient, getK8sClient, isConflict, isNotFound } from './k8s-client';
22+
export { mergeAndReplace } from './k8s-ops';
23+
24+
// Knative builder
25+
export type { KnativeBuilderInput, KnativeServiceSpec } from './knative';
826
export { buildKnativeServiceSpec, resolveNamespaceName } from './knative';
27+
28+
// Seed
929
export type { ProvisionSeedOptions, ProvisionSeedResult } from './seed';
10-
export { mergeAndReplace, provision } from './seed';
30+
export { provision } from './seed';

0 commit comments

Comments
 (0)