Skip to content

Commit 1a90419

Browse files
authored
Merge pull request #79 from Prescott-Data/feat/healthy-checks
Feat/healthy checks
2 parents b33d7be + b56f9f0 commit 1a90419

30 files changed

Lines changed: 2191 additions & 28 deletions

CHANGELOG.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,21 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8-
## [0.2.1] - 2026-05-10
8+
## [0.2.4] - 2026-05-19
9+
10+
### Added
11+
- **Health Check Hardening**: Provider health cross-referencing prevents mass-expiration of connections during transient upstream outages.
12+
- **Bounded Concurrency**: Semaphore + WaitGroup pattern limits goroutine growth in both `HealthWorker` (max 10) and `ConnectionHealthWorker` (max 20).
13+
- **Graceful Shutdown**: `--worker-only` mode now handles `SIGINT`/`SIGTERM` for clean process lifecycle management.
14+
- **Frontend API**: New `GET /connections?workspace_id=` endpoint returns workspace-scoped connection summaries with health status.
15+
- **Token Health Status**: `GET /connections/{id}/token` response now includes `health_status` field.
16+
- **Database Index**: Partial index on `connections(status, last_health_check_at)` optimizes health check polling at scale.
17+
18+
### Fixed
19+
- `GET /providers/health` returns `[]` instead of `null` for empty provider lists.
20+
- Standardized logging: replaced `fmt.Printf` with `log.Printf` in background workers.
21+
22+
---
923

1024
### Changed
1125
- **Service Layer**: Refactored `connection_part2.go` into `credential.go`, separating credential capture, token refresh, and credential validation by responsibility.

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.2.3
1+
0.2.4

