Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE agent_sessions DROP COLUMN IF EXISTS api_token;
ALTER TABLE agent_sessions DROP COLUMN IF EXISTS api_token_expires_at;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE agent_sessions ADD COLUMN api_token TEXT;
ALTER TABLE agent_sessions ADD COLUMN api_token_expires_at TIMESTAMPTZ;
80 changes: 73 additions & 7 deletions pkg/agents/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,61 @@ package agents
import (
"context"
"fmt"
"time"

"github.com/superplanehq/superplane/pkg/jwt"
pb "github.com/superplanehq/superplane/pkg/protos/agents"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

const agentTokenTTL = 1 * time.Hour

// Service implements the agent business logic.
type Service struct {
Client *Client
Store *Store
Client *Client
Store *Store
JWTSigner *jwt.Signer
BaseURL string // SuperPlane API base URL for CLI config
}

func NewService(client *Client, store *Store) *Service {
return &Service{Client: client, Store: store}
func NewService(client *Client, store *Store, jwtSigner *jwt.Signer, baseURL string) *Service {
return &Service{
Client: client,
Store: store,
JWTSigner: jwtSigner,
BaseURL: baseURL,
}
}

// GenerateAgentToken creates a short-lived scoped token for the agent CLI.
func (s *Service) GenerateAgentToken(orgID, userID string) (string, time.Time, error) {
expiresAt := time.Now().Add(agentTokenTTL)

token, err := s.JWTSigner.GenerateScopedToken(jwt.ScopedTokenClaims{
Subject: userID,
OrgID: orgID,
Purpose: "agent",
Scopes: []string{"canvases:read", "canvases:write", "integrations:read", "components:read"},
}, agentTokenTTL)
if err != nil {
return "", time.Time{}, fmt.Errorf("generate agent token: %w", err)
}

return token, expiresAt, nil
}

// CreateAgentChat returns the existing session or creates a new one.
func (s *Service) CreateAgentChat(ctx context.Context, orgID, userID, canvasID string) (*pb.CreateAgentChatResponse, error) {
// Check if session already exists
existing, err := s.Store.FindSession(orgID, userID, canvasID)
if err == nil {
_ = existing
// Refresh token if expired or missing
if err := s.refreshTokenIfNeeded(existing); err != nil {
return nil, status.Errorf(codes.Internal, "failed to refresh agent token: %v", err)
}

return &pb.CreateAgentChatResponse{
Url: fmt.Sprintf("/api/v1/agents/chats/%s/stream", canvasID),
}, nil
Expand All @@ -38,23 +70,57 @@ func (s *Service) CreateAgentChat(ctx context.Context, orgID, userID, canvasID s
}

// Store it
_, err = s.Store.CreateSession(orgID, userID, canvasID, session.ID)
stored, err := s.Store.CreateSession(orgID, userID, canvasID, session.ID)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to store session: %v", err)
}

// Generate and store scoped token
token, expiresAt, err := s.GenerateAgentToken(orgID, userID)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to generate agent token: %v", err)
}

if err := s.Store.UpdateAPIToken(stored.ID, token, expiresAt); err != nil {
return nil, status.Errorf(codes.Internal, "failed to store agent token: %v", err)
}

return &pb.CreateAgentChatResponse{
Url: fmt.Sprintf("/api/v1/agents/chats/%s/stream", canvasID),
}, nil
}

// refreshTokenIfNeeded regenerates the token if it's expired or missing.
func (s *Service) refreshTokenIfNeeded(session *ChatSession) error {
needsRefresh := session.APIToken == nil ||
*session.APIToken == "" ||
session.APITokenExpiresAt == nil ||
time.Now().After(*session.APITokenExpiresAt)

if !needsRefresh {
return nil
}

token, expiresAt, err := s.GenerateAgentToken(session.OrganizationID, session.UserID)
if err != nil {
return err
}

return s.Store.UpdateAPIToken(session.ID, token, expiresAt)
}

// ResumeAgentChat returns the stream URL for an existing session.
func (s *Service) ResumeAgentChat(ctx context.Context, orgID, userID, canvasID string) (*pb.ResumeAgentChatResponse, error) {
_, err := s.Store.FindSession(orgID, userID, canvasID)
session, err := s.Store.FindSession(orgID, userID, canvasID)
if err != nil {
return nil, status.Error(codes.NotFound, "no session found for this canvas")
}

// Refresh token on resume
if err := s.refreshTokenIfNeeded(session); err != nil {
return nil, status.Errorf(codes.Internal, "failed to refresh agent token: %v", err)
}

return &pb.ResumeAgentChatResponse{
Url: fmt.Sprintf("/api/v1/agents/chats/%s/stream", canvasID),
}, nil
Expand Down
22 changes: 16 additions & 6 deletions pkg/agents/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (

// ChatSession represents a persisted agent chat session.
type ChatSession struct {
ID string `gorm:"column:id;primaryKey"`
OrganizationID string `gorm:"column:organization_id"`
UserID string `gorm:"column:user_id"`
CanvasID string `gorm:"column:canvas_id"`
AnthropicSessionID string `gorm:"column:anthropic_session_id"`
CreatedAt time.Time `gorm:"column:created_at"`
ID string `gorm:"column:id;primaryKey"`
OrganizationID string `gorm:"column:organization_id"`
UserID string `gorm:"column:user_id"`
CanvasID string `gorm:"column:canvas_id"`
AnthropicSessionID string `gorm:"column:anthropic_session_id"`
APIToken *string `gorm:"column:api_token"`
APITokenExpiresAt *time.Time `gorm:"column:api_token_expires_at"`
CreatedAt time.Time `gorm:"column:created_at"`
}

func (ChatSession) TableName() string { return "agent_sessions" }
Expand Down Expand Up @@ -100,3 +102,11 @@ func (s *Store) ListMessages(sessionID string) ([]ChatMessage, error) {
}
return messages, nil
}

// UpdateAPIToken sets the scoped API token and expiry on a session.
func (s *Store) UpdateAPIToken(sessionID, token string, expiresAt time.Time) error {
return s.db.Model(&ChatSession{}).Where("id = ?", sessionID).Updates(map[string]any{
"api_token": token,
"api_token_expires_at": expiresAt,
}).Error
}
110 changes: 75 additions & 35 deletions pkg/agents/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (

// StreamHandler handles SSE streaming for agent chats.
type StreamHandler struct {
client *Client
store *Store
client *Client
store *Store
baseURL string // SuperPlane API URL for CLI config
}

func NewStreamHandler(client *Client, store *Store) *StreamHandler {
return &StreamHandler{client: client, store: store}
func NewStreamHandler(client *Client, store *Store, baseURL string) *StreamHandler {
return &StreamHandler{client: client, store: store, baseURL: baseURL}
}

// streamRequest is the POST body from the frontend.
Expand Down Expand Up @@ -78,11 +79,12 @@ func (h *StreamHandler) HandleStream(w http.ResponseWriter, r *http.Request, org
// Send run_started
writeSSE(w, flusher, map[string]any{"type": "run_started", "model": "claude-sonnet-4-6"})

// Build prompt with canvas context
prompt := body.Question
if body.AgentContext.Mode == "build" && body.AgentContext.CanvasVersion != "" {
prompt = fmt.Sprintf("[Canvas version: %s]\n\n%s", body.AgentContext.CanvasVersion, body.Question)
// Build prompt with canvas context and CLI credentials
msgCount := 1 // We just stored the current user message
if existingMsgs, _ := h.store.ListMessages(session.ID); len(existingMsgs) > 0 {
msgCount = len(existingMsgs)
}
prompt := h.buildPrompt(session, body, canvasID, msgCount)

// Count existing events before sending (to skip old turns when streaming)
existingEvents, _ := h.client.ListEvents(r.Context(), session.AnthropicSessionID, 200)
Expand Down Expand Up @@ -132,50 +134,40 @@ func (h *StreamHandler) pollAndStream(ctx context.Context, w http.ResponseWriter
case <-time.After(2 * time.Second):
}

// Check session status
session, err := h.client.GetSession(ctx, sessionID)
if err != nil {
log.WithError(err).Error("failed to poll session")
continue
}

// Get events
// Get events first
events, err := h.client.ListEvents(ctx, sessionID, 200)
if err != nil {
log.WithError(err).Error("failed to list events")
continue
}

// Stream new events
// Stream new events and detect completion from event types
sawIdle := false
sawFailed := false
for _, event := range events.Data {
if seenEventIDs[event.ID] {
continue
}
seenEventIDs[event.ID] = true

// Detect session completion from events themselves
if event.Type == "session.status_idle" {
sawIdle = true
continue
}
if event.Type == "session.status_failed" {
sawFailed = true
continue
}

text := h.streamEvent(w, flusher, event)
if text != "" {
assistantContent += text
}
}

// Check if done
if session.Status == "idle" && session.Usage.OutputTokens > 0 {
// Final fetch to catch any events that arrived between our list and status check
finalEvents, err := h.client.ListEvents(ctx, sessionID, 200)
if err == nil {
for _, event := range finalEvents.Data {
if seenEventIDs[event.ID] {
continue
}
seenEventIDs[event.ID] = true
text := h.streamEvent(w, flusher, event)
if text != "" {
assistantContent += text
}
}
}

// Only consider done AFTER processing all events in this batch
if sawIdle {
if assistantContent != "" {
writeSSE(w, flusher, map[string]any{"type": "final_answer", "output": assistantContent})
}
Expand All @@ -184,7 +176,7 @@ func (h *StreamHandler) pollAndStream(ctx context.Context, w http.ResponseWriter
return assistantContent
}

if session.Status == "failed" {
if sawFailed {
writeSSE(w, flusher, map[string]any{"type": "run_failed", "error": "agent session failed"})
writeSSE(w, flusher, map[string]any{"type": "done"})
return assistantContent
Expand Down Expand Up @@ -231,6 +223,54 @@ func extractText(event Event) string {
return strings.Join(parts, "")
}

// buildPrompt constructs the message to send to Anthropic, injecting CLI setup when needed.
func (h *StreamHandler) buildPrompt(session *ChatSession, body streamRequest, canvasID string, msgCount int) string {
var parts []string

// Inject CLI credentials if token is available and this needs setup
if session.APIToken != nil && *session.APIToken != "" {
needsSetup := msgCount <= 1 // Only the current user message = first turn

// Also re-inject if token was recently refreshed
tokenRefreshed := session.APITokenExpiresAt != nil &&
time.Until(*session.APITokenExpiresAt) > 50*time.Minute // freshly generated = ~1h remaining

if needsSetup || tokenRefreshed {
setup := fmt.Sprintf(`[Agent CLI Setup — run this silently before any SuperPlane commands]
cat > ~/.superplane.yaml << 'SUPERPLANE_CONFIG_EOF'
contexts:
- apiToken: %s
organization: %s
organizationId: %s
url: %s
currentcontext: %s/%s
output: text
SUPERPLANE_CONFIG_EOF`,
*session.APIToken,
session.OrganizationID,
session.OrganizationID,
h.baseURL,
h.baseURL,
session.OrganizationID,
)
parts = append(parts, setup)
}
}

// Add canvas context
if canvasID != "" {
parts = append(parts, fmt.Sprintf("[Canvas ID: %s]", canvasID))
}
if body.AgentContext.Mode == "build" && body.AgentContext.CanvasVersion != "" {
parts = append(parts, fmt.Sprintf("[Canvas version: %s]", body.AgentContext.CanvasVersion))
}

// Add user question
parts = append(parts, body.Question)

return strings.Join(parts, "\n\n")
}

func writeSSE(w http.ResponseWriter, flusher http.Flusher, data map[string]any) {
b, err := json.Marshal(data)
if err != nil {
Expand Down
Loading
Loading