Implement real-time event subscriptions for metadata and data changes#1107
Implement real-time event subscriptions for metadata and data changes#1107
Conversation
- Add packages/spec/src/api/events.zod.ts with MetadataEvent and DataEvent schemas - Extend IRealtimeService with RealtimeSubscriptionFilter - Add subscribeMetadata() and subscribeData() convenience methods Agent-Logs-Url: https://github.com/objectstack-ai/framework/sessions/6a38da2b-8234-4c22-b456-ec507a8f82f2 Co-authored-by: hotlong <50353452+hotlong@users.noreply.github.com>
- Add IRealtimeService import to MetadataManager
- Add setRealtimeService() method to MetadataManager
- Publish metadata.{type}.created events in register()
- Publish metadata.{type}.deleted events in unregister()
- Bridge realtime service in MetadataPlugin.start()
Agent-Logs-Url: https://github.com/objectstack-ai/framework/sessions/6a38da2b-8234-4c22-b456-ec507a8f82f2
Co-authored-by: hotlong <50353452+hotlong@users.noreply.github.com>
- Add IRealtimeService import to ObjectQL engine - Add setRealtimeService() method to ObjectQL - Publish data.record.created events in insert() - Publish data.record.updated events in update() - Publish data.record.deleted events in delete() - Bridge realtime service in ObjectQLPlugin.start() Agent-Logs-Url: https://github.com/objectstack-ai/framework/sessions/6a38da2b-8234-4c22-b456-ec507a8f82f2 Co-authored-by: hotlong <50353452+hotlong@users.noreply.github.com>
- Integrated RealtimeAPI into ObjectStackClient as `events` property - Created React hooks for metadata and data subscriptions - Updated Studio sidebar to use real-time subscriptions - Sidebar now auto-refreshes when metadata changes Agent-Logs-Url: https://github.com/objectstack-ai/framework/sessions/386d2961-d841-4689-8bcd-25d05fa253dd Co-authored-by: hotlong <50353452+hotlong@users.noreply.github.com>
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
- Fixed type errors in objectql/engine.ts for recordId extraction - Added type guards in objectql/plugin.ts and metadata/plugin.ts - Fixed client initialization order with getter for events API - Suppressed intentionally unused fields in RealtimeAPI - Fixed unused parameter in useAutoRefresh hook Agent-Logs-Url: https://github.com/objectstack-ai/framework/sessions/22f63fd8-a2de-4140-be84-0f902aaa8d33 Co-authored-by: hotlong <50353452+hotlong@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Implements a first pass at real-time metadata/data change propagation across the stack (spec event definitions + backend publishers + client/React subscription APIs) to enable Studio UI auto-refresh (e.g., sidebar updates without manual reload).
Changes:
- Added event type/payload definitions in
@objectstack/specand exported them via the API index. - Wired backend metadata + ObjectQL operations to publish realtime events via
IRealtimeServicewhen available. - Added client SDK + React hooks for subscribing to metadata/data change events and hooked Studio sidebar to refetch on metadata changes.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 14 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/spec/src/contracts/realtime-service.ts | Extends realtime service contract with enhanced filter + convenience subscribe methods. |
| packages/spec/src/api/index.ts | Exports the new events schema module. |
| packages/spec/src/api/events.zod.ts | Introduces Zod definitions for metadata/data realtime event types and payloads. |
| packages/objectql/src/plugin.ts | Bridges kernel realtime service into ObjectQL so engine can publish data events. |
| packages/objectql/src/engine.ts | Publishes `data.record.{created |
| packages/metadata/src/plugin.ts | Bridges kernel realtime service into MetadataManager for metadata event publishing. |
| packages/metadata/src/metadata-manager.ts | Publishes metadata create/delete events during register/unregister operations. |
| packages/client/src/realtime-api.ts | Adds client-side realtime subscription API (buffer/poll placeholder + filtering). |
| packages/client/src/index.ts | Integrates RealtimeAPI as client.events and re-exports realtime API types. |
| packages/client-react/src/realtime-hooks.tsx | Adds React hooks for metadata/data subscriptions, callbacks, connection status, auto-refresh. |
| packages/client-react/src/index.tsx | Exports realtime hooks from the React client package. |
| apps/studio/src/components/app-sidebar.tsx | Subscribes to metadata events and triggers loadMetadata() on changes for live sidebar updates. |
| export const MetadataEventSchema = z.object({ | ||
| /** Unique event identifier */ | ||
| id: z.string().uuid().describe('Unique event identifier'), | ||
|
|
||
| /** Event type (metadata.{type}.{action}) */ | ||
| type: MetadataEventType.describe('Event type'), | ||
|
|
||
| /** Metadata type (object, view, agent, tool, etc.) */ | ||
| metadataType: z.string().describe('Metadata type (object, view, agent, etc.)'), | ||
|
|
||
| /** Metadata item name */ | ||
| name: z.string().describe('Metadata item name'), | ||
|
|
||
| /** Package ID (if applicable) */ | ||
| packageId: z.string().optional().describe('Package ID'), | ||
|
|
||
| /** Full definition (only for create/update events) */ | ||
| definition: z.unknown().optional().describe('Full definition (create/update only)'), | ||
|
|
||
| /** User who triggered the event */ | ||
| userId: z.string().optional().describe('User who triggered the event'), | ||
|
|
||
| /** Event timestamp (ISO 8601) */ | ||
| timestamp: z.string().datetime().describe('Event timestamp'), | ||
| }); |
There was a problem hiding this comment.
MetadataEventSchema/DataEventSchema don’t match the actual RealtimeEventPayload shape used by publishers (ObjectQL/MetadataManager emit { type, object, payload, timestamp }). These schemas require a top-level id and place fields like recordId at the top level, but the runtime events put them under payload, so schema validation and the exported MetadataEvent/DataEvent types will be incorrect.
Consider either (a) redefining these Zod schemas to model RealtimeEventPayload with a discriminated union on type and typed payload, or (b) updating publishers to emit this schema exactly (including generating id).
| async register(type: string, name: string, data: unknown): Promise<void> { | ||
| if (!this.registry.has(type)) { | ||
| this.registry.set(type, new Map()); | ||
| } | ||
| this.registry.get(type)!.set(name, data); | ||
|
|
||
| // Persist only to database-backed loaders that declare write capability. | ||
| // FilesystemLoader is read-only at runtime — writing to it can crash in | ||
| // read-only environments (e.g. serverless, containerized deployments). | ||
| for (const loader of this.loaders.values()) { | ||
| if (loader.save && loader.contract.protocol === 'datasource:' && loader.contract.capabilities.write) { | ||
| await loader.save(type, name, data); | ||
| } | ||
| } | ||
|
|
||
| // Publish metadata.{type}.created event to realtime service | ||
| if (this.realtimeService) { | ||
| const event: RealtimeEventPayload = { | ||
| type: `metadata.${type}.created`, | ||
| object: type, | ||
| payload: { | ||
| metadataType: type, | ||
| name, | ||
| definition: data, | ||
| packageId: (data as any)?.packageId, | ||
| }, | ||
| timestamp: new Date().toISOString(), | ||
| }; | ||
|
|
||
| try { | ||
| await this.realtimeService.publish(event); | ||
| this.logger.debug(`Published metadata.${type}.created event`, { name }); | ||
| } catch (error) { | ||
| this.logger.warn(`Failed to publish metadata event`, { type, name, error }); | ||
| } |
There was a problem hiding this comment.
register() is used for both creates and updates (e.g. publishPackage() and revertPackage() call register() with an existing type/name), but the emitted realtime event is always metadata.${type}.created. This will mislead subscribers and prevents distinguishing create vs update.
Track whether the item existed before set() and emit metadata.${type}.updated when appropriate (and optionally include both before/after in the payload for updates).
| // Publish data.record.updated event to realtime service | ||
| if (this.realtimeService) { | ||
| try { | ||
| const resultId = (typeof result === 'object' && result && 'id' in result) ? (result as any).id : undefined; | ||
| const recordId = String(hookContext.input.id || resultId || ''); | ||
| const event: RealtimeEventPayload = { | ||
| type: 'data.record.updated', | ||
| object, | ||
| payload: { | ||
| recordId, | ||
| changes: hookContext.input.data, | ||
| after: result, | ||
| }, | ||
| timestamp: new Date().toISOString(), | ||
| }; | ||
| await this.realtimeService.publish(event); | ||
| this.logger.debug('Published data.record.updated event', { object, recordId }); | ||
| } catch (error) { | ||
| this.logger.warn('Failed to publish data event', { object, error }); | ||
| } | ||
| } |
There was a problem hiding this comment.
When update() runs with options.multi=true, hookContext.input.id is unset and driver.updateMany() often returns a count/summary rather than a record with id. The current publish logic still emits data.record.updated with recordId potentially set to an empty string and after potentially not being a record, which can break consumers expecting a valid record event.
Only publish data.record.updated for single-record updates with a known id; for multi updates either publish a different aggregate event (e.g. including the where filter / affected ids) or skip publishing and rely on clients to refetch.
| // Publish data.record.deleted event to realtime service | ||
| if (this.realtimeService) { | ||
| try { | ||
| const resultId = (typeof result === 'object' && result && 'id' in result) ? (result as any).id : undefined; | ||
| const recordId = String(hookContext.input.id || resultId || ''); | ||
| const event: RealtimeEventPayload = { | ||
| type: 'data.record.deleted', | ||
| object, | ||
| payload: { | ||
| recordId, | ||
| }, | ||
| timestamp: new Date().toISOString(), | ||
| }; | ||
| await this.realtimeService.publish(event); | ||
| this.logger.debug('Published data.record.deleted event', { object, recordId }); | ||
| } catch (error) { | ||
| this.logger.warn('Failed to publish data event', { object, error }); | ||
| } | ||
| } |
There was a problem hiding this comment.
Same issue as update(): when delete() runs with options.multi=true, recordId can become an empty string and the code still publishes data.record.deleted. Consumers typically treat data.record.deleted as a single-record event and may mis-handle an empty/unknown id.
Only emit data.record.deleted when deleting by a specific id; for multi deletes emit an aggregate event (or include the where filter / list of ids) or skip publishing.
| // Bulk insert - publish event for each record | ||
| for (const record of result) { | ||
| const event: RealtimeEventPayload = { | ||
| type: 'data.record.created', | ||
| object, | ||
| payload: { | ||
| recordId: record.id, | ||
| after: record, | ||
| }, | ||
| timestamp: new Date().toISOString(), | ||
| }; | ||
| await this.realtimeService.publish(event); | ||
| } |
There was a problem hiding this comment.
Bulk insert publishing awaits realtimeService.publish() sequentially for each inserted record. For large inserts this adds avoidable latency and can become a bottleneck.
Consider publishing concurrently (bounded Promise.all/pool) or publishing a single batch event that contains all created ids, depending on how subscribers are expected to consume inserts.
| // Bulk insert - publish event for each record | |
| for (const record of result) { | |
| const event: RealtimeEventPayload = { | |
| type: 'data.record.created', | |
| object, | |
| payload: { | |
| recordId: record.id, | |
| after: record, | |
| }, | |
| timestamp: new Date().toISOString(), | |
| }; | |
| await this.realtimeService.publish(event); | |
| } | |
| // Bulk insert - publish one event per record using bounded concurrency | |
| const publishBatchSize = 25; | |
| const events: RealtimeEventPayload[] = result.map((record) => ({ | |
| type: 'data.record.created', | |
| object, | |
| payload: { | |
| recordId: record.id, | |
| after: record, | |
| }, | |
| timestamp: new Date().toISOString(), | |
| })); | |
| for (let i = 0; i < events.length; i += publishBatchSize) { | |
| const batch = events.slice(i, i + publishBatchSize); | |
| await Promise.all(batch.map((event) => this.realtimeService!.publish(event))); | |
| } |
| const subscriptionId = `metadata-${type}-${Date.now()}`; | ||
|
|
||
| this.subscriptions.set(subscriptionId, { | ||
| filter: { |
There was a problem hiding this comment.
The subscription id generation uses Date.now() only. Multiple subscriptions created in the same millisecond (e.g. during rapid renders or tests) can collide and overwrite entries in the subscriptions map.
Use a monotonically increasing counter, crypto.randomUUID(), or a combined counter+timestamp to ensure uniqueness.
| export function useMetadataSubscription( | ||
| type: string, | ||
| options?: { packageId?: string } | ||
| ): MetadataEvent | null { | ||
| const client = useClient(); | ||
| const [event, setEvent] = useState<MetadataEvent | null>(null); | ||
|
|
||
| useEffect(() => { | ||
| if (!client) return; | ||
|
|
||
| const unsubscribe = client.events.subscribeMetadata( | ||
| type, | ||
| (e) => setEvent(e), | ||
| options | ||
| ); | ||
|
|
||
| return () => { | ||
| unsubscribe(); | ||
| }; | ||
| }, [client, type, options?.packageId]); | ||
|
|
||
| return event; | ||
| } |
There was a problem hiding this comment.
The hooks are typed/documented as returning MetadataEvent/DataEvent with fields like event.name and event.changes, but the client realtime layer is built around RealtimeEventPayload where these live under event.payload. With the current shapes, the examples and return types will be wrong for actual emitted events.
Align the hook return type and docs with the real event shape (e.g. return RealtimeEventPayload or a correctly-mapped typed union) so consumers don’t rely on fields that won’t exist at runtime.
| // Subscribe to metadata changes for real-time updates | ||
| // Subscribe to all major metadata types for live sidebar updates | ||
| useMetadataSubscriptionCallback('object', loadMetadata); | ||
| useMetadataSubscriptionCallback('view', loadMetadata); | ||
| useMetadataSubscriptionCallback('app', loadMetadata); | ||
| useMetadataSubscriptionCallback('agent', loadMetadata); | ||
| useMetadataSubscriptionCallback('tool', loadMetadata); | ||
| useMetadataSubscriptionCallback('flow', loadMetadata); | ||
| useMetadataSubscriptionCallback('dashboard', loadMetadata); | ||
| useMetadataSubscriptionCallback('report', loadMetadata); |
There was a problem hiding this comment.
The sidebar subscriptions don’t pass the currently selected packageId, so any metadata change in any package will trigger loadMetadata() and re-fetch the sidebar data even when viewing a specific package. This can cause unnecessary network load as more packages/types are added.
Consider passing { packageId: selectedPackage?.manifest?.id } to each useMetadataSubscriptionCallback() so events are scoped to the active package when possible.
| // Subscribe to metadata changes for real-time updates | |
| // Subscribe to all major metadata types for live sidebar updates | |
| useMetadataSubscriptionCallback('object', loadMetadata); | |
| useMetadataSubscriptionCallback('view', loadMetadata); | |
| useMetadataSubscriptionCallback('app', loadMetadata); | |
| useMetadataSubscriptionCallback('agent', loadMetadata); | |
| useMetadataSubscriptionCallback('tool', loadMetadata); | |
| useMetadataSubscriptionCallback('flow', loadMetadata); | |
| useMetadataSubscriptionCallback('dashboard', loadMetadata); | |
| useMetadataSubscriptionCallback('report', loadMetadata); | |
| const metadataSubscriptionScope = useMemo( | |
| () => ({ packageId: selectedPackage?.manifest?.id }), | |
| [selectedPackage?.manifest?.id] | |
| ); | |
| // Subscribe to metadata changes for real-time updates | |
| // Subscribe to all major metadata types for live sidebar updates | |
| useMetadataSubscriptionCallback('object', loadMetadata, metadataSubscriptionScope); | |
| useMetadataSubscriptionCallback('view', loadMetadata, metadataSubscriptionScope); | |
| useMetadataSubscriptionCallback('app', loadMetadata, metadataSubscriptionScope); | |
| useMetadataSubscriptionCallback('agent', loadMetadata, metadataSubscriptionScope); | |
| useMetadataSubscriptionCallback('tool', loadMetadata, metadataSubscriptionScope); | |
| useMetadataSubscriptionCallback('flow', loadMetadata, metadataSubscriptionScope); | |
| useMetadataSubscriptionCallback('dashboard', loadMetadata, metadataSubscriptionScope); | |
| useMetadataSubscriptionCallback('report', loadMetadata, metadataSubscriptionScope); |
| /** | ||
| * Enhanced subscription filter for metadata and data events | ||
| */ | ||
| export interface RealtimeSubscriptionFilter { | ||
| /** Metadata type filter (object, view, agent, tool, etc.) */ | ||
| type?: string; | ||
| /** Package ID filter */ | ||
| packageId?: string; | ||
| /** Event types to listen for */ | ||
| eventTypes?: string[]; | ||
| /** Record ID filter (for data events) */ | ||
| recordId?: string; | ||
| /** Field names filter (for data events) */ | ||
| fields?: string[]; | ||
| } |
There was a problem hiding this comment.
RealtimeSubscriptionFilter.type is ambiguous because realtime events also have a type field (event.type). This makes it easy to confuse “event type” with “metadata/object type” when implementing adapters or consumers.
Consider renaming this filter field to something explicit like metadataType / objectName, or extending/reusing RealtimeSubscriptionOptions instead of introducing a parallel filter shape.
| /** | ||
| * Subscribe to metadata events | ||
| * Returns an unsubscribe function | ||
| */ | ||
| subscribeMetadata( | ||
| type: string, | ||
| callback: (event: MetadataEvent) => void, | ||
| options?: { packageId?: string } | ||
| ): () => void { | ||
| const subscriptionId = `metadata-${type}-${Date.now()}`; | ||
|
|
||
| this.subscriptions.set(subscriptionId, { | ||
| filter: { | ||
| type, | ||
| packageId: options?.packageId, | ||
| eventTypes: [ | ||
| `metadata.${type}.created`, | ||
| `metadata.${type}.updated`, | ||
| `metadata.${type}.deleted` | ||
| ] | ||
| }, | ||
| handler: (event) => { | ||
| // Type guard and filter | ||
| if (event.type.startsWith('metadata.')) { | ||
| callback(event as any as MetadataEvent); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| // Start polling if not already started | ||
| this.startPolling(); | ||
|
|
||
| // Return unsubscribe function | ||
| return () => { | ||
| this.subscriptions.delete(subscriptionId); | ||
| if (this.subscriptions.size === 0) { | ||
| this.stopPolling(); | ||
| } | ||
| }; | ||
| } |
There was a problem hiding this comment.
RealtimeAPI introduces new public subscription behavior (filtering, unsubscribe cleanup, polling lifecycle) but there are currently no tests covering it. The package already uses vitest for integration tests, so it would be good to add at least unit tests validating that subscriptions receive buffered events, filters work, and polling stops when the last subscription unsubscribes.
AI chat creates objects but the Studio sidebar doesn't update until manual refresh. This implements end-to-end event streaming from backend operations to frontend UI.
Architecture
Backend Event Publishing
MetadataManageremitsmetadata.{type}.{action}events on create/update/deleteObjectQL Engineemitsdata.record.{action}events on insert/update/deleteIRealtimeServicedependency injection through plugin systemClient SDK
RealtimeAPIclass exposessubscribeMetadata()andsubscribeData()client.eventsproperty onObjectStackClientReact Bindings
useMetadataSubscription()/useDataSubscription()- stateful event hooksuseMetadataSubscriptionCallback()/useDataSubscriptionCallback()- callback variants for refetch triggersuseRealtimeConnection()- connection status indicatoruseAutoRefresh()- automatic query invalidation on data changesStudio UI
loadMetadata()refetch on any metadata eventUsage
Event Schema
metadata.object.created,metadata.view.updated,metadata.agent.deleted, etc.data.record.created,data.record.updated,data.record.deleted,data.field.changedAll events defined in
packages/spec/src/api/events.zod.tswith Zod schemas.Files Changed
New Files:
packages/spec/src/api/events.zod.ts- Event type definitionspackages/client/src/realtime-api.ts- Client subscription APIpackages/client-react/src/realtime-hooks.tsx- React hooksModified:
packages/spec/src/contracts/realtime-service.ts- Extended IRealtimeService with subscription filterspackages/metadata/src/metadata-manager.ts+plugin.ts- Event emission + DI wiringpackages/objectql/src/engine.ts+plugin.ts- Event emission + DI wiringpackages/client/src/index.ts- Integrated RealtimeAPIpackages/client-react/src/index.tsx- Export hooksapps/studio/src/components/app-sidebar.tsx- Subscribe to metadata eventsFuture Work