-
Notifications
You must be signed in to change notification settings - Fork 258
Expand file tree
/
Copy pathevaluation.py
More file actions
89 lines (73 loc) · 2.63 KB
/
evaluation.py
File metadata and controls
89 lines (73 loc) · 2.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from threading import Lock
from typing import Optional
from .containerized_eval import eval_string_script
# Get working directory
# Repository root: the parent of the package that contains this file.
WORKING_DIR = Path(__file__).parent.parent
# program: str => Result
# In-memory memoization of evaluation results, keyed by the full program text
# (completion + "\n" + tests). All reads/writes are serialized via CACHE_LOCK
# (acquired in cached_eval_script around cache_get/cache_set calls).
CACHE = dict()
CACHE_LOCK = Lock()
def cache_get(program: str) -> Optional[dict]:
    """Return the cached result dict for *program*, or None if absent.

    The caller (cached_eval_script) holds CACHE_LOCK around this call;
    this function itself performs no locking.
    """
    # dict.get does one hash lookup instead of the `in` + indexing pair,
    # and is the idiomatic spelling of "value or None".
    return CACHE.get(program)
def cache_set(program: str, result: dict):
    """Bind *result* to *program* in the shared CACHE.

    Prints a warning when an existing entry is being replaced — that is
    unexpected in the normal flow, where entries are inserted only once.
    """
    overwriting = program in CACHE
    if overwriting:
        print("Setting already-existing cache")
    CACHE[program] = result
def cached_eval_script(problem, index) -> dict:
    """Evaluate completion number *index* of *problem*, memoized in CACHE.

    The program is the completion text concatenated with the problem's tests.
    Returns the result dict produced by eval_string_script, augmented with a
    Unix "timestamp" key recording when the evaluation finished.
    """
    # here prompt is already included in completions
    program = problem["completions"][index] + "\n" + problem["tests"]
    CACHE_LOCK.acquire(True)
    cached = cache_get(program)
    if cached is not None:
        CACHE_LOCK.release()
        return cached
    else:
        # Publish an (initially empty) dict under the lock, then release the
        # lock before the slow containerized evaluation, and fill the shared
        # dict in place afterwards so all cache hits see the final results.
        # NOTE(review): a thread that hits the cache for the same program
        # before the fill-in loop completes can observe a partially-populated
        # dict — confirm whether duplicate programs can actually race here.
        result_yaml = dict()
        cache_set(program, result_yaml)
        CACHE_LOCK.release()
        result_dict = eval_string_script(problem["language"], program)
        for k in result_dict.keys():
            result_yaml[k] = result_dict[k]
        result_yaml["timestamp"] = int(time.time())
        return result_yaml
def get_test_results_json_path(
    output_dir: str, problem_json_path: str, input_dir: Optional[Path]
) -> Path:
    """Map a problem's ``.json`` path to its ``.results.json`` path in *output_dir*.

    When *input_dir* is given, the problem file's directory layout relative to
    *input_dir* is mirrored under *output_dir*; otherwise the results file is
    named directly from the (suffix-stripped) problem path under *output_dir*.

    Bug fixed: the original unconditionally raised ValueError("input dir given")
    whenever *input_dir* was set, making the mirrored-layout return unreachable
    (and it called ``.relative_to`` on a ``str``, which would have failed anyway).
    """
    suffixes = ".results.json"
    # str(...) keeps this working whether the caller passes a str or a Path.
    problem_name = str(problem_json_path)[: -len(".json")]
    if input_dir:
        # Mirror the subdirectory structure of input_dir under output_dir.
        relative_parent = Path(problem_json_path).relative_to(input_dir).parent
        return Path(output_dir) / relative_parent / (Path(problem_name).name + suffixes)
    return Path(output_dir) / (problem_name + suffixes)
def evaluate_problem(
    output_dir: str,
    problem_json_path: str,
    max_workers: int,
    input_dir: Optional[Path] = None,
):
    """Evaluate every completion in one problem JSON file and write the results.

    Reads the problem from *problem_json_path*, evaluates each entry of its
    "completions" list (via cached_eval_script) on a thread pool of
    *max_workers* workers, and writes a ``<name>.results.json`` file under
    *output_dir* containing the problem metadata (minus "completions") plus a
    "results" list in completion order.

    Fixed: the default-None parameter is now annotated Optional[Path], and the
    output is written with json.dump instead of f.write(json.dumps(...)).
    """
    with open(problem_json_path, "r") as f:
        problem = json.load(f)
    test_results_path = get_test_results_json_path(
        output_dir, problem_json_path, input_dir
    )
    test_results_path.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
    # Copy the problem's metadata, replacing the raw completions with results.
    test_results = problem.copy()
    del test_results["completions"]
    test_results["results"] = []
    num_problems = len(problem["completions"])
    # "results" was just reset, so this is always 0 here; kept as a variable
    # so a future resume-from-partial-results only has to change one line.
    min_problem = len(test_results["results"])
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map preserves input order, so results line up with completions.
        for result in executor.map(
            lambda index: cached_eval_script(problem, index),
            range(min_problem, num_problems),
        ):
            test_results["results"].append(result)
    with open(test_results_path, "w") as f:
        json.dump(test_results, f, indent=2)