Skip to content

Commit ae7f316

Browse files
DanielHashmiclaude
andcommitted
fix(phase-v): improve event publishing, logging, and WebSocket service reliability
- Add logging configuration and startup diagnostics to backend - Improve event publisher graceful handling when Dapr is unavailable - Enhance MCP server with better tool handling and error recovery - Fix WebSocket service task update handler for real-time sync - Update DashboardClient and ThemedChatWidget with UI improvements 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 3c37542 commit ae7f316

9 files changed

Lines changed: 263 additions & 97 deletions

File tree

CLAUDE.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,47 @@ Complete list of specialized agents available:
278278

279279
## Critical Rules
280280

281+
### Spec-Kit-Plus Compliance & Implementation Change Documentation
282+
283+
Given strict adherence to the spec-kit-plus methodology, the following rules govern how implementation changes are documented:
284+
285+
**1. Change Documentation Workflow**
286+
When directly requested to modify code or implementation details:
287+
288+
- **Prompt for Documentation First**: Before making changes, ask the user to document:
289+
- The specific deviation from spec.md, plan.md, tasks.md, or other design documents
290+
- The rationale/justification for the change
291+
- Impact on other components, user stories, or downstream tasks
292+
- Whether this is a temporary workaround or permanent fix
293+
294+
- **Update Tasks After Approval**: Once changes are approved and implemented:
295+
- Mark the original task as `[X]` complete
296+
- Add a new task documenting what was actually done vs. what was specified
297+
- Note any errors, issues, or workarounds encountered
298+
- Reference the divergence in task comments (e.g., "// Deviation from T005: Changed X due to Y")
299+
300+
**2. Progressive Specification Alignment**
301+
The goal is to progressively align specifications with actual implementation:
302+
303+
- **Minor Deviations**: Update task comments and continue
304+
- **Moderate Changes**: Create an ADR using `/sp.adr <title>` to document the architectural decision
305+
- **Major Scope Changes**: Update the relevant spec.md, plan.md, or tasks.md sections with user approval
306+
307+
**3. Preserving Core Requirements**
308+
- **NEVER modify** statements that represent core requirements from:
309+
- `.specify/memory/constitution.md` (core principles)
310+
- spec.md (user stories, success criteria, acceptance scenarios)
311+
- plan.md (architecture decisions, technical constraints)
312+
- Only modify specification-related files with explicit user approval
313+
- Document all changes, even approved ones, for traceability
314+
315+
**4. Why This Matters**
316+
Without systematic documentation:
317+
- Specifications become outdated and inaccurate
318+
- New team members cannot understand what was actually implemented
319+
- Future changes may inadvertently break working implementations
320+
- The gap between "what we planned" and "what we built" grows unmanageably
321+
281322
### Stateless Architecture
282323
- ALL state persisted to database
283324
- Store user message BEFORE agent runs

