Skip to content

Commit 6b21eeb

Browse files
committed
refactor(a365): Refactor to Worker pattern and enhance test coverage
- Refactor front-end to Worker pattern with DI support
- Add log_level and notification_workflow features
- Add AuthenticationRef support for telemetry token resolution
- Implement thread-safe token cache with proactive refresh
- Add comprehensive test coverage (105 tests, +4 new test files)
- Remove duplicate test files and improve test reliability
- Improve error handling across all modules
- Modernize type hints and remove redundant comments

Signed-off-by: afourniernv <afournier@nvidia.com>
1 parent 6435904 commit 6b21eeb

20 files changed

+1630
-1634
lines changed

packages/nvidia_nat_a365/src/nat/plugins/a365/front_end/exceptions.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
"""Backward compatibility: Import exceptions from shared module."""
1818

19-
# Import from shared exceptions module
2019
from nat.plugins.a365.exceptions import (
2120
A365AuthenticationError,
2221
A365ConfigurationError,

packages/nvidia_nat_a365/src/nat/plugins/a365/front_end/plugin.py

Lines changed: 69 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ async def run(self) -> None:
4848
5. Starts the Microsoft Agents SDK server
4949
6. Handles cleanup on shutdown
5050
"""
51-
# Import and validate Microsoft Agents SDK dependencies
5251
try:
5352
# Try importing start_server from hosting.aiohttp (most common)
5453
try:
@@ -71,107 +70,100 @@ async def run(self) -> None:
7170
original_error=e
7271
) from e
7372

74-
# Configure logging level
7573
log_level = LOG_LEVELS.get(self.front_end_config.log_level.upper(), logging.INFO)
7674
setup_logging(log_level)
7775
logger.setLevel(log_level)
7876

79-
# Initialize worker (needed for cleanup in finally block)
77+
# Initialize worker and session managers (needed for cleanup in finally block)
8078
worker = None
79+
session_manager = None
80+
notification_session_manager = None
8181

82-
# Build the workflow
83-
async with WorkflowBuilder.from_config(config=self.full_config) as builder:
84-
# Create session manager for executing workflows (default workflow)
85-
session_manager = await SessionManager.create(
86-
config=self.full_config,
87-
shared_builder=builder
88-
)
89-
90-
# Create separate session manager for notifications if configured
91-
notification_session_manager = session_manager
92-
if self.front_end_config.notification_workflow:
93-
logger.info(
94-
f"Creating separate session manager for notifications with entry_function='{self.front_end_config.notification_workflow}'"
95-
)
96-
notification_session_manager = await SessionManager.create(
82+
try:
83+
async with WorkflowBuilder.from_config(config=self.full_config) as builder:
84+
session_manager = await SessionManager.create(
9785
config=self.full_config,
98-
shared_builder=builder,
99-
entry_function=self.front_end_config.notification_workflow
86+
shared_builder=builder
10087
)
10188

102-
# Get worker instance (allows for custom workers via config)
103-
worker = self._get_worker_instance()
89+
# Create separate session manager for notifications if configured
90+
notification_session_manager = session_manager
91+
if self.front_end_config.notification_workflow:
92+
logger.info(
93+
f"Creating separate session manager for notifications with entry_function='{self.front_end_config.notification_workflow}'"
94+
)
95+
notification_session_manager = await SessionManager.create(
96+
config=self.full_config,
97+
shared_builder=builder,
98+
entry_function=self.front_end_config.notification_workflow
99+
)
104100

105-
# Create Microsoft Agents SDK application
106-
agent_app, connection_manager = await worker.create_agent_application()
101+
# Get worker instance (allows for custom workers via config)
102+
worker = self._get_worker_instance()
107103

108-
# Set up notification handlers if enabled
109-
if self.front_end_config.enable_notifications:
110-
await worker.setup_notification_handlers(
111-
agent_app=agent_app,
112-
session_manager=notification_session_manager
113-
)
104+
agent_app, connection_manager = await worker.create_agent_application()
114105

115-
# Set up message handler for regular chat messages
116-
await worker.setup_message_handler(
117-
agent_app=agent_app,
118-
session_manager=session_manager
119-
)
106+
if self.front_end_config.enable_notifications:
107+
await worker.setup_notification_handlers(
108+
agent_app=agent_app,
109+
session_manager=notification_session_manager
110+
)
120111

121-
# Set up error handler
122-
worker.setup_error_handler(agent_app)
112+
await worker.setup_message_handlers(
113+
agent_app=agent_app,
114+
session_manager=session_manager
115+
)
123116

124-
logger.info(
125-
f"Starting Microsoft Agent 365 server on "
126-
f"{self.front_end_config.host}:{self.front_end_config.port}"
127-
)
117+
worker.setup_error_handlers(agent_app)
128118

129-
# Start the server
130-
try:
131-
await start_server(
132-
agent_application=agent_app,
133-
auth_configuration=connection_manager.get_default_connection_configuration(),
134-
host=self.front_end_config.host,
135-
port=self.front_end_config.port,
119+
logger.info(
120+
f"Starting Microsoft Agent 365 server on "
121+
f"{self.front_end_config.host}:{self.front_end_config.port}"
136122
)
137-
except Exception as e:
138-
error_msg = str(e).lower()
139-
if "address already in use" in error_msg or "port" in error_msg:
140-
raise A365SDKError(
141-
f"Failed to start server: port {self.front_end_config.port} may already be in use. "
142-
f"Try a different port or stop the process using this port.",
143-
sdk_component="start_server",
144-
original_error=e
145-
) from e
146-
elif "permission" in error_msg or "access" in error_msg:
147-
raise A365SDKError(
148-
f"Failed to start server: insufficient permissions to bind to "
149-
f"{self.front_end_config.host}:{self.front_end_config.port}",
150-
sdk_component="start_server",
151-
original_error=e
152-
) from e
153-
else:
154-
raise A365SDKError(
155-
f"Failed to start server: {str(e)}",
156-
sdk_component="start_server",
157-
original_error=e
158-
) from e
159123

124+
try:
125+
await start_server(
126+
agent_application=agent_app,
127+
auth_configuration=connection_manager.get_default_connection_configuration(),
128+
host=self.front_end_config.host,
129+
port=self.front_end_config.port,
130+
)
131+
except Exception as e:
132+
error_msg = str(e).lower()
133+
if "address already in use" in error_msg or "port" in error_msg:
134+
raise A365SDKError(
135+
f"Failed to start server: port {self.front_end_config.port} may already be in use. "
136+
f"Try a different port or stop the process using this port.",
137+
sdk_component="start_server",
138+
original_error=e
139+
) from e
140+
elif "permission" in error_msg or "access" in error_msg:
141+
raise A365SDKError(
142+
f"Failed to start server: insufficient permissions to bind to "
143+
f"{self.front_end_config.host}:{self.front_end_config.port}",
144+
sdk_component="start_server",
145+
original_error=e
146+
) from e
147+
else:
148+
raise A365SDKError(
149+
f"Failed to start server: {str(e)}",
150+
sdk_component="start_server",
151+
original_error=e
152+
) from e
160153
finally:
161-
# Cleanup session managers
162-
try:
163-
await session_manager.shutdown()
164-
except Exception as e:
165-
logger.error(f"Error cleaning up default session manager: {e}")
154+
if session_manager is not None:
155+
try:
156+
await session_manager.shutdown()
157+
except Exception as e:
158+
logger.error(f"Error cleaning up default session manager: {e}")
166159

167160
# If notification session manager is different, clean it up too
168-
if notification_session_manager is not session_manager:
161+
if notification_session_manager is not None and notification_session_manager is not session_manager:
169162
try:
170163
await notification_session_manager.shutdown()
171164
except Exception as e:
172165
logger.error(f"Error cleaning up notification session manager: {e}")
173166

174-
# Cleanup worker resources
175167
if worker is not None:
176168
try:
177169
await worker.cleanup()
@@ -187,13 +179,11 @@ def _get_worker_instance(self):
187179
"""
188180
from nat.plugins.a365.front_end.worker import A365FrontEndPluginWorker
189181

190-
# Check if custom worker class is specified
191182
if self.front_end_config.runner_class:
192183
module_name, class_name = self.front_end_config.runner_class.rsplit(".", 1)
193184
module = importlib.import_module(module_name)
194185
worker_class = getattr(module, class_name)
195186
return worker_class(self.full_config)
196187

197-
# Use default worker
198188
return A365FrontEndPluginWorker(self.full_config)
199189

packages/nvidia_nat_a365/src/nat/plugins/a365/front_end/register.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,9 @@ async def a365_front_end(config: A365FrontEndConfig, full_config):
3939
Raises:
4040
ValueError: If app_password is not provided in config or A365_APP_PASSWORD environment variable
4141
"""
42-
# Load app_password from environment variable if not set in config
4342
if not config.app_password:
4443
set_secret_from_env(config, "app_password", "A365_APP_PASSWORD")
4544

46-
# Validate that app_password is set
4745
if not config.app_password:
4846
raise ValueError(
4947
"app_password must be provided in the configuration or in the environment variable `A365_APP_PASSWORD`"

0 commit comments

Comments (0)