|
| 1 | +import os |
| 2 | +import uuid |
| 3 | +import json |
| 4 | +import json |
| 5 | +import time |
| 6 | +import requests |
| 7 | +import jsonlines |
| 8 | +from datasets import load_dataset |
| 9 | +from sweagent.run.run_tests import run_tests |
| 10 | +from sweagent.run.run_tests import make_regression_spec |
| 11 | +from swebench.harness.run_evaluation import get_dataset_from_preds |
| 12 | +from swebench.harness.grading import get_eval_tests_report, get_logs_eval |
| 13 | +from swebench.harness.constants import (FAIL_TO_PASS, KEY_INSTANCE_ID, PASS_TO_PASS, TestStatus) |
| 14 | + |
| 15 | + |
# Token budget constant. Presumably the context limit of the LLM used for log
# analysis; it is not referenced in the visible part of this file (TODO confirm).
MODEL_MAX_TOKENS = 100_000
| 17 | + |
class LogExractAgent:
    """Small client for a ``/chat/completions`` style HTTP endpoint.

    The agent posts a list of chat messages and returns the text of the first
    choice, retrying a bounded number of times when the request fails or the
    response body cannot be interpreted.
    """

    def __init__(self, api_key, base_url, model_name, max_retries=3, retry_delay=2):
        """Store connection settings; no network traffic happens here.

        Args:
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
            base_url: Endpoint root; ``/chat/completions`` is appended to it.
            model_name: Value sent in the ``model`` field of the payload.
            max_retries: Maximum number of attempts made by ``_call_api``.
            retry_delay: Seconds slept between two failed attempts.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.model_name = model_name
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    @staticmethod
    def _extract_answer(result):
        """Return the message text of the first choice in a decoded response.

        Two layouts are accepted: the plain ``{'choices': [...]}`` layout and
        the wrapped ``{'data': {'response': {'choices': [...]}}}`` layout.

        Raises:
            ValueError: If ``result`` matches neither layout.
            KeyError, IndexError, TypeError: If the matched layout is
                incomplete (e.g. an empty ``choices`` list).
        """
        if not isinstance(result, dict):
            raise ValueError(f"Unexpected API response type: {type(result).__name__}")
        data = result.get('data')
        if isinstance(data, dict) and 'response' in data:
            body = data['response']
        elif 'choices' in result:
            body = result
        else:
            raise ValueError(f"Unexpected API response format: {result.keys()}")
        return body['choices'][0]['message']['content']

    def _call_api(self, messages: list[dict], temperature: float = 0.3, top_p: float = 1.0) -> str:
        """Send ``messages`` to the endpoint and return the model's reply.

        Args:
            messages: Chat messages, each a dict with ``role`` and ``content``.
            temperature: Sampling temperature forwarded in the payload.
            top_p: Nucleus sampling value forwarded in the payload.

        Returns:
            The reply text, or ``""`` when every attempt failed (or when
            ``max_retries`` is 0 and no attempt is made).
        """
        payload = {'model': self.model_name, 'messages': messages, 'temperature': temperature, 'top_p': top_p}
        headers = {'Authorization': f'Bearer {self.api_key}', 'Content-Type': 'application/json'}
        api_url = f"{self.base_url.rstrip('/')}/chat/completions"
        for attempt in range(1, self.max_retries + 1):
            try:
                resp = requests.post(api_url, headers=headers, json=payload, timeout=120)
                resp.raise_for_status()
                return self._extract_answer(resp.json())
            except (ValueError, AttributeError, IndexError, KeyError, TypeError,
                    requests.RequestException) as e:
                # KeyError/TypeError cover a malformed body (missing "message",
                # non-dict JSON, ...); they are retried like the other
                # response-shape errors instead of aborting on the first one.
                print(f"API call failed, retry {attempt}/{self.max_retries} (error: {str(e)})")
                if attempt < self.max_retries:
                    time.sleep(self.retry_delay)
            except Exception as e:
                # Anything else is treated as non-retriable; give up at once.
                print(f"Unknown error in API call: {e}")
                break
        return ""
| 54 | + |
def _empty_report():
    """Return a fresh report with no tests in any category."""
    return {"success": [], "failure": [], "skipped": []}


def _strip_code_fence(text):
    """Return the JSON payload of *text*, unwrapping a Markdown code fence if present."""
    text = text.strip()
    if '```' not in text:
        return text
    fenced = text.split('```')[1]
    # Drop a language tag on the opening fence ("json", "JSON", ...), whether
    # or not it is followed by a newline.
    brace = fenced.find('{')
    if brace > 0 and fenced[:brace].strip().isalnum():
        fenced = fenced[brace:]
    return fenced.strip()


def rewrite_report(test_spec, input_folder_path, regression_tests, api_caller=None, skip_header_lines=80):
    """
    Use LLM to analyze test logs and count the execution results of regression tests

    The log is read from ``{input_folder_path}/test/{instance_id}/test_output.txt``.
    Every failure mode (no caller, missing/unreadable/empty log, empty or
    unparsable LLM reply) is reported with ``print`` and yields an empty report
    rather than an exception.

    Args:
        test_spec: Test specification object; only ``instance_id`` is read
        input_folder_path: Input folder path
        regression_tests: List of regression tests
        api_caller: API caller object (must have _call_api method)
        skip_header_lines: Number of leading log lines dropped before the log
            is sent to the LLM. Defaults to 80, the value that used to be
            hard-coded (presumably harness setup output - TODO confirm).

    Returns:
        dict: Dictionary containing success, failure, and skipped fields. Only
        names present in ``regression_tests`` are kept; anything else the LLM
        returns is reported and dropped.
    """
    instance_id = test_spec.instance_id
    # Fail fast: without a caller there is no point reading the log.
    if api_caller is None:
        print("Warning: api_caller not provided, cannot call LLM for analysis")
        return _empty_report()
    log_path = f"{input_folder_path}/test/{instance_id}/test_output.txt"
    try:
        with open(log_path, 'r', encoding='utf-8') as f:
            log_content = f.read()
    except FileNotFoundError:
        print(f"Log file not found: {log_path}")
        return _empty_report()
    except (OSError, UnicodeDecodeError) as e:
        print(f"Error reading log file: {e}")
        return _empty_report()
    log_part = '\n'.join(log_content.split('\n')[skip_header_lines:])
    if not log_part.strip():
        print(f"Log file is empty after skipping the first {skip_header_lines} lines: {log_path}")
        return _empty_report()
    regression_tests_str = '\n'.join(regression_tests)
    prompt = f"""You are a test log analysis expert. I need you to analyze the following test logs and count the execution results of regression tests.

