11#!/usr/bin/env python3
22"""Run end-to-end agent scenarios for CI proof.
33
4- This script is intended for CI environments where you *want proof* that the
5- Copilot SDK can run real scenarios (network + auth required).
6-
7- It runs a small, deterministic set of scenarios and prints a short transcript.
4+ This script runs all sample files as E2E scenarios to prove they work with the
5+ Copilot SDK in CI environments (network + auth required).
86
97Modes
108-----
11- - Copilot mode (default): uses your Copilot CLI auth.
9+ - Copilot mode (default): uses your Copilot CLI auth or COPILOT_GITHUB_TOKEN.
1210- OpenAI mode: uses BYOK provider config with OPENAI_API_KEY.
1311
1412Notes
1513-----
1614- These are E2E checks, not unit tests.
17- - Keep prompts short to reduce cost/latency.
15+ - Each sample is run with appropriate test inputs.
16+ - Interactive samples are skipped in CI.
1817"""
1918
2019from __future__ import annotations
2120
2221import argparse
2322import asyncio
23+ import importlib .util
2424import os
2525import sys
26+ import tempfile
2627from dataclasses import dataclass
28+ from pathlib import Path
2729
2830from copilot import CopilotClient
2931
@@ -50,69 +52,158 @@ def _provider_config(provider: str) -> dict | None:
5052 raise ValueError (f"Unknown provider: { provider } " )
5153
5254
async def run_sample_module(sample_path: Path, test_inputs: dict | None = None) -> ScenarioResult:
    """Import a sample file and run its ``main()`` entry point.

    The sample is executed with a clean ``sys.argv`` (the sample path followed
    by ``test_inputs["argv"]`` when provided), with stdout/stderr captured and
    stdin replaced by an empty stream so a sample cannot block on input.

    Args:
        sample_path: Path to the sample ``.py`` file.
        test_inputs: Optional mapping; only the ``"argv"`` key is read.

    Returns:
        A ScenarioResult named after the file stem. It is a pass when
        ``main()`` completes or exits with status 0/None, and a failure on
        timeout, non-zero exit, a missing ``main()``, or any other exception.
    """
    import inspect
    import io

    name = sample_path.stem
    captured_err = io.StringIO()

    def _detail(message: str) -> str:
        # Append the last non-empty stderr line so failures stay diagnosable
        # even though the sample's output is suppressed.
        lines = [ln.strip() for ln in captured_err.getvalue().splitlines() if ln.strip()]
        if lines:
            message = f"{message} ({lines[-1]})"
        return message[:80]

    try:
        spec = importlib.util.spec_from_file_location(name, sample_path)
        if not spec or not spec.loader:
            return ScenarioResult(name, False, "Failed to load module")

        module = importlib.util.module_from_spec(spec)

        # Snapshot the process-wide state that is about to be replaced.
        original_argv = sys.argv.copy()
        original_stdin = sys.stdin
        original_stdout = sys.stdout
        original_stderr = sys.stderr
        previous_module = sys.modules.get(name)

        try:
            extra_argv = test_inputs["argv"] if test_inputs and "argv" in test_inputs else []
            sys.argv = [str(sample_path), *extra_argv]

            sys.stdout = io.StringIO()
            sys.stderr = captured_err
            sys.stdin = io.StringIO()  # Prevent waiting for input

            # Register before executing, as the importlib recipe does; code
            # that looks itself up via sys.modules (e.g. dataclasses with
            # string annotations) fails otherwise.
            sys.modules[name] = module
            spec.loader.exec_module(module)

            entry = getattr(module, "main", None)
            if entry is None:
                return ScenarioResult(name, False, "No main() function found")

            timeout_s = float(os.getenv("COPILOT_E2E_TIMEOUT", "45"))
            outcome = entry()
            # Only await when main() actually returned an awaitable; a plain
            # synchronous main() has already run to completion here.
            if inspect.isawaitable(outcome):
                await asyncio.wait_for(outcome, timeout=timeout_s)
            return ScenarioResult(name, True, "OK")
        finally:
            sys.argv = original_argv
            sys.stdin = original_stdin
            sys.stdout = original_stdout
            sys.stderr = original_stderr
            # Do not leak the sample into (or clobber) the import cache.
            if previous_module is None:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = previous_module

    except asyncio.TimeoutError:
        return ScenarioResult(name, False, "Timeout")
    except (KeyboardInterrupt, EOFError):
        return ScenarioResult(name, False, "Interrupted/EOF")
    except SystemExit as e:
        # Some samples use sys.exit() for usage errors; a bare sys.exit()
        # (code None) means success just like an explicit 0.
        if e.code is None or e.code == 0:
            return ScenarioResult(name, True, "OK")
        return ScenarioResult(name, False, _detail(f"Exit code {e.code}"))
    except Exception as e:
        # Include the exception type so an empty message is still informative.
        return ScenarioResult(name, False, _detail(f"{type(e).__name__}: {e}"))