backend/main.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""FastAPI application entry point for LifeStepsAI backend."""
22
import asyncio
3+
import logging
34
import os
45
from contextlib import asynccontextmanager
56
from pathlib import Path
@@ -21,6 +22,13 @@
2122

2223
load_dotenv()
2324

25+
# Configure logging
26+
logging.basicConfig(
27+
level=logging.INFO,
28+
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
29+
)
30+
logger = logging.getLogger(__name__)
31+
2432
# CORS settings - support multiple origins from CORS_ORIGINS env var
2533
FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:3000")
2634
CORS_ORIGINS_ENV = os.getenv("CORS_ORIGINS", "")
@@ -43,6 +51,13 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
4351
# Startup: Create database tables
4452
create_db_and_tables()
4553

54+
# Log configuration for event publishing
55+
dapr_http_port = os.getenv("DAPR_HTTP_PORT", "3500")
56+
websocket_url = os.getenv("WEBSOCKET_SERVICE_URL", "http://localhost:8004")
57+
logger.info(f"Backend starting...")
58+
logger.info(f" DAPR_HTTP_PORT: {dapr_http_port}")
59+
logger.info(f" WEBSOCKET_SERVICE_URL: {websocket_url}")
60+
4661
# Start notification polling in background
4762
notification_task = asyncio.create_task(notification_polling_loop())
4863

backend/src/chatbot/mcp_agent.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ def __init__(self, provider: str | None = None, model: str | None = None):
182182
# Explicitly pass critical env vars to subprocess
183183
"DATABASE_URL": os.getenv("DATABASE_URL", ""),
184184
"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", ""),
185+
"WEBSOCKET_SERVICE_URL": os.getenv("WEBSOCKET_SERVICE_URL", "http://localhost:8004"),
185186
},
186187
},
187188
client_session_timeout_seconds=30.0,

backend/src/mcp_server/server.py

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,56 @@ def get_db_session():
5252
return Session(engine)
5353

5454

55+
def publish_event_sync(event_type: str, task: any, user_id: str, changes: list = None, task_before: dict = None):
56+
"""Publish event synchronously for MCP tools.
57+
58+
Used in sync MCP tools - runs the async event publishing in a thread
59+
with its own event loop. This works even when called from within
60+
an async context (like MCP tools called by OpenAI Agents SDK).
61+
62+
Args:
63+
event_type: Event type (created, updated, completed, deleted)
64+
task: Task SQLModel instance
65+
user_id: User who performed the action
66+
changes: List of field changes (for update events)
67+
task_before: Task state before changes (for update events)
68+
"""
69+
from src.services.event_publisher import publish_task_event
70+
import threading
71+
72+
try:
73+
# Create a new thread with its own event loop to run async publishing
74+
# This works even when called from within an async context
75+
result = [None]
76+
exception = [None]
77+
78+
def run_in_thread():
79+
try:
80+
loop = asyncio.new_event_loop()
81+
asyncio.set_event_loop(loop)
82+
result[0] = loop.run_until_complete(
83+
publish_task_event(event_type, task, user_id, changes, task_before)
84+
)
85+
loop.close()
86+
except Exception as e:
87+
exception[0] = e
88+
89+
thread = threading.Thread(target=run_in_thread, daemon=True)
90+
thread.start()
91+
thread.join(timeout=10) # Wait up to 10 seconds
92+
93+
if exception[0]:
94+
raise exception[0]
95+
96+
if result[0]:
97+
logger.debug(f"Event published synchronously: task.{event_type}")
98+
else:
99+
logger.warning(f"Event publishing returned False: task.{event_type}")
100+
except Exception as e:
101+
# Log error but don't fail the tool
102+
logger.error(f"Failed to publish event task.{event_type}: {e}", exc_info=True)
103+
104+
55105
def fire_and_forget_event(coro):
56106
"""Run an async coroutine in the background (fire-and-forget).
57107
@@ -138,9 +188,8 @@ def add_task(
138188
session.commit()
139189
session.refresh(task)
140190

141-
# Phase V: Publish task.created event (fire-and-forget)
142-
from src.services.event_publisher import publish_task_event
143-
fire_and_forget_event(publish_task_event("created", task, user_id))
191+
# Phase V: Publish task.created event synchronously (before returning)
192+
publish_event_sync("created", task, user_id)
144193

145194
# Calculate urgency for display
146195
urgency = calculate_urgency(task.due_date, task.timezone) if task.due_date else None
@@ -252,15 +301,10 @@ def complete_task(
252301
session.refresh(updated_task)
253302

254303
# Phase V: Publish event based on completion state change
255-
from src.services.event_publisher import publish_task_event, task_to_dict
256304
if updated_task.completed and not was_completed:
257-
fire_and_forget_event(publish_task_event("completed", updated_task, user_id))
305+
publish_event_sync("completed", updated_task, user_id)
258306
elif not updated_task.completed and was_completed:
259-
fire_and_forget_event(publish_task_event(
260-
"updated", updated_task, user_id,
261-
changes=["completed"],
262-
task_before=task_to_dict(task)
263-
))
307+
publish_event_sync("updated", updated_task, user_id, changes=["completed"])
264308

265309
return {
266310
"task_id": updated_task.id,
@@ -307,9 +351,8 @@ def delete_task(
307351
task_service.delete_task(task_id, user_id)
308352
session.commit()
309353

310-
# Phase V: Publish task.deleted event with task snapshot
311-
from src.services.event_publisher import publish_task_event
312-
fire_and_forget_event(publish_task_event("deleted", task_snapshot, user_id))
354+
# Phase V: Publish task.deleted event with task snapshot (synchronous)
355+
publish_event_sync("deleted", task_snapshot, user_id)
313356

314357
return {
315358
"task_id": task_id,
@@ -404,12 +447,8 @@ def update_task(
404447
session.commit()
405448
session.refresh(updated_task)
406449

407-
# Phase V: Publish task.updated event with before/after state
408-
fire_and_forget_event(publish_task_event(
409-
"updated", updated_task, user_id,
410-
changes=changes,
411-
task_before=task_before_dict
412-
))
450+
# Phase V: Publish task.updated event with before/after state (synchronous)
451+
publish_event_sync("updated", updated_task, user_id, changes, task_before_dict)
413452

414453
# Calculate urgency for display
415454
urgency = calculate_urgency(updated_task.due_date, updated_task.timezone) if updated_task.due_date else None

backend/src/services/event_publisher.py

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -180,64 +180,71 @@ async def publish_task_event(
180180
# Create CloudEvents envelope
181181
cloud_event = create_cloud_event(event_type, event_data)
182182

183-
# Publish to both task-events (audit, recurring) and task-updates (websocket)
183+
# Track success across all publish attempts
184+
success = False
185+
184186
async with httpx.AsyncClient(timeout=5.0) as client:
185-
# Publish to task-events topic via Dapr
186-
response = await client.post(
187-
f"{DAPR_PUBLISH_URL}/{TOPIC_TASK_EVENTS}",
188-
json=cloud_event,
189-
headers={
190-
"Content-Type": "application/cloudevents+json",
191-
},
192-
)
187+
# Try to publish to Dapr (if running in Kubernetes)
188+
try:
189+
# Publish to task-events topic via Dapr
190+
response = await client.post(
191+
f"{DAPR_PUBLISH_URL}/{TOPIC_TASK_EVENTS}",
192+
json=cloud_event,
193+
headers={
194+
"Content-Type": "application/cloudevents+json",
195+
},
196+
)
193197

194-
if response.status_code not in (200, 204):
195-
logger.warning(
196-
f"Failed to publish to {TOPIC_TASK_EVENTS}: "
197-
f"status={response.status_code}, body={response.text}"
198+
if response.status_code not in (200, 204):
199+
logger.warning(
200+
f"Failed to publish to {TOPIC_TASK_EVENTS}: "
201+
f"status={response.status_code}, body={response.text}"
202+
)
203+
else:
204+
success = True
205+
206+
# Publish to task-updates topic via Dapr (for real-time sync)
207+
response_updates = await client.post(
208+
f"{DAPR_PUBLISH_URL}/{TOPIC_TASK_UPDATES}",
209+
json=cloud_event,
210+
headers={
211+
"Content-Type": "application/cloudevents+json",
212+
},
198213
)
199214

200-
# Publish to task-updates topic via Dapr (for real-time sync)
201-
response_updates = await client.post(
202-
f"{DAPR_PUBLISH_URL}/{TOPIC_TASK_UPDATES}",
203-
json=cloud_event,
204-
headers={
205-
"Content-Type": "application/cloudevents+json",
206-
},
207-
)
215+
if response_updates.status_code not in (200, 204):
216+
logger.warning(
217+
f"Failed to publish to {TOPIC_TASK_UPDATES}: "
218+
f"status={response_updates.status_code}, body={response_updates.text}"
219+
)
220+
else:
221+
success = True
208222

209-
if response_updates.status_code not in (200, 204):
210-
logger.warning(
211-
f"Failed to publish to {TOPIC_TASK_UPDATES}: "
212-
f"status={response_updates.status_code}, body={response_updates.text}"
213-
)
223+
logger.debug(f"Published to Dapr pub/sub: task.{event_type}")
224+
225+
except httpx.ConnectError:
226+
# Dapr sidecar not running (local dev without Kubernetes)
227+
logger.debug(f"Dapr sidecar not available (expected in local dev)")
214228

215-
# Also publish directly to WebSocket service for local dev (no Dapr)
229+
# ALWAYS try direct WebSocket service publish (for local dev without Dapr)
216230
try:
217231
ws_response = await client.post(
218232
f"{WEBSOCKET_SERVICE_URL}/api/events/task-updates",
219233
json=cloud_event,
220234
timeout=3.0,
221235
)
222236
if ws_response.status_code == 200:
223-
logger.debug("Published event directly to WebSocket service")
237+
logger.info(f"Published task.{event_type} to WebSocket service: task_id={task.id}, user_id={user_id}")
238+
success = True
239+
else:
240+
logger.warning(f"WebSocket service returned {ws_response.status_code}: {ws_response.text}")
224241
except httpx.ConnectError:
225-
# WebSocket service not running - skip
226-
pass
242+
# WebSocket service not running
243+
logger.warning(f"WebSocket service not available at {WEBSOCKET_SERVICE_URL}")
227244
except Exception as ws_err:
228-
logger.debug(f"Failed to publish to WebSocket service: {ws_err}")
245+
logger.error(f"Failed to publish to WebSocket service: {ws_err}")
229246

230-
logger.info(
231-
f"Published task.{event_type} event: task_id={task.id}, user_id={user_id}"
232-
)
233-
return True
234-
235-
except httpx.ConnectError:
236-
# Dapr sidecar not running (local dev without Kubernetes)
237-
logger.debug(
238-
f"Dapr sidecar not available, skipping event publish: task.{event_type}"
239-
)
240-
return False
247+
return success
241248

242249
except Exception as e:
243250
# Log error but don't fail the API call

frontend/app/dashboard/DashboardClient.tsx

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -102,23 +102,59 @@ export default function DashboardClient({ session: initialSession }: DashboardCl
102102
const { updateName, updateImage } = useProfileUpdate();
103103
const { isSyncing, pendingCount, lastError } = useSyncQueue();
104104

105-
// Phase V: WebSocket for real-time task updates
105+
// Phase V: WebSocket for real-time task updates with optimistic updates
106106
const { connectionState, isConnected } = useWebSocket({
107-
onTaskCreated: useCallback(() => {
108-
// Revalidate tasks when a new task is created from another tab/device
109-
revalidateTasks();
107+
onTaskCreated: useCallback((taskData: Record<string, unknown>) => {
108+
// Optimistically add the new task to local state
109+
revalidateTasks(
110+
(currentTasks) => {
111+
if (!currentTasks) return currentTasks;
112+
const newTask = taskData as Task;
113+
// Check if task already exists (avoid duplicates)
114+
if (currentTasks.some(t => t.id === newTask.id)) {
115+
return currentTasks;
116+
}
117+
return [newTask, ...currentTasks];
118+
},
119+
{ revalidate: false } // Don't refetch from server
120+
);
110121
}, [revalidateTasks]),
111-
onTaskUpdated: useCallback(() => {
112-
// Revalidate tasks when a task is updated from another tab/device
113-
revalidateTasks();
122+
onTaskUpdated: useCallback((taskData: Record<string, unknown>) => {
123+
// Optimistically update the task in local state
124+
revalidateTasks(
125+
(currentTasks) => {
126+
if (!currentTasks) return currentTasks;
127+
const updatedTask = taskData as Task;
128+
return currentTasks.map(t =>
129+
t.id === updatedTask.id ? updatedTask : t
130+
);
131+
},
132+
{ revalidate: false } // Don't refetch from server
133+
);
114134
}, [revalidateTasks]),
115-
onTaskCompleted: useCallback(() => {
116-
// Revalidate tasks when a task is completed from another tab/device
117-
revalidateTasks();
135+
onTaskCompleted: useCallback((taskData: Record<string, unknown>) => {
136+
// Optimistically update completion status in local state
137+
revalidateTasks(
138+
(currentTasks) => {
139+
if (!currentTasks) return currentTasks;
140+
const completedTask = taskData as Task;
141+
return currentTasks.map(t =>
142+
t.id === completedTask.id ? { ...t, completed: completedTask.completed } : t
143+
);
144+
},
145+
{ revalidate: false } // Don't refetch from server
146+
);
118147
}, [revalidateTasks]),
119-
onTaskDeleted: useCallback(() => {
120-
// Revalidate tasks when a task is deleted from another tab/device
121-
revalidateTasks();
148+
onTaskDeleted: useCallback((taskData: Record<string, unknown>) => {
149+
// Optimistically remove the task from local state
150+
revalidateTasks(
151+
(currentTasks) => {
152+
if (!currentTasks) return currentTasks;
153+
const deletedTask = taskData as Task;
154+
return currentTasks.filter(t => t.id !== deletedTask.id);
155+
},
156+
{ revalidate: false } // Don't refetch from server
157+
);
122158
}, [revalidateTasks]),
123159
});
124160

0 commit comments

Comments
 (0)