docs/healthchecks.md

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
# Health Checks
2+
3+
The Nexus Broker continuously monitors integration health across two dimensions: **provider-level** (is the upstream API alive?) and **connection-level** (is this user's credential still valid?). Both run as background workers inside the broker process.
4+
5+
---
6+
7+
## Background Workers
8+
9+
### HealthWorker — Provider-Level (5-minute interval)
10+
11+
Probes every registered OAuth2 provider by sending a synthetic `invalid_grant` request to its `token_url`. This deliberately bad request tells us whether the provider's API is reachable and responding to OAuth traffic without requiring a real user credential.
12+
13+
| Provider Response | Status Set |
14+
|-------------------|------------|
15+
| `400 Bad Request` or `401 Unauthorized` | `healthy` — API is alive and rejecting correctly |
16+
| `5xx Server Error` | `unhealthy` — API is down |
17+
| `200 OK` (unexpected for invalid grant) | `degraded` — API behaving abnormally |
18+
| Network error / timeout | `unhealthy` |
19+
| No `token_url` configured | `unknown` |
20+
21+
For non-OAuth2 providers (API key, basic auth), the worker makes a `HEAD` request to `user_info_endpoint` or `api_base_url`. Any non-5xx response is treated as `healthy`.
22+
23+
**Concurrency:** max 10 providers checked concurrently (semaphore + WaitGroup).
24+
25+
---
26+
27+
### ConnectionHealthWorker — Connection-Level (1-minute interval)
28+
29+
Validates individual user connections in batches of 100 on a fixed ticker, prioritising those never checked or longest overdue. Each check has a 15-second timeout. A shared `http.Client` is reused across checks for connection pooling.
30+
31+
| Auth Type | Check Method |
32+
|-----------|-------------|
33+
| `oauth2` | Attempt a background token refresh via `ConnectionService.Refresh` |
34+
| `api_key` | Decrypt credential, extract `api_key` field, `GET` to `user_info_endpoint` using provider's configured `AuthHeader` |
35+
| `basic_auth` | Decrypt credential, extract `username`/`password`, `GET` to `user_info_endpoint` with `Authorization: Basic` |
36+
| No endpoint configured | Mark `unknown` |
37+
38+
**OAuth2 status code handling:** The worker inspects `RefreshResponse.StatusCode` to distinguish definitive credential errors from transient failures:
39+
40+
| Upstream Status | `health_status` set | `connection.status` changed? |
41+
|-----------------|--------------------|-----------------------------|
42+
| Refresh succeeds | `healthy` | No |
43+
| 400 / 401 (invalid_grant, revoked) | `expired` | Yes → `expired` (if provider healthy) |
44+
| 403 (scope issue) | `degraded` | No |
45+
| 5xx (upstream error) | `unhealthy` | No |
46+
| Network error / nil response | `degraded` | No |
47+
48+
**Provider shielding:** Before expiring a connection, the worker cross-references the upstream provider's `health_status`. If the provider is `unhealthy` or `degraded`, the connection is marked `unhealthy` (retriable) instead of `expired` (terminal). This prevents mass-expiration during transient upstream outages.
49+
50+
**Error handling:** If `UpdateStatus` fails when expiring a connection, the worker logs the error and skips the `health_status` write to avoid leaving the connection in an inconsistent state.
51+
52+
**Concurrency:** max 20 connections checked concurrently (semaphore + WaitGroup).
53+
54+
---
55+
56+
## `health_status` Values
57+
58+
Both `provider_profiles` and `connections` share the same status vocabulary:
59+
60+
| Value | Meaning |
61+
|-------|---------|
62+
| `healthy` | Last check passed |
63+
| `unhealthy` | Last check failed — retriable (transient upstream or provider-shielded) |
64+
| `degraded` | Partial failure — scope issues, network errors, or internal errors where credential validity is unknown |
65+
| `expired` | Credential confirmed invalid (400/401) — user must re-authenticate |
66+
| `unknown` | Not yet checked, or not enough information to check |
67+
68+
---
69+
70+
## API Endpoints
71+
72+
### `GET /providers/health`
73+
Returns the health status of all registered providers. No credentials are included.
74+
75+
```http
76+
GET /providers/health
77+
Authorization: X-API-Key <key>
78+
```
79+
80+
```json
81+
[
82+
{
83+
"id": "uuid",
84+
"name": "google",
85+
"health_status": "healthy",
86+
"last_health_check_at": "2026-05-19T07:00:00Z",
87+
"health_message": ""
88+
},
89+
{
90+
"id": "uuid",
91+
"name": "stripe",
92+
"health_status": "unhealthy",
93+
"last_health_check_at": "2026-05-19T07:05:00Z",
94+
"health_message": "upstream returned 503"
95+
}
96+
]
97+
```
98+
99+
Returns `[]` (not `null`) when no providers exist.
100+
101+
---
102+
103+
### `GET /connections?workspace_id={workspace_id}`
104+
Returns all non-pending connections for a workspace with health status. No credentials or tokens are included.
105+
106+
```http
107+
GET /connections?workspace_id=ws-123
108+
Authorization: X-API-Key <key>
109+
```
110+
111+
```json
112+
[
113+
{
114+
"id": "uuid",
115+
"provider_id": "uuid",
116+
"provider_name": "google",
117+
"auth_type": "oauth2",
118+
"status": "active",
119+
"scopes": ["email", "calendar.read"],
120+
"health_status": "healthy",
121+
"last_health_check_at": "2026-05-19T07:00:00Z",
122+
"created_at": "2026-05-01T00:00:00Z",
123+
"updated_at": "2026-05-19T07:00:00Z"
124+
}
125+
]
126+
```
127+
128+
**Use case:** Rendering a connections dashboard with live health indicators.
129+
130+
---
131+
132+
### `GET /connections/{id}/token` (enhanced)
133+
The existing token endpoint now includes `health_status` in its response alongside credentials and strategy.
134+
135+
```json
136+
{
137+
"strategy": { "type": "oauth2" },
138+
"credentials": { "access_token": "..." },
139+
"health_status": "healthy"
140+
}
141+
```
142+
143+
**Use case:** Showing an inline warning or re-auth prompt when consuming a credential.
144+
145+
---
146+
147+
## Worker Mode
148+
149+
Health workers run inside the standard broker process. For deployments that need to separate HTTP serving from background polling, pass `--worker-only` to the binary:
150+
151+
```bash
152+
nexus-broker --worker-only
153+
```
154+
155+
In this mode, the HTTP server does not start. The process listens for `SIGINT`/`SIGTERM` and cancels the worker context, signalling in-flight checks to stop. Note: the current implementation does not explicitly wait for worker goroutines to complete before exiting.
156+
157+
The same Docker image and environment variables are used — just override the container command.
158+
159+
---
160+
161+
## Database Schema
162+
163+
```sql
164+
-- provider_profiles
165+
ALTER TABLE provider_profiles
166+
ADD COLUMN last_health_check_at TIMESTAMP WITH TIME ZONE,
167+
ADD COLUMN health_status VARCHAR(50) DEFAULT 'unknown',
168+
ADD COLUMN health_message TEXT;
169+
170+
-- connections
171+
ALTER TABLE connections
172+
ADD COLUMN last_health_check_at TIMESTAMP WITH TIME ZONE,
173+
ADD COLUMN health_status VARCHAR(50) DEFAULT 'unknown';
174+
175+
-- Performance index for GetForHealthCheck query
176+
CREATE INDEX IF NOT EXISTS idx_connections_health_check
177+
ON connections (status, last_health_check_at ASC NULLS FIRST)
178+
WHERE status = 'active';
179+
```
180+
181+
Migrations: `13_add_provider_health.sql`, `14_add_connection_health.sql`, `15_add_connection_health_index.sql`.

docs/services/broker.md

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,14 @@ To ensure agents never face a "cold start" due to expired tokens:
2828
- It performs background refreshes using stored Refresh Tokens.
2929
- If a refresh fails permanently (e.g., user revoked access), it transitions the connection to `attention_required`.
3030

31-
### 5. Audit Subsystem
31+
### 5. Health Monitoring
32+
The Broker runs two background workers to continuously monitor integration health:
33+
- **`HealthWorker`** (5-min interval): Probes all registered OAuth2 providers using a synthetic `invalid_grant` request to their `token_url`. A `400`/`401` response confirms the provider is alive; a `5xx` marks it `unhealthy`.
34+
- **`ConnectionHealthWorker`** (1-min interval): Validates each active user connection by attempting a token refresh (OAuth2) or a lightweight API call (API key/basic auth). Uses **provider-shielding** to avoid falsely expiring connections during upstream outages.
35+
- Both workers use bounded concurrency (semaphore + WaitGroup) to prevent goroutine exhaustion.
36+
- In `--worker-only` mode, the binary listens for `SIGINT`/`SIGTERM` for graceful shutdown.
37+
38+
### 6. Audit Subsystem
3239
Every control-plane mutation is recorded in the `audit_events` table via the `audit.Service`:
3340
- **`provider.created`** — logged on every successful `POST /providers` call.
3441
- **`provider.updated`** — logged on `PUT` and `PATCH` mutations.
@@ -42,6 +49,18 @@ Audit events capture the **caller IP** (respecting `X-Forwarded-For`), **User-Ag
4249

4350
See the [Audit Log Reference](../reference/audit-log.md) for how to query events.
4451

52+
## Key API Endpoints
53+
54+
| Method | Path | Description |
55+
|--------|------|-------------|
56+
| `GET` | `/providers/health` | Provider health dashboard (all providers, no credentials) |
57+
| `GET` | `/connections?workspace_id=` | All connections for a workspace with health status |
58+
| `GET` | `/connections/{id}/token` | Resolve credentials + `health_status` for a specific connection |
59+
| `POST` | `/connections/{id}/refresh` | Force a token refresh |
60+
| `GET` | `/connections/resolve` | Resolve by `workspace_id` + `provider_name` |
61+
62+
See [Health Checks Architecture](../healthchecks.md) for details on the monitoring system.
63+
4564
## Environment Variables
4665

4766
| Variable | Description | Default |

mkdocs.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ nav:
9595
- Agent Identity: concepts/agent-identity.md
9696
- Security Model: concepts/security-model.md
9797
- Client Libraries: concepts/client-libraries.md
98+
- Health Checks: healthchecks.md
9899
- Getting Started:
99100
- Deploy in Five Minutes: getting-started/quickstart.md
100101
- Configuration: getting-started/configuration.md
@@ -122,7 +123,7 @@ nav:
122123

123124

124125
extra:
125-
version: "0.2.3"
126+
version: "0.2.4"
126127
social:
127128
- icon: material/web
128129
link: https://developers.prescottdata.io

nexus-broker/cmd/nexus-broker/main.go

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"log"
66
"os"
7+
"os/signal"
8+
"syscall"
79
"time"
810

911
"github.com/Prescott-Data/nexus-framework/nexus-broker/internal/audit"
@@ -25,9 +27,17 @@ import (
2527
var Version = "dev"
2628

2729
func main() {
28-
if len(os.Args) > 1 && (os.Args[1] == "-v" || os.Args[1] == "--version") {
29-
log.Printf("Nexus Broker version: %s", Version)
30-
os.Exit(0)
30+
isWorkerOnly := false
31+
if len(os.Args) > 1 {
32+
for _, arg := range os.Args[1:] {
33+
if arg == "-v" || arg == "--version" {
34+
log.Printf("Nexus Broker version: %s", Version)
35+
os.Exit(0)
36+
}
37+
if arg == "--worker-only" {
38+
isWorkerOnly = true
39+
}
40+
}
3141
}
3242

3343
cfg, err := config.Load()
@@ -92,6 +102,7 @@ func main() {
92102
Audit: auditSvc,
93103
})
94104
auditHandler := handlers.NewAuditHandler(db)
105+
connectionsHandler := handlers.NewConnectionsHandler(connSvc)
95106

96107
router := srv.Router()
97108
router.Get("/auth/callback", callbackHandler.Handle)
@@ -107,6 +118,7 @@ func main() {
107118
protected.Route("/providers", func(r chi.Router) {
108119
r.Post("/", providersHandler.Register)
109120
r.Get("/", providersHandler.List)
121+
r.Get("/health", providersHandler.Health)
110122
r.Get("/metadata", providersHandler.Metadata)
111123
r.Get("/by-name/{name}", providersHandler.GetByName)
112124
r.Delete("/by-name/{name}", providersHandler.DeleteByName)
@@ -116,6 +128,7 @@ func main() {
116128
r.Delete("/{id}", providersHandler.Delete)
117129
})
118130
protected.Post("/auth/consent-spec", consentHandler.GetSpec)
131+
protected.Get("/connections", connectionsHandler.List)
119132
protected.Get("/connections/resolve", callbackHandler.ResolveToken)
120133
protected.Get("/connections/{connectionID}/token", callbackHandler.GetToken)
121134
protected.Post("/connections/{connectionID}/refresh", callbackHandler.Refresh)
@@ -126,14 +139,35 @@ func main() {
126139
defer cleanupCancel()
127140
go handlers.StartOrphanTokenCleanup(cleanupCtx, db, 1*time.Hour)
128141

142+
// Start provider health worker (polls every 5m)
143+
healthWorker := provider.NewHealthWorker(store, 5*time.Minute)
144+
go healthWorker.Start(cleanupCtx)
145+
146+
// Start connection health worker (polls every 1m)
147+
// The store implements ProviderHealthLookup via GetHealthStatus(uuid.UUID)
148+
connHealthWorker := service.NewConnectionHealthWorker(connRepo, connSvc, store, 1*time.Minute)
149+
go connHealthWorker.Start(cleanupCtx)
150+
129151
// Start connection health gauge (polls every 30s)
130152
telemetry.NewConnectionGaugeCollector(connRepo, 30*time.Second)
131153

132-
log.Printf("Starting OAuth Broker server on port %s", cfg.Port)
133-
log.Printf("Version: %s", Version)
134-
log.Printf("Base URL: %s", cfg.BaseURL)
135-
136-
if err := srv.Start(); err != nil {
137-
log.Fatal("Server failed to start:", err)
154+
if isWorkerOnly {
155+
log.Printf("Starting Nexus Broker in WORKER-ONLY mode")
156+
log.Printf("Version: %s", Version)
157+
158+
// Wait for OS signal for graceful shutdown
159+
sigCh := make(chan os.Signal, 1)
160+
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
161+
sig := <-sigCh
162+
log.Printf("Received signal %v, shutting down workers...", sig)
163+
cleanupCancel()
164+
} else {
165+
log.Printf("Starting OAuth Broker server on port %s", cfg.Port)
166+
log.Printf("Version: %s", Version)
167+
log.Printf("Base URL: %s", cfg.BaseURL)
168+
169+
if err := srv.Start(); err != nil {
170+
log.Fatal("Server failed to start:", err)
171+
}
138172
}
139173
}

nexus-broker/internal/domain/models.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ type Connection struct {
1818
ReturnURL string
1919
Status string
2020
ExpiresAt time.Time
21+
LastHealthCheckAt sql.NullTime
22+
HealthStatus string
2123
}
2224

2325
// ConnectionWithProvider joins connection and basic provider info
@@ -31,6 +33,21 @@ type ConnectionWithProvider struct {
3133
ProviderParams *json.RawMessage
3234
}
3335

36+
// ConnectionSummary is a lightweight view of a connection for frontend listing.
37+
// It deliberately omits credentials and internal fields.
38+
type ConnectionSummary struct {
39+
ID uuid.UUID `json:"id"`
40+
ProviderID uuid.UUID `json:"provider_id"`
41+
ProviderName string `json:"provider_name"`
42+
AuthType string `json:"auth_type"`
43+
Status string `json:"status"`
44+
Scopes []string `json:"scopes"`
45+
HealthStatus string `json:"health_status"`
46+
LastHealthCheckAt *time.Time `json:"last_health_check_at,omitempty"`
47+
CreatedAt time.Time `json:"created_at"`
48+
UpdatedAt time.Time `json:"updated_at"`
49+
}
50+
3451
// Token represents an encrypted token at rest
3552
type Token struct {
3653
ConnectionID uuid.UUID

0 commit comments

Comments
 (0)