82113
83114
async def run(provider: str, model: str) -> int:
    """Run every sample under ``../samples`` as an E2E scenario and report.

    Args:
        provider: Provider name; only echoed in the banner by this function.
        model: Model name; only echoed in the banner by this function.

    Returns:
        0 when at least one sample was discovered and none failed, otherwise 1.
    """
    # NOTE(review): provider/model are printed but not forwarded to the
    # samples from here; confirm the samples pick them up another way.
    rule = "=" * 80
    print(rule)
    print("E2E Agent Scenarios - Comprehensive Sample Suite")
    print(f"  Provider: {provider}")
    print(f"  Model: {model}")
    print(rule)
    print()

    # Find all sample files; a leading underscore marks private/demo files.
    samples_dir = Path(__file__).parent.parent / "samples"
    sample_files = [
        path for path in sorted(samples_dir.glob("*.py"))
        if not path.stem.startswith("_")
    ]
    if not sample_files:
        # An empty suite must not count as proof that the samples work.
        print(f"No sample files found in {samples_dir}")
        return 1

    # Skip samples that can't run in CI
    skip_samples = {
        "interactive_chat",   # Requires stdin
        "multi_turn_agent",   # Interactive prompts
        "playwright_agent",   # Requires browser setup + URL arg
    }

    test_inputs: dict[str, dict] = {}
    results: list[ScenarioResult] = []
    skipped_names: set[str] = set()

    # The demo inputs live in a temporary directory, so the samples have to
    # run while that directory still exists.
    with tempfile.TemporaryDirectory() as tmpdir:
        tmppath = Path(tmpdir)

        # Demo OpenAPI spec for api_test_generator
        demo_spec = tmppath / "demo_api.json"
        demo_spec.write_text(
            '{"openapi":"3.0.0","paths":{"/users":{"get":{}}}}', encoding="utf-8"
        )
        test_inputs["api_test_generator"] = {"argv": [str(demo_spec)]}

        # Demo log file for log_analyzer
        demo_log = tmppath / "demo.log"
        demo_log.write_text(
            "2026-02-08 INFO Application started\n2026-02-08 ERROR Connection failed\n",
            encoding="utf-8",
        )
        test_inputs["log_analyzer"] = {"argv": [str(demo_log)]}

        # Demo file for file_summarizer
        demo_file = tmppath / "demo.txt"
        demo_file.write_text(
            "This is a demo file for testing the file summarizer.", encoding="utf-8"
        )
        test_inputs["file_summarizer"] = {"argv": [str(demo_file)]}

        # Demo code file for code_reviewer
        demo_code = tmppath / "demo.py"
        demo_code.write_text("def greet(): return 'hello'", encoding="utf-8")
        test_inputs["code_reviewer"] = {"argv": [str(demo_code)]}

        # Test data generator needs schema + count
        test_inputs["test_data_generator"] = {"argv": ["user", "5", "json"]}

        for sample_file in sample_files:
            stem = sample_file.stem

            if stem in skip_samples:
                # Skips do not fail the run, but are reported separately
                # below instead of being counted as passes.
                skipped_names.add(stem)
                results.append(ScenarioResult(
                    stem,
                    True,
                    "SKIP - interactive/requires manual setup",
                ))
                continue

            print(f"Running {stem}...", end=" ", flush=True)
            result = await run_sample_module(sample_file, test_inputs.get(stem))
            results.append(result)
            print("PASS" if result.ok else "FAIL")

    print("RESULTS")
    print(rule)
    print()

    passed = failed = skipped = 0
    for r in results:
        if r.name in skipped_names:
            status, marker = "SKIP", "-"
            skipped += 1
        elif r.ok:
            status, marker = "PASS", "+"
            passed += 1
        else:
            status, marker = "FAIL", "!"
            failed += 1
        print(f"{marker} {r.name:25} {status:6} {r.details}")

    print()
    print(rule)
    print(
        f"Summary: {passed} passed, {failed} failed, {skipped} skipped "
        f"out of {len(results)} scenarios"
    )
    print(rule)

    return 0 if failed == 0 else 1
116207
117208
118209def main () -> int :
0 commit comments