Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func runGateway() {
server.SetDB(pgStores.DB)
server.SetPolicyEngine(permPE)
server.SetPairingService(pgStores.Pairing)
server.SetBuiltinToolsStore(pgStores.BuiltinTools)
server.SetMessageBus(msgBus)
server.SetOAuthHandler(httpapi.NewOAuthHandler(pgStores.Providers, pgStores.ConfigSecrets, providerRegistry, msgBus))

Expand Down Expand Up @@ -333,7 +334,7 @@ func runGateway() {
httpapi.InitGatewayToken(cfg.Gateway.Token)
exportTokenStore := httpapi.InitExportTokenStore()
defer exportTokenStore.Stop()
agentsH, skillsH, tracesH, mcpH, channelInstancesH, providersH, builtinToolsH, pendingMessagesH, teamEventsH, secureCLIH, secureCLIGrantH, mcpUserCredsH := wireHTTP(pgStores, cfg.Agents.Defaults.Workspace, dataDir, bundledSkillsDir, msgBus, toolsReg, providerRegistry, modelReg, permPE.IsOwner, gatewayAddr, mcpToolLister)
agentsH, skillsH, tracesH, mcpH, channelInstancesH, providersH, builtinToolsH, pendingMessagesH, teamEventsH, secureCLIH, secureCLIGrantH, mcpUserCredsH := wireHTTP(pgStores, cfg.Agents.Defaults.Workspace, dataDir, bundledSkillsDir, msgBus, toolsReg, providerRegistry, modelReg, permPE.IsOwner, gatewayAddr, cfg.Gateway.Token, mcpToolLister)

// Wire dependencies for system prompt preview parity.
if agentsH != nil {
Expand Down
23 changes: 23 additions & 0 deletions cmd/gateway_agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,29 @@ func buildEmbeddingProvider(
"provider", dbp.Name, "requested", es.Dimensions, "required", store.RequiredMemoryEmbeddingDimensions)
}

// Gemini native provider — uses its own embedding API (not OpenAI-compatible).
if dbp.ProviderType == store.ProviderGeminiNative {
apiKey := dbp.APIKey
if providerReg != nil {
if regProv, regErr := providerReg.Get(context.Background(), dbp.Name); regErr == nil {
if gp, ok := regProv.(interface{ APIKey() string }); ok && gp.APIKey() != "" {
apiKey = gp.APIKey()
}
}
}
if apiKey == "" {
slog.Warn("gemini embedding provider has no API key", "name", dbp.Name)
return nil
}
if model == "" {
model = memory.GeminiDefaultEmbeddingModel
}
ep := memory.NewGeminiEmbeddingProvider(dbp.Name, apiKey, apiBase, model)
ep.WithDimensions(dims)
slog.Info("gemini embedding provider configured", "name", dbp.Name, "model", model, "dims", dims)
return ep
}

// Try registry first for the actual API key / base (handles runtime-registered providers)
if providerReg != nil {
if regProv, regErr := providerReg.Get(context.Background(), dbp.Name); regErr == nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/gateway_http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func gatewayHTTPDoRaw(method, path string, body any) ([]byte, int, error) {
req.Header.Set("X-GoClaw-User-Id", "system")
if token := resolveGatewayToken(); token != "" {
req.Header.Set("Authorization", "Bearer "+token)
req.Header.Set("X-GoClaw-User-Id", "system")
}

resp, err := httpClient.Do(req)
Expand Down
6 changes: 5 additions & 1 deletion cmd/gateway_http_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

// wireHTTP creates HTTP handlers (agents + skills + traces + MCP + channel instances + providers + builtin tools + pending messages).
func wireHTTP(stores *store.Stores, defaultWorkspace, dataDir, bundledSkillsDir string, msgBus *bus.MessageBus, toolsReg *tools.Registry, providerReg *providers.Registry, modelReg providers.ModelRegistry, isOwner func(string) bool, gatewayAddr string, mcpToolLister httpapi.MCPToolLister) (*httpapi.AgentsHandler, *httpapi.SkillsHandler, *httpapi.TracesHandler, *httpapi.MCPHandler, *httpapi.ChannelInstancesHandler, *httpapi.ProvidersHandler, *httpapi.BuiltinToolsHandler, *httpapi.PendingMessagesHandler, *httpapi.TeamEventsHandler, *httpapi.SecureCLIHandler, *httpapi.SecureCLIGrantHandler, *httpapi.MCPUserCredentialsHandler) {
func wireHTTP(stores *store.Stores, defaultWorkspace, dataDir, bundledSkillsDir string, msgBus *bus.MessageBus, toolsReg *tools.Registry, providerReg *providers.Registry, modelReg providers.ModelRegistry, isOwner func(string) bool, gatewayAddr, gatewayToken string, mcpToolLister httpapi.MCPToolLister) (*httpapi.AgentsHandler, *httpapi.SkillsHandler, *httpapi.TracesHandler, *httpapi.MCPHandler, *httpapi.ChannelInstancesHandler, *httpapi.ProvidersHandler, *httpapi.BuiltinToolsHandler, *httpapi.PendingMessagesHandler, *httpapi.TeamEventsHandler, *httpapi.SecureCLIHandler, *httpapi.SecureCLIGrantHandler, *httpapi.MCPUserCredentialsHandler) {
var agentsH *httpapi.AgentsHandler
var skillsH *httpapi.SkillsHandler
var tracesH *httpapi.TracesHandler
Expand Down Expand Up @@ -70,6 +70,10 @@ func wireHTTP(stores *store.Stores, defaultWorkspace, dataDir, bundledSkillsDir
if stores.MCP != nil {
providersH.SetMCPServerLookup(buildMCPServerLookup(stores.MCP))
}
acpMCPData = buildACPMCPData(gatewayAddr, gatewayToken, stores.MCP)
providersH.SetProviderReloadFn(func(p *store.LLMProviderData) {
registerACPFromDB(providerReg, *p)
})
if stores.Tracing != nil {
providersH.SetTracingStore(stores.Tracing)
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/gateway_managed.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,8 @@ func wireExtras(
// Unregister old instance (closes ProcessPool) then re-register
providerReg.Unregister(p.Name)
if p.Enabled {
acpMCPData = buildACPMCPData(loopbackAddr(appCfg.Gateway.Host, appCfg.Gateway.Port),
appCfg.Gateway.Token, stores.MCP)
registerACPFromDB(providerReg, *p)
}
})
Expand Down
118 changes: 74 additions & 44 deletions cmd/gateway_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,36 @@ import (
"github.com/nextlevelbuilder/goclaw/internal/tools"
)

// acpMCPData is the package-level MCP bridge config consumed by
// registerACPFromConfig and registerACPFromDB. Callers populate it via
// buildACPMCPData before invoking either register* — keeping these functions
// at their original 2-arg signatures (registry + cfg/p) and making
// hot-reload paths idempotent (they refresh this var before re-registering).
//
// Single-process gateway scope means a package-level var is acceptable here:
// gateway addr/token/MCPStore are fixed for the lifetime of the binary, and
// the four set-sites (startup config, startup DB iteration, two hot-reload
// closures) all derive identical values.
var acpMCPData *providers.MCPConfigData

// buildACPMCPData assembles the MCP bridge config consumed by ACP providers.
// Returns nil when no gateway addr is available, which makes downstream
// settings.MCPData nil and the ACP provider skip MCP server injection.
// mcpStore is optional — when non-nil, the AgentMCPLookup closure is attached
// so per-agent MCP servers are surfaced to the ACP subprocess at session/new
// time (DB-registered providers only; config-based providers run without
// per-agent MCP).
func buildACPMCPData(gatewayAddr, gatewayToken string, mcpStore store.MCPServerStore) *providers.MCPConfigData {
if gatewayAddr == "" {
return nil
}
data := providers.BuildCLIMCPConfigData(nil, gatewayAddr, gatewayToken)
if mcpStore != nil {
data.AgentMCPLookup = buildMCPServerLookup(mcpStore)
}
return data
}

// loopbackAddr normalizes a gateway address for local connections.
// CLI processes on the same machine can't connect to 0.0.0.0 on some OSes.
func loopbackAddr(host string, port int) string {
Expand All @@ -29,6 +59,7 @@ func loopbackAddr(host string, port int) string {
}

func registerProviders(registry *providers.Registry, cfg *config.Config, modelReg providers.ModelRegistry) {
gatewayAddr := loopbackAddr(cfg.Gateway.Host, cfg.Gateway.Port)
if cfg.Providers.Anthropic.APIKey != "" {
registry.Register(providers.NewAnthropicProvider(cfg.Providers.Anthropic.APIKey,
providers.WithAnthropicBaseURL(cfg.Providers.Anthropic.APIBase),
Expand Down Expand Up @@ -188,7 +219,6 @@ func registerProviders(registry *providers.Registry, cfg *config.Config, modelRe
opts = append(opts, providers.WithClaudeCLIPermMode(cfg.Providers.ClaudeCLI.PermMode))
}
// Build per-session MCP config: external MCP servers + GoClaw bridge
gatewayAddr := loopbackAddr(cfg.Gateway.Host, cfg.Gateway.Port)
mcpData := providers.BuildCLIMCPConfigData(cfg.Tools.McpServers, gatewayAddr, cfg.Gateway.Token)
opts = append(opts, providers.WithClaudeCLIMCPConfigData(mcpData))
// Enable GoClaw security hooks (shell deny patterns, path restrictions)
Expand All @@ -200,6 +230,7 @@ func registerProviders(registry *providers.Registry, cfg *config.Config, modelRe

// ACP provider (config-based) — orchestrates any ACP-compatible agent binary
if cfg.Providers.ACP.Binary != "" {
acpMCPData = buildACPMCPData(gatewayAddr, cfg.Gateway.Token, nil)
registerACPFromConfig(registry, cfg.Providers.ACP)
}
}
Expand Down Expand Up @@ -276,6 +307,7 @@ func registerProvidersFromDB(registry *providers.Registry, provStore store.Provi
slog.Warn("failed to load providers from DB", "error", err)
return
}
acpMCPData = buildACPMCPData(gatewayAddr, gatewayToken, mcpStore)
for _, p := range dbProviders {
// Claude CLI doesn't need API key
if !p.Enabled {
Expand Down Expand Up @@ -411,35 +443,43 @@ func registerProvidersFromDB(registry *providers.Registry, provStore store.Provi
}

// registerACPFromConfig registers an ACP provider from config file settings.
// All ACP options consume one shared *providers.ACPSettings populated from cfg;
// per-binary defaults (e.g. gemini's --include-directories) are applied inside
// the relevant With* option in the providers package. The MCP bridge config
// is read from the package-level acpMCPData (set by callers via
// buildACPMCPData before invocation).
func registerACPFromConfig(registry *providers.Registry, cfg config.ACPConfig) {
if _, err := exec.LookPath(cfg.Binary); err != nil {
slog.Warn("acp: binary not found, skipping", "binary", cfg.Binary, "error", err)
return
}
idleTTL := 5 * time.Minute
if cfg.IdleTTL != "" {
if d, err := time.ParseDuration(cfg.IdleTTL); err == nil {
idleTTL = d
}
}
workDir := cfg.WorkDir
if workDir == "" {
workDir = defaultACPWorkDir()
}
var opts []providers.ACPOption
if cfg.Model != "" {
opts = append(opts, providers.WithACPModel(cfg.Model))
}
if cfg.PermMode != "" {
opts = append(opts, providers.WithACPPermMode(cfg.PermMode))
settings := &providers.ACPSettings{
Binary: cfg.Binary,
Args: cfg.Args,
Model: cfg.Model,
PermMode: cfg.PermMode,
IdleTTL: cfg.IdleTTL,
WorkDir: cfg.WorkDir,
MCPData: acpMCPData,
}
registry.Register(providers.NewACPProvider(
cfg.Binary, cfg.Args, workDir, idleTTL, tools.DefaultDenyPatterns(), opts...,
settings.Binary, settings.Args, settings.WorkDirOrDefault(),
settings.IdleTTLOrDefault(5*time.Minute),
tools.DefaultDenyPatterns(),
providers.WithACPModel(settings),
providers.WithACPPermMode(settings),
providers.WithACPMCPConfigData(settings),
providers.WithIncludeDirectories(settings),
))
slog.Info("registered provider", "name", "acp", "binary", cfg.Binary)
slog.Info("registered provider", "name", "acp", "binary", cfg.Binary, "args", cfg.Args)
}

// registerACPFromDB registers an ACP provider from a DB provider row.
// registerACPFromDB registers an ACP provider from a DB row.
// Called at startup (via registerProvidersFromDB) and on hot-reload.
// DB JSONB unmarshals directly into providers.ACPSettings — the shared struct's
// json tags match the historic schema (args, idle_ttl, perm_mode, work_dir,
// include_directories). The MCP bridge config is read from the package-level
// acpMCPData (set by callers via buildACPMCPData before invocation).
func registerACPFromDB(registry *providers.Registry, p store.LLMProviderData) {
binary := p.APIBase // repurpose api_base as binary path
if binary == "" {
Expand All @@ -454,37 +494,27 @@ func registerACPFromDB(registry *providers.Registry, p store.LLMProviderData) {
slog.Warn("acp: binary not found, skipping", "binary", binary, "error", err)
return
}
// Parse settings JSONB for extra config
var settings struct {
Args []string `json:"args"`
IdleTTL string `json:"idle_ttl"`
PermMode string `json:"perm_mode"`
WorkDir string `json:"work_dir"`
settings := &providers.ACPSettings{
Name: p.Name,
Binary: binary,
Model: p.Name, // historical: provider name doubles as default agent/model
}
if p.Settings != nil {
if err := json.Unmarshal(p.Settings, &settings); err != nil {
if err := json.Unmarshal(p.Settings, settings); err != nil {
slog.Warn("acp: invalid settings JSON, using defaults", "name", p.Name, "error", err)
}
}
idleTTL := 5 * time.Minute
if settings.IdleTTL != "" {
if d, err := time.ParseDuration(settings.IdleTTL); err == nil {
idleTTL = d
}
}
workDir := settings.WorkDir
if workDir == "" {
workDir = defaultACPWorkDir()
}
settings.MCPData = acpMCPData
registry.RegisterForTenant(p.TenantID, providers.NewACPProvider(
binary, settings.Args, workDir, idleTTL, tools.DefaultDenyPatterns(),
providers.WithACPName(p.Name),
providers.WithACPModel(p.Name),
settings.Binary, settings.Args, settings.WorkDirOrDefault(),
settings.IdleTTLOrDefault(5*time.Minute),
tools.DefaultDenyPatterns(),
providers.WithACPName(settings),
providers.WithACPModel(settings),
providers.WithACPPermMode(settings),
providers.WithACPMCPConfigData(settings),
providers.WithIncludeDirectories(settings),
))
slog.Info("registered provider from DB", "name", p.Name, "type", "acp")
}

// defaultACPWorkDir returns the default workspace directory for ACP agents.
func defaultACPWorkDir() string {
return filepath.Join(config.ResolvedDataDirFromEnv(), "acp-workspaces")
}
16 changes: 16 additions & 0 deletions internal/channels/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ type BaseChannel struct {
approvedGroups sync.Map // chatID → true (in-memory cache for paired group approval)
pairingDebounce sync.Map // senderID → time.Time (debounce pairing reply sends)
requireMention bool

onDisconnect func() // optional: called on unexpected runtime disconnection for auto-retry
}

// NewBaseChannel creates a new BaseChannel with the given parameters.
Expand Down Expand Up @@ -469,6 +471,20 @@ func (c *BaseChannel) MarkDegraded(summary, detail string, kind ChannelFailureKi
c.setHealth(NewChannelHealth(ChannelHealthStateDegraded, summary, detail, kind, retryable))
}

// SetOnDisconnect sets a callback invoked when the channel disconnects unexpectedly at runtime.
// Used by Manager to wire auto-retry without the channel needing to know about Manager.
func (c *BaseChannel) SetOnDisconnect(fn func()) {
c.onDisconnect = fn
}

// NotifyDisconnect calls the onDisconnect callback if set.
// Channel implementations call this when polling/streaming stops unexpectedly.
func (c *BaseChannel) NotifyDisconnect() {
if c.onDisconnect != nil {
c.onDisconnect()
}
}

// MarkFailed records a startup or runtime failure.
func (c *BaseChannel) MarkFailed(summary, detail string, kind ChannelFailureKind, retryable bool) {
if summary == "" {
Expand Down
15 changes: 8 additions & 7 deletions internal/channels/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ type PendingHistory struct {
order []string // insertion order for LRU eviction

// Persistence (optional — nil means RAM-only)
channelName string
store store.PendingMessageStore
flushMu sync.Mutex
flushBuf []store.PendingMessage
flushSignal chan struct{}
stopCh chan struct{}
stopped chan struct{}
channelName string
store store.PendingMessageStore
flushMu sync.Mutex
flushBuf []store.PendingMessage
flushSignal chan struct{}
stopCh chan struct{}
stopped chan struct{}
flusherStarted bool // true after StartFlusher() — guards StopFlusher against unstarted state

// Tenant isolation for DB operations.
tenantID uuid.UUID
Expand Down
5 changes: 3 additions & 2 deletions internal/channels/history_flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ func (ph *PendingHistory) StartFlusher() {
if ph.store == nil {
return
}
ph.flusherStarted = true
go ph.flushLoop()
}

// StopFlusher stops the background flusher and flushes remaining buffer. No-op if RAM-only.
// StopFlusher stops the background flusher and flushes remaining buffer. No-op if RAM-only or never started.
func (ph *PendingHistory) StopFlusher() {
if ph.store == nil {
if ph.store == nil || !ph.flusherStarted {
return
}
close(ph.stopCh)
Expand Down
Loading
Loading