Merged
16 changes: 15 additions & 1 deletion CHANGELOG.md
@@ -5,7 +5,21 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.2.1] - 2026-05-10
## [0.2.4] - 2026-05-19

### Added
- **Health Check Hardening**: Provider health cross-referencing prevents mass-expiration of connections during transient upstream outages.
- **Bounded Concurrency**: Semaphore + WaitGroup pattern limits goroutine growth in both `HealthWorker` (max 10) and `ConnectionHealthWorker` (max 20).
- **Graceful Shutdown**: `--worker-only` mode now handles `SIGINT`/`SIGTERM` for clean process lifecycle management.
- **Frontend API**: New `GET /connections?workspace_id=` endpoint returns workspace-scoped connection summaries with health status.
- **Token Health Status**: `GET /connections/{id}/token` response now includes `health_status` field.
- **Database Index**: Partial index on `connections(status, last_health_check_at)` optimizes health check polling at scale.

### Fixed
- `GET /providers/health` returns `[]` instead of `null` for empty provider lists.
- Standardized logging: replaced `fmt.Printf` with `log.Printf` in background workers.

---

### Changed
- **Service Layer**: Refactored `connection_part2.go` into `credential.go`, separating credential capture, token refresh, and credential validation by responsibility.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.2.3
0.2.4
181 changes: 181 additions & 0 deletions docs/healthchecks.md
@@ -0,0 +1,181 @@
# Health Checks

The Nexus Broker continuously monitors integration health across two dimensions: **provider-level** (is the upstream API alive?) and **connection-level** (is this user's credential still valid?). Both run as background workers inside the broker process.

---

## Background Workers

### HealthWorker — Provider-Level (5-minute interval)

Probes every registered OAuth2 provider by sending a synthetic token request to its `token_url` that is expected to be rejected with `invalid_grant`. This deliberately bad request shows whether the provider's API is reachable and responding to OAuth traffic, without requiring a real user credential.

| Provider Response | Status Set |
|-------------------|------------|
| `400 Bad Request` or `401 Unauthorized` | `healthy` — API is alive and rejecting correctly |
| `5xx Server Error` | `unhealthy` — API is down |
| `200 OK` (unexpected for invalid grant) | `degraded` — API behaving abnormally |
| Network error / timeout | `unhealthy` |
| No `token_url` configured | `unknown` |

For non-OAuth2 providers (API key, basic auth), the worker makes a `HEAD` request to `user_info_endpoint` or `api_base_url`. Any non-5xx response is treated as `healthy`.

**Concurrency:** max 10 providers checked concurrently (semaphore + WaitGroup).
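
A minimal sketch of the semaphore + WaitGroup pattern, assuming a buffered channel is used as the semaphore (a common Go idiom; the worker's real implementation may differ). `runBounded` and `peakConcurrency` are hypothetical names; the second exists only to observe that the limit holds.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded calls check for every item, with at most limit calls in
// flight at any moment. The buffered channel acts as a counting semaphore.
func runBounded(items []string, limit int, check func(string)) {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	for _, item := range items {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit checks are already running
		go func(it string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			check(it)
		}(item)
	}
	wg.Wait() // return only after every check has finished
}

// peakConcurrency runs n dummy checks and reports the highest number
// that were observed running at the same time.
func peakConcurrency(n, limit int) int32 {
	var inFlight, peak int32
	runBounded(make([]string, n), limit, func(string) {
		cur := atomic.AddInt32(&inFlight, 1)
		for {
			old := atomic.LoadInt32(&peak)
			if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) {
				break
			}
		}
		time.Sleep(time.Millisecond) // simulate a slow upstream call
		atomic.AddInt32(&inFlight, -1)
	})
	return atomic.LoadInt32(&peak)
}

func main() {
	fmt.Println("peak concurrent checks:", peakConcurrency(50, 10))
}
```

Acquiring the semaphore before spawning the goroutine bounds the number of goroutines as well as the number of active checks, which is what limits goroutine growth.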

---

### ConnectionHealthWorker — Connection-Level (1-minute interval)

Validates individual user connections in batches of 100 on a fixed ticker, prioritising those never checked or longest overdue. Each check has a 15-second timeout. A shared `http.Client` is reused across checks for connection pooling.

| Auth Type | Check Method |
|-----------|-------------|
| `oauth2` | Attempt a background token refresh via `ConnectionService.Refresh` |
| `api_key` | Decrypt credential, extract `api_key` field, `GET` to `user_info_endpoint` using provider's configured `AuthHeader` |
| `basic_auth` | Decrypt credential, extract `username`/`password`, `GET` to `user_info_endpoint` with `Authorization: Basic` |
| No endpoint configured | Mark `unknown` |

**OAuth2 status code handling:** The worker inspects `RefreshResponse.StatusCode` to distinguish definitive credential errors from transient failures:

| Upstream Status | `health_status` set | `connection.status` changed? |
|-----------------|--------------------|-----------------------------|
| Refresh succeeds | `healthy` | No |
| 400 / 401 (invalid_grant, revoked) | `expired` | Yes → `expired` (if provider healthy) |
| 403 (scope issue) | `degraded` | No |
| 5xx (upstream error) | `unhealthy` | No |
| Network error / nil response | `degraded` | No |

**Provider shielding:** Before expiring a connection, the worker cross-references the upstream provider's `health_status`. If the provider is `unhealthy` or `degraded`, the connection is marked `unhealthy` (retriable) instead of `expired` (terminal). This prevents mass-expiration during transient upstream outages.
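
The status table and the shielding rule combine into a single decision, sketched here as a pure function. `Decision` and `decideOAuth` are hypothetical names. The sketch follows the prose rule (shield only when the provider is `unhealthy` or `degraded`), treats an upstream status of `0` as "network error / nil response", and assumes that any status the table does not list is `degraded`; the real worker may handle those cases differently.

```go
package main

import "fmt"

// Decision is the result of one OAuth2 connection check.
type Decision struct {
	HealthStatus     string // value written to connections.health_status
	ExpireConnection bool   // whether connection.status moves to "expired"
}

// decideOAuth applies the status-code table plus provider shielding.
// upstreamStatus == 0 stands for a network error or nil response.
func decideOAuth(refreshOK bool, upstreamStatus int, providerHealth string) Decision {
	switch {
	case refreshOK:
		return Decision{HealthStatus: "healthy"}
	case upstreamStatus == 400 || upstreamStatus == 401:
		// Shield the connection while the provider itself looks broken.
		if providerHealth == "unhealthy" || providerHealth == "degraded" {
			return Decision{HealthStatus: "unhealthy"}
		}
		return Decision{HealthStatus: "expired", ExpireConnection: true}
	case upstreamStatus == 403:
		return Decision{HealthStatus: "degraded"}
	case upstreamStatus >= 500 && upstreamStatus <= 599:
		return Decision{HealthStatus: "unhealthy"}
	default:
		// Network error, nil response, or (ASSUMPTION) any unlisted status.
		return Decision{HealthStatus: "degraded"}
	}
}

func main() {
	fmt.Printf("%+v\n", decideOAuth(false, 401, "healthy"))
	fmt.Printf("%+v\n", decideOAuth(false, 401, "unhealthy"))
}
```

Only the 400/401 branch with a non-shielded provider ever sets `ExpireConnection`, which matches the table's single "Yes" row.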

**Error handling:** If `UpdateStatus` fails when expiring a connection, the worker logs the error and skips the `health_status` write to avoid leaving the connection in an inconsistent state.

**Concurrency:** max 20 connections checked concurrently (semaphore + WaitGroup).

---

## `health_status` Values

Both `provider_profiles` and `connections` share the same status vocabulary:

| Value | Meaning |
|-------|---------|
| `healthy` | Last check passed |
| `unhealthy` | Last check failed — retriable (transient upstream or provider-shielded) |
| `degraded` | Partial failure — scope issues, network errors, or internal errors where credential validity is unknown |
| `expired` | Credential confirmed invalid (400/401) — user must re-authenticate |
| `unknown` | Not yet checked, or not enough information to check |

---

## API Endpoints

### `GET /providers/health`
Returns the health status of all registered providers. No credentials are included.

```http
GET /providers/health
Authorization: X-API-Key <key>
```

```json
[
  {
    "id": "uuid",
    "name": "google",
    "health_status": "healthy",
    "last_health_check_at": "2026-05-19T07:00:00Z",
    "health_message": ""
  },
  {
    "id": "uuid",
    "name": "stripe",
    "health_status": "unhealthy",
    "last_health_check_at": "2026-05-19T07:05:00Z",
    "health_message": "upstream returned 503"
  }
]
```

Returns `[]` (not `null`) when no providers exist.

---

### `GET /connections?workspace_id={workspace_id}`
Returns all non-pending connections for a workspace with health status. No credentials or tokens are included.

```http
GET /connections?workspace_id=ws-123
Authorization: X-API-Key <key>
```

```json
[
  {
    "id": "uuid",
    "provider_id": "uuid",
    "provider_name": "google",
    "auth_type": "oauth2",
    "status": "active",
    "scopes": ["email", "calendar.read"],
    "health_status": "healthy",
    "last_health_check_at": "2026-05-19T07:00:00Z",
    "created_at": "2026-05-01T00:00:00Z",
    "updated_at": "2026-05-19T07:00:00Z"
  }
]
```

**Use case:** Rendering a connections dashboard with live health indicators.

---

### `GET /connections/{id}/token` (enhanced)
The existing token endpoint now includes `health_status` in its response alongside credentials and strategy.

```json
{
  "strategy": { "type": "oauth2" },
  "credentials": { "access_token": "..." },
  "health_status": "healthy"
}
```

**Use case:** Showing an inline warning or re-auth prompt when consuming a credential.
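
A client-side sketch of that use case, assuming the response shape shown above. `TokenResponse` and `inspectToken` are hypothetical names, not part of any Nexus client library, and the choice to prompt for re-authentication only on `expired` is this sketch's reading of the status table.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TokenResponse mirrors the documented fields. Strategy and credentials
// are kept raw because their inner shape depends on the auth type.
type TokenResponse struct {
	Strategy     json.RawMessage `json:"strategy"`
	Credentials  json.RawMessage `json:"credentials"`
	HealthStatus string          `json:"health_status"`
}

// inspectToken decodes a token response and reports its health status and
// whether the caller should prompt the user to re-authenticate.
func inspectToken(body []byte) (status string, reauth bool, err error) {
	var tr TokenResponse
	if err := json.Unmarshal(body, &tr); err != nil {
		return "", false, fmt.Errorf("decode token response: %w", err)
	}
	// "expired" is the only status documented as requiring re-authentication.
	return tr.HealthStatus, tr.HealthStatus == "expired", nil
}

func main() {
	body := []byte(`{"strategy":{"type":"oauth2"},"credentials":{"access_token":"..."},"health_status":"expired"}`)
	status, reauth, err := inspectToken(body)
	fmt.Println(status, reauth, err)
}
```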

---

## Worker Mode

Health workers run inside the standard broker process. For deployments that need to separate HTTP serving from background polling, pass `--worker-only` to the binary:

```bash
nexus-broker --worker-only
```

In this mode, the HTTP server does not start. The process listens for `SIGINT`/`SIGTERM` and cancels the worker context, signalling in-flight checks to stop. Note: the current implementation does not explicitly wait for worker goroutines to complete before exiting.

The same Docker image and environment variables are used — just override the container command.

---

## Database Schema

```sql
-- provider_profiles
ALTER TABLE provider_profiles
    ADD COLUMN last_health_check_at TIMESTAMP WITH TIME ZONE,
    ADD COLUMN health_status VARCHAR(50) DEFAULT 'unknown',
    ADD COLUMN health_message TEXT;

-- connections
ALTER TABLE connections
    ADD COLUMN last_health_check_at TIMESTAMP WITH TIME ZONE,
    ADD COLUMN health_status VARCHAR(50) DEFAULT 'unknown';

-- Performance index for GetForHealthCheck query
CREATE INDEX IF NOT EXISTS idx_connections_health_check
    ON connections (status, last_health_check_at ASC NULLS FIRST)
    WHERE status = 'active';
```

Migrations: `13_add_provider_health.sql`, `14_add_connection_health.sql`, `15_add_connection_health_index.sql`.
21 changes: 20 additions & 1 deletion docs/services/broker.md
@@ -28,7 +28,14 @@ To ensure agents never face a "cold start" due to expired tokens:
- It performs background refreshes using stored Refresh Tokens.
- If a refresh fails permanently (e.g., user revoked access), it transitions the connection to `attention_required`.

### 5. Audit Subsystem
### 5. Health Monitoring
The Broker runs two background workers to continuously monitor integration health:
- **`HealthWorker`** (5-min interval): Probes all registered OAuth2 providers using a synthetic `invalid_grant` request to their `token_url`. A `400`/`401` response confirms the provider is alive; a `5xx` marks it `unhealthy`.
- **`ConnectionHealthWorker`** (1-min interval): Validates each active user connection by attempting a token refresh (OAuth2) or a lightweight API call (API key/basic auth). Uses **provider-shielding** to avoid falsely expiring connections during upstream outages.
- Both workers use bounded concurrency (semaphore + WaitGroup) to prevent goroutine exhaustion.
- In `--worker-only` mode, the binary listens for `SIGINT`/`SIGTERM` for graceful shutdown.

### 6. Audit Subsystem
Every control-plane mutation is recorded in the `audit_events` table via the `audit.Service`:
- **`provider.created`** — logged on every successful `POST /providers` call.
- **`provider.updated`** — logged on `PUT` and `PATCH` mutations.
@@ -42,6 +49,18 @@ Audit events capture the **caller IP** (respecting `X-Forwarded-For`), **User-Ag

See the [Audit Log Reference](../reference/audit-log.md) for how to query events.

## Key API Endpoints

| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/providers/health` | Provider health dashboard (all providers, no credentials) |
| `GET` | `/connections?workspace_id=` | All connections for a workspace with health status |
| `GET` | `/connections/{id}/token` | Resolve credentials + `health_status` for a specific connection |
| `POST` | `/connections/{id}/refresh` | Force a token refresh |
| `GET` | `/connections/resolve` | Resolve by `workspace_id` + `provider_name` |

See [Health Checks Architecture](../healthchecks.md) for details on the monitoring system.

## Environment Variables

| Variable | Description | Default |
3 changes: 2 additions & 1 deletion mkdocs.yml
@@ -95,6 +95,7 @@ nav:
- Agent Identity: concepts/agent-identity.md
- Security Model: concepts/security-model.md
- Client Libraries: concepts/client-libraries.md
- Health Checks: healthchecks.md
- Getting Started:
- Deploy in Five Minutes: getting-started/quickstart.md
- Configuration: getting-started/configuration.md
@@ -122,7 +123,7 @@ nav:


extra:
version: "0.2.3"
version: "0.2.4"
social:
- icon: material/web
link: https://developers.prescottdata.io
52 changes: 43 additions & 9 deletions nexus-broker/cmd/nexus-broker/main.go
@@ -4,6 +4,8 @@ import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/Prescott-Data/nexus-framework/nexus-broker/internal/audit"
@@ -25,9 +27,17 @@ import (
var Version = "dev"

func main() {
if len(os.Args) > 1 && (os.Args[1] == "-v" || os.Args[1] == "--version") {
log.Printf("Nexus Broker version: %s", Version)
os.Exit(0)
isWorkerOnly := false
if len(os.Args) > 1 {
for _, arg := range os.Args[1:] {
if arg == "-v" || arg == "--version" {
log.Printf("Nexus Broker version: %s", Version)
os.Exit(0)
}
if arg == "--worker-only" {
isWorkerOnly = true
}
}
}

cfg, err := config.Load()
@@ -92,6 +102,7 @@ func main() {
Audit: auditSvc,
})
auditHandler := handlers.NewAuditHandler(db)
connectionsHandler := handlers.NewConnectionsHandler(connSvc)

router := srv.Router()
router.Get("/auth/callback", callbackHandler.Handle)
@@ -107,6 +118,7 @@ func main() {
protected.Route("/providers", func(r chi.Router) {
r.Post("/", providersHandler.Register)
r.Get("/", providersHandler.List)
r.Get("/health", providersHandler.Health)
r.Get("/metadata", providersHandler.Metadata)
r.Get("/by-name/{name}", providersHandler.GetByName)
r.Delete("/by-name/{name}", providersHandler.DeleteByName)
@@ -116,6 +128,7 @@ func main() {
r.Delete("/{id}", providersHandler.Delete)
})
protected.Post("/auth/consent-spec", consentHandler.GetSpec)
protected.Get("/connections", connectionsHandler.List)
protected.Get("/connections/resolve", callbackHandler.ResolveToken)
protected.Get("/connections/{connectionID}/token", callbackHandler.GetToken)
protected.Post("/connections/{connectionID}/refresh", callbackHandler.Refresh)
@@ -126,14 +139,35 @@ func main() {
defer cleanupCancel()
go handlers.StartOrphanTokenCleanup(cleanupCtx, db, 1*time.Hour)

// Start provider health worker (polls every 5m)
healthWorker := provider.NewHealthWorker(store, 5*time.Minute)
go healthWorker.Start(cleanupCtx)

// Start connection health worker (polls every 1m)
// The store implements ProviderHealthLookup via GetHealthStatus(uuid.UUID)
connHealthWorker := service.NewConnectionHealthWorker(connRepo, connSvc, store, 1*time.Minute)
go connHealthWorker.Start(cleanupCtx)

// Start connection health gauge (polls every 30s)
telemetry.NewConnectionGaugeCollector(connRepo, 30*time.Second)

log.Printf("Starting OAuth Broker server on port %s", cfg.Port)
log.Printf("Version: %s", Version)
log.Printf("Base URL: %s", cfg.BaseURL)

if err := srv.Start(); err != nil {
log.Fatal("Server failed to start:", err)
if isWorkerOnly {
log.Printf("Starting Nexus Broker in WORKER-ONLY mode")
log.Printf("Version: %s", Version)

// Wait for OS signal for graceful shutdown
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigCh
log.Printf("Received signal %v, shutting down workers...", sig)
cleanupCancel()
} else {
log.Printf("Starting OAuth Broker server on port %s", cfg.Port)
log.Printf("Version: %s", Version)
log.Printf("Base URL: %s", cfg.BaseURL)

if err := srv.Start(); err != nil {
log.Fatal("Server failed to start:", err)
}
}
}
17 changes: 17 additions & 0 deletions nexus-broker/internal/domain/models.go
@@ -18,6 +18,8 @@ type Connection struct {
ReturnURL string
Status string
ExpiresAt time.Time
LastHealthCheckAt sql.NullTime
HealthStatus string
}

// ConnectionWithProvider joins connection and basic provider info
@@ -31,6 +33,21 @@ type ConnectionWithProvider struct {
ProviderParams *json.RawMessage
}

// ConnectionSummary is a lightweight view of a connection for frontend listing.
// It deliberately omits credentials and internal fields.
type ConnectionSummary struct {
	ID                uuid.UUID  `json:"id"`
	ProviderID        uuid.UUID  `json:"provider_id"`
	ProviderName      string     `json:"provider_name"`
	AuthType          string     `json:"auth_type"`
	Status            string     `json:"status"`
	Scopes            []string   `json:"scopes"`
	HealthStatus      string     `json:"health_status"`
	LastHealthCheckAt *time.Time `json:"last_health_check_at,omitempty"`
	CreatedAt         time.Time  `json:"created_at"`
	UpdatedAt         time.Time  `json:"updated_at"`
}

// Token represents an encrypted token at rest
type Token struct {
ConnectionID uuid.UUID