99from steamship .agents .mixins .transports .transport import Transport
1010from steamship .agents .schema import EmitFunc , Metadata
1111from steamship .agents .service .agent_service import AgentService
12- from steamship .invocable import Config , InvocableResponse , InvocationContext , post
12+ from steamship .invocable import Config , InvocableResponse , post
1313from steamship .utils .kv_store import KeyValueStore
1414
15+ SETTINGS_KVSTORE_KEY = "telegram-transport"
16+
1517
1618class TelegramTransportConfig (Config ):
1719 bot_token : Optional [str ] = Field ("" , description = "The secret token for your Telegram bot" )
@@ -23,11 +25,9 @@ class TelegramTransportConfig(Config):
2325class TelegramTransport (Transport ):
2426 """Experimental base class to encapsulate a Telegram communication channel."""
2527
26- api_root : str
27- bot_token : str
28+ bot_token : Optional [str ]
2829 agent_service : AgentService
2930 config : TelegramTransportConfig
30- context : InvocationContext
3131
3232 def __init__ (
3333 self ,
@@ -37,29 +37,37 @@ def __init__(
3737 ):
3838 super ().__init__ (client = client )
3939 self .config = config
40- self .store = KeyValueStore (self .client , store_identifier = "_telegram_config" )
41- bot_token = (self .store .get ("bot_token" ) or {}).get ("token" )
42- self .bot_token = config .bot_token or bot_token
43- self .api_root = f"{ config .api_base } { self .bot_token } "
4440 self .agent_service = agent_service
41+ try :
42+ self .bot_token = self .get_telegram_access_token () or None
43+ except BaseException as e :
44+ logging .warning (e )
45+ self .bot_token = None
4546
4647 def instance_init (self ):
47- if self .bot_token :
48- self .api_root = f"{ self .config .api_base } { self .config .bot_token or self .bot_token } "
48+ if self .get_telegram_access_token ():
4949 try :
50- self ._instance_init ()
50+ self .telegram_connect_webhook ()
5151 except Exception : # noqa: S110
5252 pass
5353
54- def _instance_init (self ):
54+ @post ("telegram_connect_webhook" )
55+ def telegram_connect_webhook (self ):
56+ """Register this AgentService with Telegram."""
5557 webhook_url = self .agent_service .context .invocable_url + "telegram_respond"
5658
59+ api_root = self .get_api_root ()
60+ if not api_root :
61+ raise SteamshipError (
62+ message = "Unable to determine Telegram API root -- perhaps your bot token isn't set?"
63+ )
64+
5765 logging .info (
58- f"Setting Telegram webhook URL: { webhook_url } . Post is to { self . api_root } /setWebhook"
66+ f"Setting Telegram webhook URL: { webhook_url } . Post is to { api_root } /setWebhook"
5967 )
6068
6169 response = requests .get (
62- f"{ self . api_root } /setWebhook" ,
70+ f"{ api_root } /setWebhook" ,
6371 params = {
6472 "url" : webhook_url ,
6573 "allowed_updates" : ["message" ],
@@ -74,31 +82,38 @@ def _instance_init(self):
7482
7583 @post ("telegram_webhook_info" )
7684 def telegram_webhook_info (self ) -> dict :
77- return requests .get (self .api_root + "/getWebhookInfo" ).json ()
78-
79- @post ("connect_telegram" )
80- def connect_telegram (self , bot_token : str ):
81- self .store .set ("bot_token" , {"token" : bot_token })
82- self .bot_token = bot_token
85+ api_root = self .get_api_root ()
86+ if not api_root :
87+ raise SteamshipError (
88+ message = "Unable to fetch Telegram API info -- perhaps your bot token isn't set?"
89+ )
8390
84- try :
85- self .instance_init ()
86- return "OK"
87- except Exception as e :
88- return f"Could not set webhook for bot. Exception: { e } "
91+ return requests .get (api_root + "/getWebhookInfo" ).json ()
8992
9093 @post ("telegram_disconnect_webhook" )
9194 def telegram_disconnect_webhook (self , * args , ** kwargs ):
9295 """Unsubscribe from Telegram updates."""
93- requests .get (f"{ self .api_root } /deleteWebhook" )
96+ api_root = self .get_api_root ()
97+ if not api_root :
98+ raise SteamshipError (
99+ message = "Unable to disconnect from Telegram -- perhaps your bot token isn't set?"
100+ )
101+
102+ requests .get (f"{ api_root } /deleteWebhook" )
94103
95104 def _send (self , blocks : [Block ], metadata : Metadata ):
96105 """Send a response to the Telegram chat."""
106+ api_root = self .get_api_root ()
107+ if not api_root :
108+ raise SteamshipError (
109+ message = "Unable to send to Telegram -- perhaps your bot token isn't set?"
110+ )
111+
97112 for block in blocks :
98113 chat_id = block .chat_id
99114 if block .is_text () or block .text :
100115 params = {"chat_id" : int (chat_id ), "text" : block .text }
101- requests .get (f"{ self . api_root } /sendMessage" , params = params )
116+ requests .get (f"{ api_root } /sendMessage" , params = params )
102117 elif block .is_image () or block .is_audio () or block .is_video ():
103118 if block .is_image ():
104119 suffix = "sendPhoto"
@@ -115,7 +130,7 @@ def _send(self, blocks: [Block], metadata: Metadata):
115130 temp_file .write (_bytes )
116131 temp_file .seek (0 )
117132 resp = requests .post (
118- url = f"{ self . api_root } /{ suffix } ?chat_id={ chat_id } " ,
133+ url = f"{ api_root } /{ suffix } ?chat_id={ chat_id } " ,
119134 files = {key : temp_file },
120135 )
121136 if resp .status_code != 200 :
@@ -129,12 +144,16 @@ def _send(self, blocks: [Block], metadata: Metadata):
129144 )
130145
131146 def _get_file (self , file_id : str ) -> Dict [str , Any ]:
132- return requests .get (f"{ self .api_root } /getFile" , params = {"file_id" : file_id }).json ()[
133- "result"
134- ]
147+ api_root = self .get_api_root ()
148+ if not api_root :
149+ raise SteamshipError (
150+ message = "Unable to get Telegram file -- perhaps your bot token isn't set?"
151+ )
152+
153+ return requests .get (f"{ api_root } /getFile" , params = {"file_id" : file_id }).json ()["result" ]
135154
136155 def _get_file_url (self , file_id : str ) -> str :
137- return f"https://api.telegram.org/file/bot{ self .bot_token } /{ self ._get_file (file_id )['file_path' ]} "
156+ return f"https://api.telegram.org/file/bot{ self .get_telegram_access_token () } /{ self ._get_file (file_id )['file_path' ]} "
138157
139158 def _download_file (self , file_id : str ):
140159 result = requests .get (self ._get_file_url (file_id ))
@@ -233,3 +252,90 @@ def telegram_respond(self, **kwargs) -> InvocableResponse[str]:
233252 self .send ([response ])
234253 # Even if we do nothing, make sure we return ok
235254 return InvocableResponse (string = "OK" )
255+
256+ @post ("set_telegram_access_token" )
257+ def set_telegram_access_token (self , token : str ) -> InvocableResponse [str ]:
258+ """Set the telegram access token."""
259+ existing_token = self .get_telegram_access_token ()
260+ if existing_token :
261+ try :
262+ self .telegram_disconnect_webhook ()
263+ except BaseException as e :
264+ # Note: we don't want to fully fail here, because that would mean that a bot user that had some
265+ # other error relating to disconnecting would never be able to RE-connect to a new bot.
266+ logging .error (e )
267+
268+ kv = KeyValueStore (
269+ client = self .agent_service .client , store_identifier = self .setting_store_key ()
270+ )
271+ kv .set ("telegram_token" , {"token" : token })
272+
273+ # Now attempt to modify the connection in Telegram
274+ self .bot_token = token
275+ try :
276+ self .telegram_connect_webhook ()
277+ return InvocableResponse (string = "OK" )
278+ except Exception as e :
279+ raise SteamshipError (message = f"Could not set Telegram Webhook. Exception: { e } " )
280+
281+ def get_api_root (self ) -> Optional [str ]:
282+ """Return the API root"""
283+ bot_token = self .get_telegram_access_token ()
284+ api_base = self .config .api_base
285+
286+ # Ensure we have an API Base
287+ if not api_base :
288+ raise SteamshipError (message = "Missing Telegram API Base" )
289+
290+ # Ensure it ends in a trailing slash
291+ if api_base [- 1 ] != "/" :
292+ api_base += "/"
293+
294+ if bot_token :
295+ if ".steamship.run/" in api_base :
296+ # This is a special case for our testing pipeline -- it contains a mock Telegram server.
297+ return api_base
298+ else :
299+ return f"{ api_base } { bot_token } "
300+ else :
301+ return None
302+
303+ def setting_store_key (self ):
304+ return f"{ SETTINGS_KVSTORE_KEY } -{ self .agent_service .context .invocable_instance_handle } "
305+
306+ def get_telegram_access_token (self ) -> Optional [str ]:
307+ """Return the Telegram Access token, which permits the agent to post to Telegram."""
308+
309+ # Warning: This can't be an 'is not None' check since the config system uses an empty string to represent None
310+ if self .bot_token :
311+ return self .bot_token
312+
313+ _dynamically_set_token = None
314+ _fallback_token = None
315+
316+ # Prefer the dynamically set token if available
317+ kv = KeyValueStore (
318+ client = self .agent_service .client , store_identifier = self .setting_store_key ()
319+ )
320+ v = kv .get ("telegram_token" )
321+ if v :
322+ _dynamically_set_token = v .get ("token" , None )
323+
324+ # Fall back on the config-provided one
325+ if self .config :
326+ _fallback_token = self .config .bot_token
327+
328+ _return_token = _dynamically_set_token or _fallback_token
329+
330+ # Cache it to avoid another KV Store lookup and return
331+ self .bot_token = _return_token
332+ return _return_token
333+
334+ @post ("is_telegram_token_set" )
335+ def is_telegram_token_set (self ) -> InvocableResponse [bool ]:
336+ """Return whether the Telegram token has been set as a way for a remote UI to check status."""
337+
338+ # Warning: This can't be an 'is not None' check since the config system uses an empty string to represent None
339+ if not self .get_telegram_access_token ():
340+ return InvocableResponse (json = False )
341+ return InvocableResponse (json = True )
0 commit comments