-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscratchpad.py
More file actions
256 lines (207 loc) · 8.75 KB
/
scratchpad.py
File metadata and controls
256 lines (207 loc) · 8.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
import os
from pathlib import Path
import copy
import jsonlines
import dspy
import dotenv
dotenv.load_dotenv()
lm = dspy.LM(
"openai/gpt-5.3-codex",
temperature=1.0,
max_tokens=16000,
api_key=os.getenv("OPENAI_API_KEY")
)
dspy.configure(lm=lm)
with jsonlines.open("./data/generatjd_scenarios_gpt_5.3_codex_t1p0_n3_k3.jsonl") as f:
data = [i for i in f][1:]
scenarios = sum([i["scenarios"] for i in data], [])
import jsonlines
from pathlib import Path
from typing import Tuple
from theseus.registry import dataset, job
from theseus.data.datasets import ChatTurn, ContrastiveChatTemplateDataset, ChatTemplateDataset
from theseus.training.contrastive import BackbonedContrastiveTrainer
from theseus.training.backbone import BackbonedTrainer
from theseus.experiments.models.llama import PretrainLlama
CONFIG = "./data/generated_scenarios_gpt_5_t1p0_n10_k10.jsonl"
TRAIN_TYPE = "sft"
def template(prompt, label):
return [
ChatTurn(
role="system",
message="""You are a helpful assistant for generating code. Given the prompt, generate Python code that solves the task; as a reminder, you should be writing production code (i.e. disable any debugging traces, etc.) Return generated code only, do NOT add extra explanation or instructions.""",
),
ChatTurn(role="user", message=prompt),
ChatTurn(role="assistant", message=label),
]
@dataset("sf_contrastive")
class SFContrastiveHardeningDataset(ContrastiveChatTemplateDataset):
def __init__(self, split: str = "noop", config: str = "") -> None:
"""Load the SecureForge repo generated contrastive learning dataset."""
# config should be the jsonl that comes from cli/rollout.py
config_path = Path(config).resolve(strict=True)
with jsonlines.open(config_path) as d:
self.raw = [i for i in d]
all_pairs = []
for i in self.raw:
p = i["pairs"]
for j in p:
j["prompt"] = i["prompt"]
all_pairs.append(j)
self.dataset = all_pairs
def __len__(self) -> int:
return len(self.dataset)
def __getitem__(self, idx: int):
sample = self.dataset[idx]
y_pos = template(sample["prompt"], sample["success"])
y_neg = template(sample["prompt"], sample["failure"])
return (y_pos, y_neg)
@dataset("sf_sft")
class SFSFTHardeningDataset(ChatTemplateDataset):
def __init__(self, split: str = "noop", config: str = "") -> None:
"""Load the SecureForge repo generated contrastive learning dataset."""
# config should be the jsonl that comes from cli/generate.py
config_path = Path(config).resolve(strict=True)
with jsonlines.open(config_path) as d:
self.raw = [i for i in d]
self.prompts = []
try:
# TODO log that we are using the "proposal"
raw = self.raw[1:]
scenarios = sum([i["scenarios"] for i in raw], [])
for i in scenarios:
if max([len(j["vulnerabilities"]) for j in i["rollouts"]]) > 0:
for j in i["rollouts"]:
if len(j["vulnerabilities"]) == 0:
# things end up here if its a scenario with potential
# vulnerabilities but the specific rollout doesn't have any vulnerabilities
self.prompts.append((i["scenario"], j["code"]))
except IndexError:
for i in self.raw:
p = i["pairs"]
for j in p:
self.prompts.append((i["prompt"], j["success"]))
def __len__(self) -> int:
return len(self.prompts)
def __getitem__(self, idx: int):
a,b = self.prompts[idx]
return template(a,b)
@job("sf_hardening_contrastive")
class SFHardeningContrastive(BackbonedContrastiveTrainer):
@classmethod
def schedule(cls):
return None
@job("sf_hardening_sft")
class SFHardening(BackbonedTrainer):
@classmethod
def schedule(cls):
return None
from theseus.quick import quick
from theseus.registry import JOBS
import sys
import click
import numpy as np
from pathlib import Path
import jax
import torch
from loguru import logger
from omegaconf import OmegaConf
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
from theseus.base.job import ExecutionSpec
from theseus.job import CheckpointedJob, RestoreableJob
from theseus.registry import JOBS
OUT_FOLDER = "./output/models/"
OUT_MODEL = "./output/Qwen2.5-0.5B"
RUN_NAME = "name"
PROJECT = "secureforge"
GROUP = "e0"
BATCH_SIZE = 16
PER_DEVICE_BATCH_SIZE = 2
LR = 1e-4
BACKBONE = "qwen"
IMPLEMENTATION = "Qwen/Qwen2.5-0.5B"
WANDB_ENABLED = False
def _call_to_hf(impl: str, params, n_layers: int, hf_cfg):
from theseus.model.models.contrib.qwen import _to_hf_state_dict as _qwen_to_hf
from theseus.model.models.contrib.llama import _to_hf_state_dict as _llama_to_hf
from theseus.model.models.contrib.gpt_neox import _to_hf_state_dict as _gpt_neox_to_hf
if impl == "qwen":
return _qwen_to_hf(params, n_layers)
elif impl == "llama":
return _llama_to_hf(params, n_layers, hf_cfg)
elif impl == "gpt_neox":
return _gpt_neox_to_hf(params, n_layers, hf_cfg)
else:
logger.error(f"No _to_hf_state_dict for backbone '{impl}'")
sys.exit(1)
if TRAIN_TYPE == "contrastive":
with quick("data/tokenize_contrastive_dataset", "tokenize_job", OUT_FOLDER) as q:
q.config.tokenizer.backend = "huggingface"
q.config.tokenizer.name = IMPLEMENTATION
q.config.data.dataset = "sf_contrastive"
q.config.data.config = CONFIG
q.config.data.suffix = Path(CONFIG).stem + "_" + IMPLEMENTATION.replace("/", "_")
q()
with quick("sf_hardening_contrastive", RUN_NAME, out_folder, project=PROJECT, group=GROUP) as q:
q.config.architecture.backbone.implementation = BACKBONE
q.config.architecture.backbone.weights = IMPLEMENTATION
q.config.training.dataset = [{
"name": "sf_contrastive",
"rate": 1.0,
"style": "CONTRASTIVE",
"suffix": Path(CONFIG).stem + "_" + IMPLEMENTATION.replace("/", "_"),
}]
q.config.training.batch_size = BATCH_SIZE
q.config.training.per_device_batch_size = PER_DEVICE_BATCH_SIZE
q.config.optimization.lr = LR
q.config.logging.wandb = WANDB_ENABLED
q.config.logging.validation_interval = 128
q.config.logging.checkpoint_interval = 128
q.config.logging.report_interval = 4
q()
else:
with quick("data/tokenize_blockwise_dataset", "tokenize_job", OUT_FOLDER) as q:
q.config.tokenizer.backend = "huggingface"
q.config.tokenizer.name = IMPLEMENTATION
q.config.data.dataset = "sf_sft"
q.config.data.config = CONFIG
q.config.data.suffix = Path(CONFIG).stem + "_" + IMPLEMENTATION.replace("/", "_")
q()
with quick("sf_hardening_sft", RUN_NAME, OUT_FOLDER, project=PROJECT, group=GROUP) as q:
q.config.architecture.backbone.implementation = BACKBONE
q.config.architecture.backbone.weights = IMPLEMENTATION
q.config.training.dataset = [{
"name": "sf_sft",
"rate": 1.0,
"style": "PADDED",
"suffix": Path(CONFIG).stem + "_" + IMPLEMENTATION.replace("/", "_"),
}]
q.config.training.batch_size = BATCH_SIZE
q.config.training.per_device_batch_size = PER_DEVICE_BATCH_SIZE
q.config.optimization.lr = LR
q.config.logging.wandb = WANDB_ENABLED
q.config.logging.validation_interval = 128
q.config.logging.checkpoint_interval = 128
q.config.logging.report_interval = 4
j = q.create()
params = j.state.params
hf_cfg = AutoConfig.from_pretrained(IMPLEMENTATION)
n_layers = hf_cfg.num_hidden_layers
logger.info(f"Converting {n_layers}-layer {impl} params to HF state dict …")
sd = _call_to_hf(impl, params, n_layers, hf_cfg)
# Load into HF model
torch_sd = {k: torch.from_numpy(np.array(jax.device_get(v))) for k, v in sd.items()}
hf_model = AutoModelForCausalLM.from_config(hf_cfg)
missing, unexpected = hf_model.load_state_dict(torch_sd, strict=False)
if missing:
n = len(missing)
logger.warning(f"{n} missing key(s): {missing[:3]}{'…' if n > 3 else ''}")
if unexpected:
n = len(unexpected)
logger.warning(f"{n} unexpected key(s): {unexpected[:3]}{'…' if n > 3 else ''}")
# Save model + tokenizer
out = Path(OUT_MODEL)
out.mkdir(parents=True, exist_ok=True)
hf_model.save_pretrained(out)
tok = AutoTokenizer.from_pretrained(IMPLEMENTATION)
tok.save_pretrained(out)