diff --git a/backend/internal/notify/manager.go b/backend/internal/notify/manager.go index cd951c27b4..42ea5a652c 100644 --- a/backend/internal/notify/manager.go +++ b/backend/internal/notify/manager.go @@ -13,6 +13,8 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/ports" ) +const defaultPublishTimeout = 2 * time.Second + // Store is the write-side notification persistence boundary. type Store interface { CreateNotification(ctx context.Context, rec domain.NotificationRecord) (domain.NotificationRecord, bool, error) @@ -29,23 +31,34 @@ type Intent = ports.NotificationIntent // Manager validates lifecycle intents, enriches them into stored rows, persists // unread notifications, and publishes newly inserted rows to live subscribers. type Manager struct { - store Store - publisher Publisher - clock func() time.Time - newID func() string + store Store + publisher Publisher + publishTimeout time.Duration + clock func() time.Time + newID func() string } // Deps configures a Manager. type Deps struct { - Store Store - Publisher Publisher - Clock func() time.Time - NewID func() string + Store Store + Publisher Publisher + PublishTimeout time.Duration + Clock func() time.Time + NewID func() string } // New constructs a write-side notification manager. func New(d Deps) *Manager { - m := &Manager{store: d.Store, publisher: d.Publisher, clock: d.Clock, newID: d.NewID} + m := &Manager{ + store: d.Store, + publisher: d.Publisher, + publishTimeout: d.PublishTimeout, + clock: d.Clock, + newID: d.NewID, + } + if m.publishTimeout <= 0 { + m.publishTimeout = defaultPublishTimeout + } if m.clock == nil { m.clock = time.Now } @@ -76,8 +89,28 @@ func (m *Manager) Notify(ctx context.Context, intent Intent) error { if !inserted || m.publisher == nil { return nil } - if err := m.publisher.Publish(ctx, created); err != nil { + if err := m.publish(ctx, created); err != nil { return fmt.Errorf("notify publish: %w", err) } return nil } + +func (m *Manager) publish(ctx context.Context, rec domain.NotificationRecord) error { + if m.publishTimeout <= 0 { + return m.publisher.Publish(ctx, rec) + } + pubCtx, cancel := context.WithTimeout(ctx, m.publishTimeout) + defer cancel() + + errCh := make(chan error, 1) + go func() { + errCh <- m.publisher.Publish(pubCtx, rec) + }() + + select { + case err := <-errCh: + return err + case <-pubCtx.Done(): + return pubCtx.Err() + } +} diff --git a/backend/internal/notify/manager_test.go b/backend/internal/notify/manager_test.go index 4751682be0..a9ff4063f1 100644 --- a/backend/internal/notify/manager_test.go +++ b/backend/internal/notify/manager_test.go @@ -26,6 +26,27 @@ func (f *fakeStore) CreateNotification(_ context.Context, rec domain.Notificatio return rec, true, nil } +type fakePublisher struct { + published []domain.NotificationRecord + delay time.Duration + err error +} + +func (f *fakePublisher) Publish(ctx context.Context, rec domain.NotificationRecord) error { + if f.delay > 0 { + select { + case <-time.After(f.delay): + case <-ctx.Done(): + return ctx.Err() + } + } + if f.err != nil { + return f.err + } + f.published = append(f.published, rec) + return nil +} + func TestManagerNotifyPersistsThenPublishes(t *testing.T) { st := &fakeStore{} hub := NewHub() @@ -53,6 +74,50 @@ func TestManagerNotifyPersistsThenPublishes(t *testing.T) { } } +func TestManagerNotifyReturnsPublisherErrors(t *testing.T) { + st := &fakeStore{} + pub := &fakePublisher{err: errors.New("publisher unavailable")} + mgr := New(Deps{ + Store: st, + Publisher: pub, + Clock: func() time.Time { return time.Now() }, + NewID: func() string { return "ntf_1" }, + }) + + err := mgr.Notify(context.Background(), Intent{Type: domain.NotificationNeedsInput, SessionID: "mer-1", ProjectID: "mer", SessionDisplayName: "checkout-flow"}) + if err == nil || !errors.Is(err, pub.err) { + t.Fatalf("Notify err = %v, want publisher error", err) + } + if len(st.rows) != 1 { + t.Fatalf("stored rows = %d, want 1", len(st.rows)) + } +} + +func TestManagerNotifyBoundsSlowPublisher(t *testing.T) { + st := &fakeStore{} + pub := &fakePublisher{delay: 200 * time.Millisecond} + mgr := New(Deps{ + Store: st, + Publisher: pub, + PublishTimeout: 20 * time.Millisecond, + Clock: func() time.Time { return time.Now() }, + NewID: func() string { return "ntf_1" }, + }) + + started := time.Now() + err := mgr.Notify(context.Background(), Intent{Type: domain.NotificationNeedsInput, SessionID: "mer-1", ProjectID: "mer", SessionDisplayName: "checkout-flow"}) + elapsed := time.Since(started) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("Notify err = %v, want deadline exceeded", err) + } + if elapsed >= 150*time.Millisecond { + t.Fatalf("Notify blocked for %s, want bounded publisher wait", elapsed) + } + if len(st.rows) != 1 { + t.Fatalf("stored rows = %d, want 1", len(st.rows)) + } +} + func TestManagerNotifyDuplicateDoesNotPublish(t *testing.T) { st := &fakeStore{duplicate: true} hub := NewHub()