Skip to content

Commit a74a8df

Browse files
authored
Merge pull request #1029 from hchen2020/master
Support reconnect in Twilio
2 parents c1d0eb1 + 99d3104 commit a74a8df

10 files changed

Lines changed: 131 additions & 21 deletions

File tree

src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ namespace BotSharp.Abstraction.Realtime.Models;
33
public class RealtimeHubConnection
44
{
55
public string StreamId { get; set; } = null!;
6+
public string UserSessionId {get;set;} = null!;
67
public string? LastAssistantItemId { get; set; } = null!;
78
public long LatestMediaTimestamp { get; set; }
89
public long? ResponseStartTimestamp { get; set; }

src/Infrastructure/BotSharp.Core.Realtime/Hooks/RealtimeConversationHook.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ public async Task OnFunctionExecuted(RoleDialogModel message)
5757
var instruction = await hub.Completer.UpdateSession(hub.HubConn);
5858
await hub.Completer.InsertConversationItem(message);
5959

60-
if (message.StopCompletion)
60+
if (string.IsNullOrEmpty(message.Content))
61+
{
62+
return;
63+
}
64+
else if (message.StopCompletion)
6165
{
6266
await hub.Completer.TriggerModelInference($"Say to user: \"{message.Content}\"");
6367
}

src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,23 +28,14 @@ public async Task ConnectToModel(Func<string, Task>? responseToUser = null, Func
2828
convService.SetConversationId(_conn.ConversationId, []);
2929
var conversation = await convService.GetConversation(_conn.ConversationId);
3030

31-
var agentService = _services.GetRequiredService<IAgentService>();
32-
var agent = await agentService.LoadAgent(conversation.AgentId);
33-
_conn.CurrentAgentId = agent.Id;
34-
3531
var routing = _services.GetRequiredService<IRoutingService>();
36-
routing.Context.Push(agent.Id);
32+
var agentService = _services.GetRequiredService<IAgentService>();
33+
var agent = await agentService.LoadAgent(_conn.CurrentAgentId);
3734

3835
var storage = _services.GetRequiredService<IConversationStorage>();
3936
var dialogs = convService.GetDialogHistory();
40-
if (dialogs.Count == 0)
41-
{
42-
dialogs.Add(new RoleDialogModel(AgentRole.User, "Hi"));
43-
storage.Append(_conn.ConversationId, dialogs.First());
44-
}
45-
4637
routing.Context.SetDialogs(dialogs);
47-
routing.Context.SetMessageId(_conn.ConversationId, dialogs.Last().MessageId);
38+
routing.Context.SetMessageId(_conn.ConversationId, Guid.Empty.ToString());
4839

4940
var states = _services.GetRequiredService<IConversationStateService>();
5041
var settings = _services.GetRequiredService<RealtimeModelSettings>();

src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/RealTimeCompletionProvider.cs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,20 @@ private async Task ReceiveMessage(RealtimeHubConnection conn,
187187
else if (response.Type == "response.done")
188188
{
189189
_logger.LogInformation($"{response.Type}: {receivedText}");
190-
var messages = await OnResponsedDone(conn, receivedText);
191-
onModelResponseDone(messages);
190+
var data = JsonSerializer.Deserialize<ResponseDone>(receivedText).Body;
191+
if (data.Status != "completed")
192+
{
193+
if (data.StatusDetails.Type == "incomplete" && data.StatusDetails.Reason == "max_output_tokens")
194+
{
195+
onInterruptionDetected();
196+
await TriggerModelInference("Response user concisely");
197+
}
198+
}
199+
else
200+
{
201+
var messages = await OnResponsedDone(conn, receivedText);
202+
onModelResponseDone(messages);
203+
}
192204
}
193205
else if (response.Type == "conversation.item.created")
194206
{
@@ -295,6 +307,8 @@ await HookEmitter.Emit<IContentGeneratingHook>(_services, async hook =>
295307

296308
await SendEventToModel(sessionUpdate);
297309

310+
await Task.Delay(300);
311+
298312
return instruction;
299313
}
300314

@@ -561,6 +575,10 @@ public async Task<List<RoleDialogModel>> OnResponsedDone(RealtimeHubConnection c
561575
if (data.Status != "completed")
562576
{
563577
_logger.LogError(data.StatusDetails.ToString());
578+
/*if (data.StatusDetails.Type == "incomplete" && data.StatusDetails.Reason == "max_output_tokens")
579+
{
580+
await TriggerModelInference("Response user concisely");
581+
}*/
564582
return [];
565583
}
566584

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using BotSharp.Plugin.Twilio.Models;
2+
using Microsoft.AspNetCore.Mvc;
3+
4+
namespace BotSharp.Plugin.Twilio.Controllers;
5+
6+
public class TwilioReconnectController : TwilioController
7+
{
8+
private readonly TwilioSetting _settings;
9+
private readonly IServiceProvider _services;
10+
private readonly ILogger _logger;
11+
12+
public TwilioReconnectController(IServiceProvider services, TwilioSetting settings, ILogger<TwilioReconnectController> logger)
13+
{
14+
_services = services;
15+
_settings = settings;
16+
_logger = logger;
17+
}
18+
19+
[ValidateRequest]
20+
[HttpPost("twilio/stream/reconnect")]
21+
public async Task<TwiMLResult> Reconnect(ConversationalVoiceRequest request)
22+
{
23+
var response = new VoiceResponse();
24+
var connect = new Connect();
25+
var host = _settings.CallbackHost.Split("://").Last();
26+
connect.Stream(url: $"wss://{host}/twilio/stream/{request.AgentId}/{request.ConversationId}");
27+
response.Pause(1);
28+
response.Append(connect);
29+
return TwiML(response);
30+
}
31+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
using BotSharp.Abstraction.Routing;
2+
using Task = System.Threading.Tasks.Task;
3+
using Twilio.Rest.Api.V2010.Account;
4+
using BotSharp.Plugin.Twilio.Interfaces;
5+
6+
namespace BotSharp.Plugin.Twilio.Hooks;
7+
8+
public class TwilioConversationHook : ConversationHookBase, IConversationHook
9+
{
10+
private readonly IServiceProvider _services;
11+
private readonly TwilioSetting _setting;
12+
private readonly ILogger _logger;
13+
14+
public TwilioConversationHook(IServiceProvider services,
15+
TwilioSetting setting,
16+
ILogger<TwilioConversationHook> logger)
17+
{
18+
_services = services;
19+
_setting = setting;
20+
_logger = logger;
21+
}
22+
23+
public override async Task OnFunctionExecuted(RoleDialogModel message)
24+
{
25+
var hooks = _services.GetServices<ITwilioSessionHook>();
26+
foreach (var hook in hooks)
27+
{
28+
if (await hook.ShouldReconnect(message))
29+
{
30+
var states = _services.GetRequiredService<IConversationStateService>();
31+
var sid = states.GetState("twilio_call_sid");
32+
33+
var routing = _services.GetRequiredService<IRoutingService>();
34+
var conversationId = routing.Context.ConversationId;
35+
var processUrl = $"{_setting.CallbackHost}/twilio/stream/reconnect?agent-id={message.CurrentAgentId}&conversation-id={conversationId}";
36+
37+
// Save all states before reconnect
38+
states.Save();
39+
40+
CallResource.Update(
41+
pathSid: sid,
42+
url: new Uri(processUrl));
43+
44+
break;
45+
}
46+
}
47+
}
48+
}

src/Plugins/BotSharp.Plugin.Twilio/Interfaces/ITwilioSessionHook.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,5 +79,13 @@ Task OnAgentHangUp(ConversationalVoiceRequest request)
7979
/// <param name="response"></param>
8080
/// <returns></returns>
8181
Task OnAgentTransferring(ConversationalVoiceRequest request, TwilioSetting settings)
82-
=> Task.CompletedTask;
82+
=> Task.CompletedTask;
83+
84+
/// <summary>
85+
/// Allow Twilio to reconnect when it's in streaming mode.
86+
/// </summary>
87+
/// <param name="message"></param>
88+
/// <returns></returns>
89+
Task<bool> ShouldReconnect(RoleDialogModel message)
90+
=> Task.FromResult(false);
8391
}

src/Plugins/BotSharp.Plugin.Twilio/Services/TwilioService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ public VoiceResponse ReturnBidirectionalMediaStreamsInstructions(ConversationalV
240240

241241
var connect = new Connect();
242242
var host = _settings.CallbackHost.Split("://").Last();
243-
connect.Stream(url: $"wss://{host}/twilio/stream/{conversationId}");
243+
connect.Stream(url: $"wss://{host}/twilio/stream/{agent.Id}/{conversationId}");
244244
response.Append(connect);
245245

246246
return response;

src/Plugins/BotSharp.Plugin.Twilio/TwilioPlugin.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using BotSharp.Abstraction.Settings;
2+
using BotSharp.Plugin.Twilio.Hooks;
23
using BotSharp.Plugin.Twilio.Interfaces;
34
using BotSharp.Plugin.Twilio.OutboundPhoneCallHandler.Hooks;
45
using BotSharp.Plugin.Twilio.Services;
@@ -32,5 +33,6 @@ public void RegisterDI(IServiceCollection services, IConfiguration config)
3233
services.AddHostedService<TwilioMessageQueueService>();
3334
services.AddTwilioRequestValidation();
3435
services.AddScoped<IAgentUtilityHook, OutboundPhoneCallHandlerUtilityHook>();
36+
services.AddScoped<IConversationHook, TwilioConversationHook>();
3537
}
3638
}

src/Plugins/BotSharp.Plugin.Twilio/TwilioStreamMiddleware.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@ public async Task Invoke(HttpContext httpContext)
3434
if (httpContext.WebSockets.IsWebSocketRequest)
3535
{
3636
var services = httpContext.RequestServices;
37-
var conversationId = request.Path.Value.Split("/").Last();
37+
var parts = request.Path.Value.Split("/");
38+
var agentId = parts[3];
39+
var conversationId = parts[4];
3840
using WebSocket webSocket = await httpContext.WebSockets.AcceptWebSocketAsync();
3941
try
4042
{
41-
await HandleWebSocket(services, conversationId, webSocket);
43+
await HandleWebSocket(services, agentId, conversationId, webSocket);
4244
}
4345
catch (Exception ex)
4446
{
@@ -51,12 +53,13 @@ public async Task Invoke(HttpContext httpContext)
5153
await _next(httpContext);
5254
}
5355

54-
private async Task HandleWebSocket(IServiceProvider services, string conversationId, WebSocket webSocket)
56+
private async Task HandleWebSocket(IServiceProvider services, string agentId, string conversationId, WebSocket webSocket)
5557
{
5658
var settings = services.GetRequiredService<RealtimeModelSettings>();
5759
var hub = services.GetRequiredService<IRealtimeHub>();
5860
var conn = hub.SetHubConnection(conversationId);
59-
61+
conn.CurrentAgentId = agentId;
62+
6063
// load conversation and state
6164
var convService = services.GetRequiredService<IConversationService>();
6265
convService.SetConversationId(conversationId, []);
@@ -67,6 +70,9 @@ private async Task HandleWebSocket(IServiceProvider services, string conversatio
6770
}
6871
convService.States.Save();
6972

73+
var routing = services.GetRequiredService<IRoutingService>();
74+
routing.Context.Push(agentId);
75+
7076
var buffer = new byte[1024 * 32];
7177
WebSocketReceiveResult result;
7278

@@ -136,6 +142,7 @@ await hub.ConnectToModel(async data =>
136142
case "start":
137143
eventType = "user_connected";
138144
var startResponse = JsonSerializer.Deserialize<StreamEventStartResponse>(receivedText);
145+
conn.UserSessionId = startResponse.Body.CallSid;
139146
data = JsonSerializer.Serialize(startResponse.Body.CustomParameters);
140147
conn.ResetStreamState();
141148
break;

0 commit comments

Comments
 (0)