Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 43 additions & 10 deletions backend/internal/notify/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

const defaultPublishTimeout = 2 * time.Second

// Store is the write-side notification persistence boundary.
type Store interface {
CreateNotification(ctx context.Context, rec domain.NotificationRecord) (domain.NotificationRecord, bool, error)
Expand All @@ -29,23 +31,34 @@ type Intent = ports.NotificationIntent
// Manager validates lifecycle intents, enriches them into stored rows, persists
// unread notifications, and publishes newly inserted rows to live subscribers.
type Manager struct {
store Store
publisher Publisher
clock func() time.Time
newID func() string
store Store
publisher Publisher
publishTimeout time.Duration
clock func() time.Time
newID func() string
}

// Deps configures a Manager.
type Deps struct {
Store Store
Publisher Publisher
Clock func() time.Time
NewID func() string
Store Store
Publisher Publisher
PublishTimeout time.Duration
Clock func() time.Time
NewID func() string
}

// New constructs a write-side notification manager.
func New(d Deps) *Manager {
m := &Manager{store: d.Store, publisher: d.Publisher, clock: d.Clock, newID: d.NewID}
m := &Manager{
store: d.Store,
publisher: d.Publisher,
publishTimeout: d.PublishTimeout,
clock: d.Clock,
newID: d.NewID,
}
if m.publishTimeout <= 0 {
m.publishTimeout = defaultPublishTimeout
}
if m.clock == nil {
m.clock = time.Now
}
Expand Down Expand Up @@ -76,8 +89,28 @@ func (m *Manager) Notify(ctx context.Context, intent Intent) error {
if !inserted || m.publisher == nil {
return nil
}
if err := m.publisher.Publish(ctx, created); err != nil {
if err := m.publish(ctx, created); err != nil {
return fmt.Errorf("notify publish: %w", err)
}
return nil
}

func (m *Manager) publish(ctx context.Context, rec domain.NotificationRecord) error {
if m.publishTimeout <= 0 {
return m.publisher.Publish(ctx, rec)
}
pubCtx, cancel := context.WithTimeout(ctx, m.publishTimeout)
defer cancel()

errCh := make(chan error, 1)
go func() {
errCh <- m.publisher.Publish(pubCtx, rec)
}()

select {
case err := <-errCh:
return err
case <-pubCtx.Done():
return pubCtx.Err()
}
}
65 changes: 65 additions & 0 deletions backend/internal/notify/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,27 @@ func (f *fakeStore) CreateNotification(_ context.Context, rec domain.Notificatio
return rec, true, nil
}

type fakePublisher struct {
published []domain.NotificationRecord
delay time.Duration
err error
}

func (f *fakePublisher) Publish(ctx context.Context, rec domain.NotificationRecord) error {
if f.delay > 0 {
select {
case <-time.After(f.delay):
case <-ctx.Done():
return ctx.Err()
}
}
if f.err != nil {
return f.err
}
f.published = append(f.published, rec)
return nil
}

func TestManagerNotifyPersistsThenPublishes(t *testing.T) {
st := &fakeStore{}
hub := NewHub()
Expand Down Expand Up @@ -53,6 +74,50 @@ func TestManagerNotifyPersistsThenPublishes(t *testing.T) {
}
}

func TestManagerNotifyReturnsPublisherErrors(t *testing.T) {
st := &fakeStore{}
pub := &fakePublisher{err: errors.New("publisher unavailable")}
mgr := New(Deps{
Store: st,
Publisher: pub,
Clock: func() time.Time { return time.Now() },
NewID: func() string { return "ntf_1" },
})

err := mgr.Notify(context.Background(), Intent{Type: domain.NotificationNeedsInput, SessionID: "mer-1", ProjectID: "mer", SessionDisplayName: "checkout-flow"})
if err == nil || !errors.Is(err, pub.err) {
t.Fatalf("Notify err = %v, want publisher error", err)
}
if len(st.rows) != 1 {
t.Fatalf("stored rows = %d, want 1", len(st.rows))
}
}

func TestManagerNotifyBoundsSlowPublisher(t *testing.T) {
st := &fakeStore{}
pub := &fakePublisher{delay: 200 * time.Millisecond}
mgr := New(Deps{
Store: st,
Publisher: pub,
PublishTimeout: 20 * time.Millisecond,
Clock: func() time.Time { return time.Now() },
NewID: func() string { return "ntf_1" },
})

started := time.Now()
err := mgr.Notify(context.Background(), Intent{Type: domain.NotificationNeedsInput, SessionID: "mer-1", ProjectID: "mer", SessionDisplayName: "checkout-flow"})
elapsed := time.Since(started)
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("Notify err = %v, want deadline exceeded", err)
}
if elapsed >= 150*time.Millisecond {
t.Fatalf("Notify blocked for %s, want bounded publisher wait", elapsed)
}
if len(st.rows) != 1 {
t.Fatalf("stored rows = %d, want 1", len(st.rows))
}
}

func TestManagerNotifyDuplicateDoesNotPublish(t *testing.T) {
st := &fakeStore{duplicate: true}
hub := NewHub()
Expand Down
Loading