Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 69 additions & 45 deletions nexus-broker/internal/service/connection_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,37 @@ import (
"github.com/Prescott-Data/nexus-framework/nexus-broker/internal/service"
)

func runWorkerUntilSignal(t *testing.T, worker *service.ConnectionHealthWorker, done <-chan struct{}) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
workerDone := make(chan struct{})

go func() {
defer close(workerDone)
worker.Start(ctx)
}()

select {
case <-done:
case <-time.After(2 * time.Second):
cancel()
select {
case <-workerDone:
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for health worker to stop after signal timeout")
}
t.Fatal("timed out waiting for health worker signal")
Comment on lines +32 to +39
}
Comment on lines +30 to +40

cancel()

select {
case <-workerDone:
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for health worker to stop")
}
}

// Add missing mock methods to MockConnectionRepository
func (m *MockConnectionRepository) GetForHealthCheck(ctx context.Context, limit int) ([]*domain.ConnectionWithProvider, error) {
args := m.Called(ctx, limit)
Expand Down Expand Up @@ -126,15 +157,14 @@ func TestConnectionHealthWorker_OAuth2_Healthy(t *testing.T) {
mockSvc.On("Refresh", mock.Anything, connID).Return(&service.RefreshResponse{}, nil).Once()

// Should update health to healthy
mockRepo.On("UpdateHealthStatus", mock.Anything, connID, "healthy").Return(nil).Once()
done := make(chan struct{})
mockRepo.On("UpdateHealthStatus", mock.Anything, connID, "healthy").
Run(func(args mock.Arguments) { close(done) }).
Return(nil).Once()
Comment on lines +160 to +163

worker := service.NewConnectionHealthWorker(mockRepo, mockSvc, mockHealth, 10*time.Millisecond)

ctx, cancel := context.WithCancel(context.Background())
go worker.Start(ctx)

time.Sleep(50 * time.Millisecond) // Give it time to run at least once
cancel()

runWorkerUntilSignal(t, worker, done)

mockRepo.AssertExpectations(t)
mockSvc.AssertExpectations(t)
Expand Down Expand Up @@ -169,15 +199,14 @@ func TestConnectionHealthWorker_OAuth2_Expired(t *testing.T) {
mockRepo.On("UpdateStatus", mock.Anything, connID, "expired").Return(nil).Once()

// Should update health to expired
mockRepo.On("UpdateHealthStatus", mock.Anything, connID, "expired").Return(nil).Once()
done := make(chan struct{})
mockRepo.On("UpdateHealthStatus", mock.Anything, connID, "expired").
Run(func(args mock.Arguments) { close(done) }).
Return(nil).Once()
Comment on lines +202 to +205

worker := service.NewConnectionHealthWorker(mockRepo, mockSvc, mockHealth, 10*time.Millisecond)

ctx, cancel := context.WithCancel(context.Background())
go worker.Start(ctx)

time.Sleep(50 * time.Millisecond)
cancel()

runWorkerUntilSignal(t, worker, done)

mockRepo.AssertExpectations(t)
mockSvc.AssertExpectations(t)
Expand Down Expand Up @@ -213,15 +242,14 @@ func TestConnectionHealthWorker_OAuth2_ProviderDown_ShieldsExpiration(t *testing

// Should NOT call UpdateStatus (no expiration)
// Should update health to "unhealthy" instead of "expired"
mockRepo.On("UpdateHealthStatus", mock.Anything, connID, "unhealthy").Return(nil).Once()
done := make(chan struct{})
mockRepo.On("UpdateHealthStatus", mock.Anything, connID, "unhealthy").
Run(func(args mock.Arguments) { close(done) }).
Return(nil).Once()
Comment on lines +245 to +248

worker := service.NewConnectionHealthWorker(mockRepo, mockSvc, mockHealth, 10*time.Millisecond)

ctx, cancel := context.WithCancel(context.Background())
go worker.Start(ctx)

time.Sleep(50 * time.Millisecond)
cancel()
runWorkerUntilSignal(t, worker, done)

mockRepo.AssertExpectations(t)
mockSvc.AssertExpectations(t)
Expand Down Expand Up @@ -267,15 +295,14 @@ func TestConnectionHealthWorker_APIKey_Expired(t *testing.T) {
mockRepo.On("UpdateStatus", mock.Anything, connID, "expired").Return(nil).Once()

// Should update health to expired
mockRepo.On("UpdateHealthStatus", mock.Anything, connID, "expired").Return(nil).Once()
done := make(chan struct{})
mockRepo.On("UpdateHealthStatus", mock.Anything, connID, "expired").
Run(func(args mock.Arguments) { close(done) }).
Return(nil).Once()
Comment on lines +298 to +301

worker := service.NewConnectionHealthWorker(mockRepo, mockSvc, mockHealth, 10*time.Millisecond)

ctx, cancel := context.WithCancel(context.Background())
go worker.Start(ctx)

time.Sleep(50 * time.Millisecond)
cancel()

runWorkerUntilSignal(t, worker, done)

mockRepo.AssertExpectations(t)
mockSvc.AssertExpectations(t)
Expand Down Expand Up @@ -304,15 +331,14 @@ func TestConnectionHealthWorker_OAuth2_Upstream5xx_MarksUnhealthy(t *testing.T)

// Should set health_status to "unhealthy", NOT "expired"
// Should NOT call UpdateStatus — connection status stays "active"
mockRepo.On("UpdateHealthStatus", mock.Anything, connID, "unhealthy").Return(nil).Once()
done := make(chan struct{})
mockRepo.On("UpdateHealthStatus", mock.Anything, connID, "unhealthy").
Run(func(args mock.Arguments) { close(done) }).
Return(nil).Once()
Comment on lines +334 to +337

worker := service.NewConnectionHealthWorker(mockRepo, mockSvc, mockHealth, 10*time.Millisecond)

ctx, cancel := context.WithCancel(context.Background())
go worker.Start(ctx)

time.Sleep(50 * time.Millisecond)
cancel()
runWorkerUntilSignal(t, worker, done)

mockRepo.AssertExpectations(t)
mockSvc.AssertExpectations(t)
Expand Down Expand Up @@ -340,15 +366,14 @@ func TestConnectionHealthWorker_OAuth2_403_MarksDegraded(t *testing.T) {
mockSvc.On("Refresh", mock.Anything, connID).Return(&service.RefreshResponse{StatusCode: 403}, errors.New("forbidden")).Once()

// Should set health_status to "degraded", NOT "expired"
mockRepo.On("UpdateHealthStatus", mock.Anything, connID, "degraded").Return(nil).Once()
done := make(chan struct{})
mockRepo.On("UpdateHealthStatus", mock.Anything, connID, "degraded").
Run(func(args mock.Arguments) { close(done) }).
Return(nil).Once()
Comment on lines +369 to +372

worker := service.NewConnectionHealthWorker(mockRepo, mockSvc, mockHealth, 10*time.Millisecond)

ctx, cancel := context.WithCancel(context.Background())
go worker.Start(ctx)

time.Sleep(50 * time.Millisecond)
cancel()
runWorkerUntilSignal(t, worker, done)

mockRepo.AssertExpectations(t)
mockSvc.AssertExpectations(t)
Expand Down Expand Up @@ -376,15 +401,14 @@ func TestConnectionHealthWorker_OAuth2_NetworkError_MarksDegraded(t *testing.T)
mockSvc.On("Refresh", mock.Anything, connID).Return((*service.RefreshResponse)(nil), errors.New("connection refused")).Once()

// Should set health_status to "degraded" (we don't know if credential is valid)
mockRepo.On("UpdateHealthStatus", mock.Anything, connID, "degraded").Return(nil).Once()
done := make(chan struct{})
mockRepo.On("UpdateHealthStatus", mock.Anything, connID, "degraded").
Run(func(args mock.Arguments) { close(done) }).
Return(nil).Once()
Comment on lines +404 to +407

worker := service.NewConnectionHealthWorker(mockRepo, mockSvc, mockHealth, 10*time.Millisecond)

ctx, cancel := context.WithCancel(context.Background())
go worker.Start(ctx)

time.Sleep(50 * time.Millisecond)
cancel()
runWorkerUntilSignal(t, worker, done)

mockRepo.AssertExpectations(t)
mockSvc.AssertExpectations(t)
Expand Down
Loading