diff --git a/internal/channels/telegram/channel.go b/internal/channels/telegram/channel.go
index 6fb95e427d..902f746cee 100644
--- a/internal/channels/telegram/channel.go
+++ b/internal/channels/telegram/channel.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"log/slog"
+	"net"
 	"net/http"
 	"net/url"
 	"strings"
@@ -24,6 +25,8 @@ type Channel struct {
 	bot              *telego.Bot
 	config           config.TelegramConfig
 	httpClient       *http.Client
+	transport        *http.Transport
+	ipv4             *ipv4Gate // shared with the transport's dialer; turns "tcp" dials into "tcp4" once enabled
 	pairingService   store.PairingStore
 	agentStore       store.AgentStore            // for agent key lookup (nil if not configured)
 	configPermStore  store.ConfigPermissionStore // for group file writer management (nil if not configured)
@@ -64,19 +67,35 @@ func New(cfg config.TelegramConfig, msgBus *bus.MessageBus, pairingSvc store.Pai
 		opts = append(opts, telego.WithAPIServer(cfg.APIServer))
 	}
 
-	httpClient := &http.Client{
-		Timeout: 30 * time.Second,
-	}
+	// Isolate transport per account: prevents cross-bot connection pool contention
+	// and allows per-account IPv4 fallback without affecting other bots.
+	transport := http.DefaultTransport.(*http.Transport).Clone()
+	transport.MaxIdleConnsPerHost = 64 // default 2 is too low for high-concurrency bots
 
 	if cfg.Proxy != "" {
 		proxyURL, parseErr := url.Parse(cfg.Proxy)
 		if parseErr != nil {
 			return nil, fmt.Errorf("invalid proxy URL %q: %w", cfg.Proxy, parseErr)
 		}
-		transport := http.DefaultTransport.(*http.Transport).Clone()
 		transport.Proxy = http.ProxyURL(proxyURL)
-		httpClient.Transport = transport
 	}
+
+	httpClient := &http.Client{
+		Timeout:   30 * time.Second,
+		Transport: transport,
+	}
+	// Install the dialer now, before the transport serves any request: http.Transport
+	// fields must not be reassigned once requests may be in flight. ForceIPv4 pins
+	// IPv4 from the start; otherwise the gate stays off until enableIPv4Only flips it.
+	gate := &ipv4Gate{}
+	if cfg.ForceIPv4 {
+		applyIPv4Dialer(transport)
+		gate.enable() // already IPv4-only, so the runtime fallback becomes a no-op
+		slog.Info("telegram: forced IPv4 for account via config")
+	} else {
+		gate.install(transport)
+	}
+
 	opts = append(opts, telego.WithHTTPClient(httpClient))
 
 	bot, err := telego.NewBot(cfg.Token, opts...)
@@ -102,6 +121,8 @@ func New(cfg config.TelegramConfig, msgBus *bus.MessageBus, pairingSvc store.Pai
 		bot:              bot,
 		config:           cfg,
 		httpClient:       httpClient,
+		transport:        transport,
+		ipv4:             gate,
 		pairingService:   pairingSvc,
 		agentStore:       agentStore,
 		configPermStore:  configPermStore,
@@ -143,12 +164,15 @@ func (c *Channel) Start(ctx context.Context) error {
 	// Register bot menu commands with retry.
 	go func() {
 		commands := DefaultMenuCommands()
+		syncCtx, cancel := context.WithTimeout(pollCtx, probeOverallTimeout)
+		defer cancel()
+
 		for attempt := 1; attempt <= 3; attempt++ {
-			if err := c.SyncMenuCommands(pollCtx, commands); err != nil {
+			if err := c.SyncMenuCommands(syncCtx, commands); err != nil {
 				slog.Warn("failed to sync telegram menu commands", "error", err, "attempt", attempt)
 				if attempt < 3 {
 					select {
-					case <-pollCtx.Done():
+					case <-syncCtx.Done():
 						return
 					case <-time.After(time.Duration(attempt*5) * time.Second):
 					}
@@ -258,10 +282,76 @@ func (c *Channel) Stop(_ context.Context) error {
 			slog.Warn("telegram polling goroutine did not exit within timeout")
 		}
 	}
-
 	return nil
 }
 
+// ipv4Gate is a switch shared by a Channel and the DialContext installed on its
+// transport. The dialer reads it on every connect, so IPv4-only mode can be
+// switched on at runtime without reassigning fields of a live http.Transport
+// (which would race with in-flight dials).
+type ipv4Gate struct {
+	mu sync.RWMutex
+	on bool
+}
+
+// enable turns the gate on and reports whether this call changed its state.
+func (g *ipv4Gate) enable() bool {
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	if g.on {
+		return false
+	}
+	g.on = true
+	return true
+}
+
+// enabled reports whether dials must be restricted to IPv4.
+func (g *ipv4Gate) enabled() bool {
+	g.mu.RLock()
+	defer g.mu.RUnlock()
+	return g.on
+}
+
+// install sets t.DialContext to a dialer that rewrites "tcp" to "tcp4" while the
+// gate is on. It must be called before t serves its first request.
+func (g *ipv4Gate) install(t *http.Transport) {
+	dialer := &net.Dialer{
+		Timeout:   30 * time.Second,
+		KeepAlive: 30 * time.Second,
+	}
+	t.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
+		if network == "tcp" && g.enabled() {
+			network = "tcp4"
+		}
+		return dialer.DialContext(ctx, network, addr)
+	}
+}
+
+// applyIPv4Dialer forces a transport to use IPv4 only by overriding DialContext.
+// It must be called before the transport serves its first request.
+func applyIPv4Dialer(t *http.Transport) {
+	gate := &ipv4Gate{on: true}
+	gate.install(t)
+}
+
+// enableIPv4Only switches the bot's dialer to IPv4 only for all future connections.
+// Safe to call from multiple goroutines concurrently: it only flips the shared
+// ipv4Gate and never touches the transport itself. Only the first effective call
+// logs; later calls (or calls after ForceIPv4) are no-ops.
+func (c *Channel) enableIPv4Only() {
+	if c == nil || c.ipv4 == nil {
+		return
+	}
+	if !c.ipv4.enable() {
+		return // already IPv4-only
+	}
+	username := ""
+	if c.bot != nil {
+		username = c.bot.Username()
+	}
+	slog.Info("telegram: enabled sticky IPv4 fallback", "bot", username)
+}
+
 // parseChatID converts a string chat ID to int64.
func parseChatID(chatIDStr string) (int64, error) { var id int64 diff --git a/internal/channels/telegram/constants.go b/internal/channels/telegram/constants.go index 41fbff311f..0ee8bbb6db 100644 --- a/internal/channels/telegram/constants.go +++ b/internal/channels/telegram/constants.go @@ -12,4 +12,10 @@ const ( // pairingReplyDebounce is the minimum interval between pairing replies to the same user. pairingReplyDebounce = 60 * time.Second + + // sendOverallTimeout is the maximum duration for a multi-retry send sequence. + sendOverallTimeout = 60 * time.Second + + // probeOverallTimeout is the maximum duration for initial bot status check and command sync. + probeOverallTimeout = 60 * time.Second ) diff --git a/internal/channels/telegram/factory.go b/internal/channels/telegram/factory.go index bcbd9a9a15..e379606ff4 100644 --- a/internal/channels/telegram/factory.go +++ b/internal/channels/telegram/factory.go @@ -34,6 +34,7 @@ type telegramInstanceConfig struct { MediaMaxBytes int64 `json:"media_max_bytes,omitempty"` // deprecated: use media_max_mb LinkPreview *bool `json:"link_preview,omitempty"` BlockReply *bool `json:"block_reply,omitempty"` + ForceIPv4 bool `json:"force_ipv4,omitempty"` AllowFrom []string `json:"allow_from,omitempty"` } @@ -99,6 +100,7 @@ func buildChannel(name string, creds json.RawMessage, cfg json.RawMessage, MediaMaxBytes: resolveMediaMaxBytes(ic), LinkPreview: ic.LinkPreview, BlockReply: ic.BlockReply, + ForceIPv4: ic.ForceIPv4, } // DB instances default to "pairing" for groups (secure by default). diff --git a/internal/channels/telegram/send.go b/internal/channels/telegram/send.go index 0e4a1503ac..66c21d5631 100644 --- a/internal/channels/telegram/send.go +++ b/internal/channels/telegram/send.go @@ -54,10 +54,13 @@ func isRetryableNetworkErr(err error) bool { // retrySend wraps a Telegram send call with retry logic for transient network errors. // Parse errors are NOT retried (handled by caller's HTML fallback). 
// resetFn is called before each retry (e.g. to seek file handles back to start). Can be nil. -func retrySend(ctx context.Context, name string, resetFn func(), fn func() error) error { +func (c *Channel) retrySend(ctx context.Context, name string, resetFn func(), fn func(context.Context) error) error { + ctx, cancel := context.WithTimeout(ctx, sendOverallTimeout) + defer cancel() + var err error for attempt := 1; attempt <= sendMaxRetries; attempt++ { - err = fn() + err = fn(ctx) if err == nil { return nil } @@ -68,6 +71,14 @@ func retrySend(ctx context.Context, name string, resetFn func(), fn func() error if !isRetryableNetworkErr(err) || attempt == sendMaxRetries { return err } + + // If we hit a network-level connectivity issue (likely IPv6 routing), + // arm sticky IPv4 fallback. Only triggers on "unreachable" — not timeouts + // (which can be rate-limiting) or DNS errors (unrelated to IPv6). + if strings.Contains(err.Error(), "unreachable") { + c.enableIPv4Only() + } + slog.Warn("telegram send retry", "func", name, "attempt", attempt, "max", sendMaxRetries, "error", err) if resetFn != nil { @@ -290,7 +301,7 @@ func (c *Channel) sendHTMLWithDepth(ctx context.Context, chatID int64, htmlConte } } - err := retrySend(ctx, "sendMessage", nil, func() error { + err := c.retrySend(ctx, "sendMessage", nil, func(ctx context.Context) error { _, e := c.bot.SendMessage(ctx, tgMsg) return e }) @@ -382,7 +393,7 @@ func (c *Channel) sendPhoto(ctx context.Context, chatID telego.ChatID, filePath, params.ReplyParameters = &telego.ReplyParameters{MessageID: replyTo, AllowSendingWithoutReply: true} } - err = retrySend(ctx, "sendPhoto", func() { file.Seek(0, 0) }, func() error { + err = c.retrySend(ctx, "sendPhoto", func() { file.Seek(0, 0) }, func(ctx context.Context) error { _, e := c.bot.SendPhoto(ctx, params) return e }) @@ -425,7 +436,7 @@ func (c *Channel) sendVideo(ctx context.Context, chatID telego.ChatID, filePath, params.ReplyParameters = &telego.ReplyParameters{MessageID: 
replyTo, AllowSendingWithoutReply: true} } - err = retrySend(ctx, "sendVideo", func() { file.Seek(0, 0) }, func() error { + err = c.retrySend(ctx, "sendVideo", func() { file.Seek(0, 0) }, func(ctx context.Context) error { _, e := c.bot.SendVideo(ctx, params) return e }) @@ -468,7 +479,7 @@ func (c *Channel) sendAudio(ctx context.Context, chatID telego.ChatID, filePath, params.ReplyParameters = &telego.ReplyParameters{MessageID: replyTo, AllowSendingWithoutReply: true} } - err = retrySend(ctx, "sendAudio", func() { file.Seek(0, 0) }, func() error { + err = c.retrySend(ctx, "sendAudio", func() { file.Seek(0, 0) }, func(ctx context.Context) error { _, e := c.bot.SendAudio(ctx, params) return e }) @@ -511,7 +522,7 @@ func (c *Channel) sendDocument(ctx context.Context, chatID telego.ChatID, filePa params.ReplyParameters = &telego.ReplyParameters{MessageID: replyTo, AllowSendingWithoutReply: true} } - err = retrySend(ctx, "sendDocument", func() { file.Seek(0, 0) }, func() error { + err = c.retrySend(ctx, "sendDocument", func() { file.Seek(0, 0) }, func(ctx context.Context) error { _, e := c.bot.SendDocument(ctx, params) return e }) diff --git a/internal/config/config_channels.go b/internal/config/config_channels.go index b8af42cd8a..0d2e1dfb8f 100644 --- a/internal/config/config_channels.go +++ b/internal/config/config_channels.go @@ -41,6 +41,7 @@ type TelegramConfig struct { MediaMaxBytes int64 `json:"media_max_bytes,omitempty"` // max media download size in bytes (default 20MB) LinkPreview *bool `json:"link_preview,omitempty"` // enable URL previews in messages (default true) BlockReply *bool `json:"block_reply,omitempty"` // override gateway block_reply (nil = inherit) + ForceIPv4 bool `json:"force_ipv4,omitempty"` // force IPv4 for all Telegram API requests (use when IPv6 routing is broken) // Optional STT (Speech-to-Text) pipeline for voice/audio inbound messages. // When stt_proxy_url is set, audio/voice messages are transcribed before being forwarded to the agent.