
Commit f25ed6f

For simulation
1 parent 5953c84 commit f25ed6f

11 files changed

Lines changed: 1293 additions & 90 deletions

experiments/k8s.py

Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
import os
import json
from itertools import product
import subprocess
import time

# ==== CONFIGURATION ====

# NOTE: `traces` is not defined in this commit; a placeholder is shown here so the script runs.
traces = []  # e.g. trace file names such as those written to trace_set_files.csv

algos = ["S3FIFO", "ARC"]
params_list = [
    {"fifo_size_ratio": 0.1, "move_to_main_threshold": 2},
    {"fifo_size_ratio": 0.2, "move_to_main_threshold": 3},
]
image = "myrepo/cachesim:latest"  # Replace with your image
volume_path = "/mnt/moofs"

# ==== GENERATE JOB YAML ====
os.makedirs("jobs", exist_ok=True)
job_template = """
apiVersion: batch/v1
kind: Job
metadata:
  name: simulate-{job_id}
spec:
  template:
    spec:
      containers:
      - name: simulator
        image: {image}
        command:
          - "python"
          - "simulate.py"
          - "--trace"
          - "{trace}"
          - "--algo"
          - "{algo}"
          - "--params"
          - '{params}'
        volumeMounts:
        - name: moofs
          mountPath: /mnt/moofs
      restartPolicy: Never
      volumes:
      - name: moofs
        hostPath:
          path: {volume_path}
          type: Directory
  backoffLimit: 1
"""

job_map = []  # List of (job_id, job_name)
job_id = 0
for trace, algo, params in product(traces, algos, params_list):
    job_name = f"simulate-{job_id}"
    yaml = job_template.format(
        job_id=job_id,
        trace=trace,
        algo=algo,
        params=json.dumps(params),
        image=image,
        volume_path=volume_path,
    )
    with open(f"jobs/job-{job_id}.yaml", "w") as f:
        f.write(yaml)
    job_map.append((job_id, job_name))
    job_id += 1

# ==== APPLY JOBS ====
for job_id, job_name in job_map:
    subprocess.run(["kubectl", "apply", "-f", f"jobs/job-{job_id}.yaml"])

# ==== WAIT FOR COMPLETION ====
def all_jobs_done():
    result = subprocess.run(["kubectl", "get", "jobs", "-o", "json"], capture_output=True, text=True)
    jobs = json.loads(result.stdout)["items"]
    completed = sum(1 for job in jobs if job["status"].get("succeeded") == 1)
    return completed == len(job_map)

print("Waiting for jobs to complete...")
while not all_jobs_done():
    time.sleep(5)

# ==== GATHER LOGS ====
results = []
for job_id, job_name in job_map:
    pod_name = subprocess.run(
        ["kubectl", "get", "pods", "--selector=job-name=" + job_name, "-o", "jsonpath={.items[0].metadata.name}"],
        capture_output=True, text=True).stdout.strip()

    log = subprocess.run(["kubectl", "logs", pod_name], capture_output=True, text=True).stdout.strip()
    print(f"[{job_name}] {log}")
    if "Miss ratio:" in log:
        try:
            ratio = float(log.strip().split()[-1])
            results.append((job_name, ratio))
        except (ValueError, IndexError):
            results.append((job_name, None))

print("\nAll gathered results:")
for job_name, ratio in results:
    print(f"{job_name}: {ratio}")

experiments/simulate.py

Lines changed: 129 additions & 0 deletions
@@ -0,0 +1,129 @@
from libcachesim import open_trace, TraceType, get_trace_file_path, get_trace_file_lists
from libcachesim.eviction import S3FIFO
import pandas as pd
import multiprocessing as mp
from functools import partial


def simulate(eviction_algo, trace_file_name: str, cache_size_ratio: float, ignore_obj_size: bool, eviction_algo_params: dict) -> float:
    # get the trace file path
    trace_file_path = get_trace_file_path(trace_file_name)
    # open the trace file
    reader = open_trace(trace_file_path, type=TraceType.ORACLE_GENERAL_TRACE,
                        ignore_obj_size=ignore_obj_size)
    # create a cache with the eviction policy
    cache = eviction_algo(cache_size=int(reader.get_wss(ignore_obj_size=ignore_obj_size) * cache_size_ratio), **eviction_algo_params)
    # process the trace
    miss_ratio = cache.process_trace(reader)
    # return the miss ratio
    return miss_ratio


def simulate_single_trace(args):
    """Simulate a single trace file; used by the multiprocessing workers."""
    trace_file_name, eviction_algo, cache_size_ratio, ignore_obj_size, eviction_algo_params = args
    try:
        miss_ratio = simulate(eviction_algo, trace_file_name, cache_size_ratio, ignore_obj_size, eviction_algo_params)
        return {"trace_file_name": trace_file_name, "miss_ratio": miss_ratio, "status": "success"}
    except Exception as e:
        return {"trace_file_name": trace_file_name, "miss_ratio": None, "status": f"error: {str(e)}"}


def main():
    ignore_obj_size = True
    cache_size_ratio = 0.01
    eviction_algo = S3FIFO
    eviction_algo_params = {
        "fifo_size_ratio": 0.1,
        "move_to_main_threshold": 2,
    }

    # get the trace files of the "msr" trace set
    files = get_trace_file_lists("msr")
    miss_ratios = []
    for file in files:
        trace_file_path = get_trace_file_path(file)
        # open the trace file
        reader = open_trace(trace_file_path, type=TraceType.ORACLE_GENERAL_TRACE, ignore_obj_size=ignore_obj_size)
        # create a cache with the eviction policy
        cache = eviction_algo(cache_size=int(reader.get_wss(ignore_obj_size=ignore_obj_size) * cache_size_ratio), **eviction_algo_params)
        # process the trace
        miss_ratio = cache.process_trace(reader)
        miss_ratios.append(miss_ratio)

    print(f"Miss ratio: {miss_ratios}")


def simulate_all_traces_parallel(num_processes=None):
    """Process all trace files in parallel with multiprocessing."""
    if num_processes is None:
        num_processes = mp.cpu_count()

    print(f"Using {num_processes} processes for parallel processing...")

    # read the list of trace files
    df = pd.read_csv("trace_set_files.csv")

    # simulation parameters
    eviction_algo = S3FIFO
    cache_size_ratio = 0.01
    ignore_obj_size = True
    eviction_algo_params = {"fifo_size_ratio": 0.1, "move_to_main_threshold": 2}

    # build the per-trace task arguments
    tasks = []
    for index, row in df.iterrows():
        trace_file_name = str(row["trace_file_name"])
        task_args = (trace_file_name, eviction_algo, cache_size_ratio, ignore_obj_size, eviction_algo_params)
        tasks.append(task_args)

    print(f"{len(tasks)} trace files to process in total")

    # run the tasks in a multiprocessing pool
    results = []
    with mp.Pool(processes=num_processes) as pool:
        # use imap so progress can be reported incrementally
        for i, result in enumerate(pool.imap(simulate_single_trace, tasks)):
            results.append(result)
            if (i + 1) % 10 == 0:
                print(f"Processed {i + 1}/{len(tasks)} files")

    # summarize the results
    successful_results = [r for r in results if r["status"] == "success"]
    failed_results = [r for r in results if r["status"] != "success"]

    print("\nProcessing complete!")
    print(f"Succeeded: {len(successful_results)} files")
    print(f"Failed: {len(failed_results)} files")

    if failed_results:
        print("\nFailed files:")
        for result in failed_results:
            print(f"  {result['trace_file_name']}: {result['status']}")

    # extract the miss ratios
    miss_ratios = [r["miss_ratio"] for r in successful_results]
    print(f"\nMiss ratios: {miss_ratios}")

    # save the results to CSV
    results_df = pd.DataFrame(results)
    results_df.to_csv("simulation_results.csv", index=False)
    print("Results saved to simulation_results.csv")

    return results


def simulate_all_traces():
    """Original single-process version."""
    df = pd.read_csv("trace_set_files.csv")
    miss_ratios = []
    for index, row in df.iterrows():
        trace_file_name = str(row["trace_file_name"])
        miss_ratio = simulate(S3FIFO, trace_file_name, 0.01, True, {"fifo_size_ratio": 0.1, "move_to_main_threshold": 2})
        miss_ratios.append(miss_ratio)
    print(f"Miss ratio: {miss_ratios}")


if __name__ == "__main__":
    # use the multiprocessing version
    simulate_all_traces_parallel()
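
k8s.py launches each container with `python simulate.py --trace ... --algo ... --params ...`, but the script above only runs simulate_all_traces_parallel() and parses no arguments. A minimal sketch of the entry point that driver assumes, with cache_size_ratio and ignore_obj_size fixed to the values used above; the flag names mirror k8s.py, everything else is illustrative and not part of this commit:

import argparse
import json

from libcachesim.eviction import ARC  # S3FIFO is already imported at the top of this file

def cli():
    # Map the --algo string to an eviction class exported by libcachesim.eviction.
    algos = {"S3FIFO": S3FIFO, "ARC": ARC}
    parser = argparse.ArgumentParser()
    parser.add_argument("--trace", required=True)
    parser.add_argument("--algo", required=True, choices=sorted(algos))
    parser.add_argument("--params", default="{}", help="JSON dict of eviction parameters")
    args = parser.parse_args()
    # Reuse simulate() with the cache size ratio (0.01) and ignore_obj_size (True) used elsewhere here.
    miss_ratio = simulate(algos[args.algo], args.trace, 0.01, True, json.loads(args.params))
    print(f"Miss ratio: {miss_ratio}")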

experiments/trace_sel.py

Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
"""
Trace selection
"""

from libcachesim import get_trace_file_lists
import pandas as pd


trace_set_names = ["alibabaBlock", "cloudphysics", "metaCDN", "metaKV", "metaStorage", "msr", "tencentBlock", "tencentPhoto", "twitter", "wiki", "cdn1", "cdn2"]  # fiu and systor are not available


def select_traces():
    df = pd.DataFrame(columns=["trace_set_name", "trace_file_name"])
    trace_set_files = []
    for trace_set_name in trace_set_names:
        try:
            trace_set_files.append(get_trace_file_lists(trace_set_name))
            current_trace_set_files = trace_set_files[-1]
            current_trace_set_files = [trace_file_name for trace_file_name in current_trace_set_files if ("1K" not in trace_file_name and "10K" not in trace_file_name and "100K" not in trace_file_name)]
            for trace_file_name in current_trace_set_files[:200]:
                new_row_df = pd.DataFrame([{"trace_set_name": trace_set_name, "trace_file_name": trace_file_name}])
                df = pd.concat([df, new_row_df], ignore_index=True)
            print(f"Trace set {trace_set_name} has {len(current_trace_set_files)} traces, selected {len(current_trace_set_files[:200])} traces")
        except FileNotFoundError:
            print(f"Trace set {trace_set_name} not found")
            continue
    df.to_csv("trace_set_files.csv", index=False)
    return trace_set_files


if __name__ == "__main__":
    select_traces()
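
As a quick sanity check on the selection, the CSV written above can be read back and grouped by trace set; a small sketch using the two columns trace_sel.py emits:

import pandas as pd

df = pd.read_csv("trace_set_files.csv")
# Each trace set should contribute at most 200 rows.
print(df.groupby("trace_set_name")["trace_file_name"].count())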

libCacheSim-python/libcachesim/__init__.py

Lines changed: 7 additions & 1 deletion
@@ -8,8 +8,8 @@
     __version__,
     create_cache,
     open_trace,
+    TraceType,
 )
-from .const import TraceType
 from .eviction import (
     ARC,
     FIFO,
@@ -23,6 +23,9 @@
     TwoQ,
 )
 
+from .const import HF_CACHE_DIR
+from .dataset import get_trace_file_lists, get_trace_file_path
+
 __all__ = [
     "ARC",
     "FIFO",
@@ -42,5 +45,8 @@
     "__version__",
     "create_cache",
     "open_trace",
+    "get_trace_file_lists",
+    "get_trace_file_path",
+    "HF_CACHE_DIR",
     # TODO(haocheng): add more eviction policies
 ]
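
With this change the trace-type enum and the dataset helpers are importable straight from the package top level, which is what experiments/simulate.py relies on. A quick check, as a sketch:

from libcachesim import TraceType, get_trace_file_lists, get_trace_file_path, HF_CACHE_DIR

print(TraceType.ORACLE_GENERAL_TRACE)  # trace format used by the experiments
print(HF_CACHE_DIR)                    # "/mnt/cfs" per the const change below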

libCacheSim-python/libcachesim/__init__.pyi

Lines changed: 23 additions & 1 deletion
@@ -26,7 +26,24 @@ libCacheSim Python bindings
     TraceType
 """
 
-from .const import TraceType
+import enum
+
+class TraceType(enum.Enum):
+    CSV_TRACE = 0
+    BIN_TRACE = 1
+    PLAIN_TXT_TRACE = 2
+    ORACLE_GENERAL_TRACE = 3
+    LCS_TRACE = 4  # libCacheSim format
+    VSCSI_TRACE = 5
+    TWR_TRACE = 6
+    TWRNS_TRACE = 7
+    ORACLE_SIM_TWR_TRACE = 8
+    ORACLE_SYS_TWR_TRACE = 9
+    ORACLE_SIM_TWRNS_TRACE = 10
+    ORACLE_SYS_TWRNS_TRACE = 11
+    VALPIN_TRACE = 12
+    UNKNOWN_TRACE = 13
+
 
 
 def create_cache(
     eviction_algo: str,
@@ -146,3 +163,8 @@ class Reader:
     def get_wss(self, ignore_obj_size: bool = False) -> int: ...
     def __iter__(self) -> Reader: ...
     def __next__(self) -> Request: ...
+
+
+# -----------------------------
+def get_trace_file_lists(trace_set_name: str) -> list[str]: ...
+def get_trace_file_path(trace_file_name: str) -> str: ...

Lines changed: 2 additions & 9 deletions
@@ -1,11 +1,4 @@
 from __future__ import annotations
 
-import enum
-
-
-class TraceType(enum.Enum):
-    CSV_TRACE = 0
-    BIN_TRACE = 1
-    PLAIN_TXT_TRACE = 2
-    ORACLE_GENERAL_TRACE = 3
-    LCS_TRACE = 4  # libCacheSim format
+HF_CACHE_DIR = "/mnt/cfs"
+DATASET_NAME = "1a1a11a/cache_dataset_oracleGeneral"

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
from huggingface_hub import hf_hub_download, list_repo_files

from .const import DATASET_NAME, HF_CACHE_DIR


def get_trace_file_lists(trace_set_name: str) -> list[str]:
    """
    Get the list of trace files in the dataset.
    """
    repo_files = list_repo_files(repo_id=DATASET_NAME, repo_type="dataset")
    # Apply keyword filter
    keyword = trace_set_name.lower()
    return [file for file in repo_files if keyword in file.lower()]

def get_trace_file_path(trace_file_name: str) -> str:
    """
    Get the local path of a trace file in the dataset. Note that this downloads the file to the cache directory.
    """
    return hf_hub_download(
        repo_id=DATASET_NAME, repo_type="dataset", filename=trace_file_name, cache_dir=HF_CACHE_DIR)
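
Usage of the two helpers mirrors simulate.py's main(): list the dataset files whose names contain a keyword, then download one on demand (indexing the first element here is only for illustration):

from libcachesim import get_trace_file_lists, get_trace_file_path

msr_traces = get_trace_file_lists("msr")          # dataset files whose names contain "msr"
local_path = get_trace_file_path(msr_traces[0])   # downloads into HF_CACHE_DIR, returns the local path
print(local_path)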
