Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 50 additions & 8 deletions internal/channels/telegram/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"net"
"net/http"
"net/url"
"strings"
Expand All @@ -24,6 +25,8 @@ type Channel struct {
bot *telego.Bot
config config.TelegramConfig
httpClient *http.Client
transport *http.Transport
ipv4Once sync.Once // guards enableIPv4Only to prevent data race
pairingService store.PairingStore
agentStore store.AgentStore // for agent key lookup (nil if not configured)
configPermStore store.ConfigPermissionStore // for group file writer management (nil if not configured)
Expand Down Expand Up @@ -64,19 +67,29 @@ func New(cfg config.TelegramConfig, msgBus *bus.MessageBus, pairingSvc store.Pai
opts = append(opts, telego.WithAPIServer(cfg.APIServer))
}

httpClient := &http.Client{
Timeout: 30 * time.Second,
}
// Isolate transport per account: prevents cross-bot connection pool contention
// and allows per-account IPv4 fallback without affecting other bots.
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConnsPerHost = 64 // default 2 is too low for high-concurrency bots

if cfg.Proxy != "" {
proxyURL, parseErr := url.Parse(cfg.Proxy)
if parseErr != nil {
return nil, fmt.Errorf("invalid proxy URL %q: %w", cfg.Proxy, parseErr)
}
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.Proxy = http.ProxyURL(proxyURL)
httpClient.Transport = transport
}

httpClient := &http.Client{
Timeout: 30 * time.Second,
Transport: transport,
}
// Apply ForceIPv4 at init if configured (explicit, predictable, no runtime heuristic).
if cfg.ForceIPv4 {
applyIPv4Dialer(transport)
slog.Info("telegram: forced IPv4 for account via config")
}

opts = append(opts, telego.WithHTTPClient(httpClient))

bot, err := telego.NewBot(cfg.Token, opts...)
Expand All @@ -102,6 +115,7 @@ func New(cfg config.TelegramConfig, msgBus *bus.MessageBus, pairingSvc store.Pai
bot: bot,
config: cfg,
httpClient: httpClient,
transport: transport,
pairingService: pairingSvc,
agentStore: agentStore,
configPermStore: configPermStore,
Expand Down Expand Up @@ -143,12 +157,15 @@ func (c *Channel) Start(ctx context.Context) error {
// Register bot menu commands with retry.
go func() {
commands := DefaultMenuCommands()
syncCtx, cancel := context.WithTimeout(pollCtx, probeOverallTimeout)
defer cancel()

for attempt := 1; attempt <= 3; attempt++ {
if err := c.SyncMenuCommands(pollCtx, commands); err != nil {
if err := c.SyncMenuCommands(syncCtx, commands); err != nil {
slog.Warn("failed to sync telegram menu commands", "error", err, "attempt", attempt)
if attempt < 3 {
select {
case <-pollCtx.Done():
case <-syncCtx.Done():
return
case <-time.After(time.Duration(attempt*5) * time.Second):
}
Expand Down Expand Up @@ -258,10 +275,35 @@ func (c *Channel) Stop(_ context.Context) error {
slog.Warn("telegram polling goroutine did not exit within timeout")
}
}

return nil
}

// applyIPv4Dialer forces a transport to use IPv4 only by overriding DialContext.
func applyIPv4Dialer(t *http.Transport) {
dialer := &net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}
t.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
if network == "tcp" {
network = "tcp4"
}
return dialer.DialContext(ctx, network, addr)
}
}

// enableIPv4Only forces the bot's transport to use IPv4 only for all future
// requests. Safe to call from multiple goroutines concurrently (uses sync.Once).
func (c *Channel) enableIPv4Only() {
if c == nil || c.transport == nil {
return
}
c.ipv4Once.Do(func() {
applyIPv4Dialer(c.transport)
slog.Info("telegram: enabled sticky IPv4 fallback", "bot", c.bot.Username())
})
}

// parseChatID converts a string chat ID to int64.
func parseChatID(chatIDStr string) (int64, error) {
var id int64
Expand Down
6 changes: 6 additions & 0 deletions internal/channels/telegram/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,10 @@ const (

// pairingReplyDebounce is the minimum interval between pairing replies to the same user.
pairingReplyDebounce = 60 * time.Second

// sendOverallTimeout is the maximum duration for a multi-retry send sequence.
sendOverallTimeout = 60 * time.Second

// probeOverallTimeout is the maximum duration for initial bot status check and command sync.
probeOverallTimeout = 60 * time.Second
)
2 changes: 2 additions & 0 deletions internal/channels/telegram/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type telegramInstanceConfig struct {
MediaMaxBytes int64 `json:"media_max_bytes,omitempty"` // deprecated: use media_max_mb
LinkPreview *bool `json:"link_preview,omitempty"`
BlockReply *bool `json:"block_reply,omitempty"`
ForceIPv4 bool `json:"force_ipv4,omitempty"`
AllowFrom []string `json:"allow_from,omitempty"`
}

Expand Down Expand Up @@ -99,6 +100,7 @@ func buildChannel(name string, creds json.RawMessage, cfg json.RawMessage,
MediaMaxBytes: resolveMediaMaxBytes(ic),
LinkPreview: ic.LinkPreview,
BlockReply: ic.BlockReply,
ForceIPv4: ic.ForceIPv4,
}

// DB instances default to "pairing" for groups (secure by default).
Expand Down
25 changes: 18 additions & 7 deletions internal/channels/telegram/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,13 @@ func isRetryableNetworkErr(err error) bool {
// retrySend wraps a Telegram send call with retry logic for transient network errors.
// Parse errors are NOT retried (handled by caller's HTML fallback).
// resetFn is called before each retry (e.g. to seek file handles back to start). Can be nil.
func retrySend(ctx context.Context, name string, resetFn func(), fn func() error) error {
func (c *Channel) retrySend(ctx context.Context, name string, resetFn func(), fn func(context.Context) error) error {
ctx, cancel := context.WithTimeout(ctx, sendOverallTimeout)
defer cancel()

var err error
for attempt := 1; attempt <= sendMaxRetries; attempt++ {
err = fn()
err = fn(ctx)
if err == nil {
return nil
}
Expand All @@ -68,6 +71,14 @@ func retrySend(ctx context.Context, name string, resetFn func(), fn func() error
if !isRetryableNetworkErr(err) || attempt == sendMaxRetries {
return err
}

// If we hit a network-level connectivity issue (likely IPv6 routing),
// arm sticky IPv4 fallback. Only triggers on "unreachable" — not timeouts
// (which can be rate-limiting) or DNS errors (unrelated to IPv6).
if strings.Contains(err.Error(), "unreachable") {
c.enableIPv4Only()
}

slog.Warn("telegram send retry",
"func", name, "attempt", attempt, "max", sendMaxRetries, "error", err)
if resetFn != nil {
Expand Down Expand Up @@ -290,7 +301,7 @@ func (c *Channel) sendHTMLWithDepth(ctx context.Context, chatID int64, htmlConte
}
}

err := retrySend(ctx, "sendMessage", nil, func() error {
err := c.retrySend(ctx, "sendMessage", nil, func(ctx context.Context) error {
_, e := c.bot.SendMessage(ctx, tgMsg)
return e
})
Expand Down Expand Up @@ -382,7 +393,7 @@ func (c *Channel) sendPhoto(ctx context.Context, chatID telego.ChatID, filePath,
params.ReplyParameters = &telego.ReplyParameters{MessageID: replyTo, AllowSendingWithoutReply: true}
}

err = retrySend(ctx, "sendPhoto", func() { file.Seek(0, 0) }, func() error {
err = c.retrySend(ctx, "sendPhoto", func() { file.Seek(0, 0) }, func(ctx context.Context) error {
_, e := c.bot.SendPhoto(ctx, params)
return e
})
Expand Down Expand Up @@ -425,7 +436,7 @@ func (c *Channel) sendVideo(ctx context.Context, chatID telego.ChatID, filePath,
params.ReplyParameters = &telego.ReplyParameters{MessageID: replyTo, AllowSendingWithoutReply: true}
}

err = retrySend(ctx, "sendVideo", func() { file.Seek(0, 0) }, func() error {
err = c.retrySend(ctx, "sendVideo", func() { file.Seek(0, 0) }, func(ctx context.Context) error {
_, e := c.bot.SendVideo(ctx, params)
return e
})
Expand Down Expand Up @@ -468,7 +479,7 @@ func (c *Channel) sendAudio(ctx context.Context, chatID telego.ChatID, filePath,
params.ReplyParameters = &telego.ReplyParameters{MessageID: replyTo, AllowSendingWithoutReply: true}
}

err = retrySend(ctx, "sendAudio", func() { file.Seek(0, 0) }, func() error {
err = c.retrySend(ctx, "sendAudio", func() { file.Seek(0, 0) }, func(ctx context.Context) error {
_, e := c.bot.SendAudio(ctx, params)
return e
})
Expand Down Expand Up @@ -511,7 +522,7 @@ func (c *Channel) sendDocument(ctx context.Context, chatID telego.ChatID, filePa
params.ReplyParameters = &telego.ReplyParameters{MessageID: replyTo, AllowSendingWithoutReply: true}
}

err = retrySend(ctx, "sendDocument", func() { file.Seek(0, 0) }, func() error {
err = c.retrySend(ctx, "sendDocument", func() { file.Seek(0, 0) }, func(ctx context.Context) error {
_, e := c.bot.SendDocument(ctx, params)
return e
})
Expand Down
1 change: 1 addition & 0 deletions internal/config/config_channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type TelegramConfig struct {
MediaMaxBytes int64 `json:"media_max_bytes,omitempty"` // max media download size in bytes (default 20MB)
LinkPreview *bool `json:"link_preview,omitempty"` // enable URL previews in messages (default true)
BlockReply *bool `json:"block_reply,omitempty"` // override gateway block_reply (nil = inherit)
ForceIPv4 bool `json:"force_ipv4,omitempty"` // force IPv4 for all Telegram API requests (use when IPv6 routing is broken)

// Optional STT (Speech-to-Text) pipeline for voice/audio inbound messages.
// When stt_proxy_url is set, audio/voice messages are transcribed before being forwarded to the agent.
Expand Down
Loading