**Regression Test List (total {len(regression_tests)} tests):**
```
{regression_tests_str}
```

**Test Log:**
```
{log_part}
```

**Task Requirements:**
Please carefully analyze the log and determine the execution status for each test in the regression test list above:
1. **success**: Test passed/succeeded (PASSED, OK, SUCCESS, etc.)
2. **failure**: Test failed (FAILED, ERROR, ASSERTION ERROR, etc.)
3. **skipped**: Test was skipped (SKIPPED, XFAIL, etc.)

**Output Format Requirements:**
Please strictly follow the JSON format below without adding any other explanatory text:
```json
{{
  "success": ["test_name1", "test_name2"],
  "failure": ["test_name3", "test_name4"],
  "skipped": ["test_name5"]
}}
```

Notes:
- Only count tests that appear in the regression test list
- If a test is not found in the log, do not add it to any category
- Ensure test names match exactly with those in the regression test list
"""
    messages = [
        {"role": "system", "content": "You are a professional test log analysis assistant, skilled at extracting and classifying test results from test outputs."},
        {"role": "user", "content": prompt},
    ]
    categories = ("success", "failure", "skipped")
    payload = ""
    try:
        response = api_caller._call_api(messages, temperature=0.1, top_p=0.95)
        if not response:
            print(f"LLM returned empty response: {instance_id}")
            return _empty_report()
        payload = _strip_code_fence(response)
        result = json.loads(payload)
        if not isinstance(result, dict) or not all(key in result for key in categories):
            print(f"LLM returned incorrect format: {result}")
            return _empty_report()
        allowed = set(regression_tests)
        cleaned = _empty_report()
        rejected = []
        for category in categories:
            names = result[category]
            if not isinstance(names, list):
                print(f"LLM returned incorrect format: {result}")
                return _empty_report()
            for name in names:
                # Enforce the prompt's contract: only exact names from the
                # regression list count; invented names are dropped.
                if isinstance(name, str) and name in allowed:
                    cleaned[category].append(name)
                else:
                    rejected.append(name)
        if rejected:
            print(f"Warning: LLM returned tests not in regression test list (ignored): {rejected}")
        print(f"Instance {instance_id} analysis completed: "
              f"success={len(cleaned['success'])}, "
              f"failure={len(cleaned['failure'])}, "
              f"skipped={len(cleaned['skipped'])}")
        return cleaned
    except json.JSONDecodeError as e:
        print(f"Failed to parse JSON returned by LLM: {e}")
        print(f"LLM raw response: {payload[:500]}...")
        return _empty_report()
    except Exception as e:
        # Deliberate best-effort boundary: any failure of the caller or of the
        # reply handling degrades to an empty report.
        print(f"Error analyzing test log: {e}")
        return _empty_report()
| 152 | + |
| 153 | + |
def save_passing_tests(output_jsonl_path, input_folder_path, dataset):
    """Write, per dataset instance, the tests that passed according to its log.

    For every entry of the ``test`` split of ``dataset`` the log at
    ``{input_folder_path}/test/{instance_id}/test_output.txt`` is handed to
    ``get_logs_eval``; the names whose status equals ``TestStatus.PASSED.value``
    are written as one JSON line to ``output_jsonl_path``. Instances whose log
    file is missing are reported and skipped.
    """
    hub_data = load_dataset(dataset)
    with jsonlines.open(output_jsonl_path, mode="w") as sink:
        for record in hub_data["test"]:
            instance_id = record["instance_id"]
            log_path = f"{input_folder_path}/test/{instance_id}/test_output.txt"
            try:
                status_by_test, _ = get_logs_eval(log_path)
            except FileNotFoundError:
                print(f"File not found: {log_path}")
                continue
            passing = [
                name
                for name, outcome in status_by_test.items()
                if outcome == TestStatus.PASSED.value
            ]
            if not passing:
                # Still written below, with an empty list.
                print(f"{instance_id} didn't get any passing tests")
            sink.write({"instance_id": instance_id, "tests_passing_in_original_repo": passing})
| 173 | + |
| 174 | + |
def run_regression_for_each_instance(location_data, run_id):
    """Run ``run_tests`` for the single instance described by ``location_data``.

    ``location_data["original_id"]`` is used as the instance id and
    ``location_data["submission"]`` as the model patch, which is applied
    (``apply_model_patch=True``). Returns whatever ``run_tests`` returns.
    """
    return run_tests(
        location_data=[location_data],
        instance_ids=[location_data["original_id"]],
        model_patches=[location_data["submission"]],
        max_workers=1,
        run_id=run_id,
        timeout=3000,
        apply_model_patch=True,
        dataset_name="princeton-nlp/SWE-bench_Verified",
    )
| 190 | + |
def check_if_all_instances_pass(instance_to_plausible):
    """Print whether every instance in the mapping has a truthy value.

    Keys with a falsy value are collected in iteration order. If there are
    none, a success line is printed; otherwise the offending keys are printed,
    followed by their count on a separate line. Returns ``None``.
    """
    failing = [key for key, plausible in instance_to_plausible.items() if not plausible]
    if not failing:
        print("All the chosen regression tests pass in the base repository")
        return
    print(f"One or more of the regression tests for instances {failing} do not pass in the original repository")
    print(len(failing))
| 203 | + |
def _run_regression(data, llm_config):
    """Run the regression tests for one instance and classify them via the LLM.

    Args:
        data: Mapping with ``original_id``, ``tests_passing_in_original_repo``
            and ``submission`` (the patch to apply).
        llm_config: Mapping with ``api_key``, ``api_base`` and ``model_name``
            used to build the ``LogExractAgent``.

    Returns:
        dict: ``{instance_id: report}`` where ``report`` has ``success``,
        ``failure`` and ``skipped`` lists. The lists are empty when the run
        produced no ``report.json`` or no test spec exists for the instance.
    """
    instance_id = data["original_id"]
    tests_by_instance = {instance_id: data["tests_passing_in_original_repo"]}
    target_ids = [data["original_id"]]
    predictions = {
        instance_id: {
            "model_name_or_path": "test",
            "model_patch": data["submission"],
            "instance_id": instance_id,
        }
    }
    run_id = f"run_{instance_id}_{uuid.uuid4().hex}"
    instances = get_dataset_from_preds(
        "princeton-nlp/SWE-bench_Verified", "test", target_ids, predictions, run_id, False, True
    )

    def _as_regression_instance(instance):
        # Copy the instance with FAIL_TO_PASS emptied and PASS_TO_PASS replaced
        # by the tests recorded for that instance id.
        revised = instance.copy()
        revised["FAIL_TO_PASS"] = "[]"
        revised["PASS_TO_PASS"] = tests_by_instance.get(instance["instance_id"], "[]")
        return revised

    regression_instances = [_as_regression_instance(instance) for instance in instances]
    spec_by_id = {
        spec.instance_id: spec
        for spec in [make_regression_spec(item) for item in regression_instances]
    }
    run_regression_for_each_instance(data, run_id)
    api_caller = LogExractAgent(
        api_key=llm_config["api_key"],
        base_url=llm_config["api_base"],
        model_name=llm_config["model_name"],
    )

    run_dir = f"logs/run_evaluation/{run_id}"
    report = {"success": [], "failure": [], "skipped": []}
    # The log is only analysed when the run left a report.json behind.
    if os.path.isfile(f"{run_dir}/test/{instance_id}/report.json"):
        spec = spec_by_id.get(instance_id)
        if spec:
            report = rewrite_report(spec, run_dir, tests_by_instance[instance_id], api_caller=api_caller)
    regression_dict = {instance_id: report}
    print(f"regression_dict:\n{regression_dict}")
    return regression_dict
0 commit comments