Skip to content

Commit 57a3e87

Browse files
committed
fix numa core pinning, added jvm tuning
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
1 parent 7a3c5c7 commit 57a3e87

2 files changed

Lines changed: 85 additions & 17 deletions

File tree

.github/workflows/benchmark.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ jobs:
416416
417417
- name: Install dependencies
418418
run: |
419-
sudo apt-get install -y sysstat python3-pip
419+
sudo apt-get install -y sysstat python3-pip numactl
420420
sudo python3 -m pip install psycopg2-binary boto3 hdrhistogram
421421
422422
# async-profiler for flame graphs

.github/workflows/benchmark_orchestrator.py

Lines changed: 84 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -885,7 +885,6 @@ def parse_perf_stat(filepath: Path, hardware_available: bool) -> dict:
885885
class BenchmarkOrchestrator:
886886
"""Main benchmark orchestration class"""
887887

888-
INFRA_CORES = "0-3"
889888
AWS_REGION = "us-east-1"
890889

891890
def __init__(self, resp_bench_dir: Path, resp_bench_commit: str, output_file: Path,
@@ -919,20 +918,84 @@ def __init__(self, resp_bench_dir: Path, resp_bench_commit: str, output_file: Pa
919918
self.variance_control = VarianceControl()
920919
self.variance_control.setup(network_delay_ms=network_delay_ms)
921920

922-
# Calculate core allocation after SMT is disabled
923-
cpu_count = os.cpu_count() or 8
924-
if cpu_count > 4:
925-
self.benchmark_cores = f"4-{cpu_count - 1}"
926-
else:
927-
self.benchmark_cores = "0-3"
928-
print(f"WARNING: Only {cpu_count} cores, benchmark shares "
929-
f"cores with server")
930-
print(f"Core allocation: Server={self.INFRA_CORES}, "
931-
f"Benchmark={self.benchmark_cores}")
921+
# Detect NUMA topology and allocate cores accordingly
922+
self._setup_numa_aware_cores()
932923

933924
# Java JAR path - find the shaded JAR dynamically
934925
self.java_jar = self._find_java_jar()
935926

927+
def _get_numa_topology(self) -> dict:
928+
"""
929+
Detect NUMA topology from /sys filesystem.
930+
Returns dict: {node_id: [list of cpu cores]}
931+
"""
932+
numa_nodes = {}
933+
numa_base = Path("/sys/devices/system/node")
934+
935+
if not numa_base.exists():
936+
print(" ⚠ NUMA topology not available, assuming single node")
937+
cpu_count = os.cpu_count() or 8
938+
return {0: list(range(cpu_count))}
939+
940+
for node_dir in sorted(numa_base.glob("node[0-9]*")):
941+
node_id = int(node_dir.name.replace("node", ""))
942+
cpulist_file = node_dir / "cpulist"
943+
if cpulist_file.exists():
944+
cpulist_str = cpulist_file.read_text().strip()
945+
cores = self._parse_cpulist(cpulist_str)
946+
numa_nodes[node_id] = cores
947+
948+
if not numa_nodes:
949+
cpu_count = os.cpu_count() or 8
950+
return {0: list(range(cpu_count))}
951+
952+
return numa_nodes
953+
954+
def _parse_cpulist(self, cpulist: str) -> list:
955+
"""Parse CPU list format like '0-3,8-11' into [0,1,2,3,8,9,10,11]"""
956+
cores = []
957+
for part in cpulist.split(","):
958+
if "-" in part:
959+
start, end = part.split("-")
960+
cores.extend(range(int(start), int(end) + 1))
961+
else:
962+
cores.append(int(part))
963+
return sorted(cores)
964+
965+
def _setup_numa_aware_cores(self):
966+
"""
967+
Detect NUMA topology and allocate cores from a single NUMA node.
968+
Uses the NUMA node with the most cores.
969+
"""
970+
numa_topology = self._get_numa_topology()
971+
972+
print(f"NUMA topology detected: {len(numa_topology)} node(s)")
973+
for node_id, cores in numa_topology.items():
974+
print(f" Node {node_id}: {len(cores)} cores ({min(cores)}-{max(cores)})")
975+
976+
# Pick the NUMA node with the most cores
977+
best_node = max(numa_topology.keys(), key=lambda n: len(numa_topology[n]))
978+
node_cores = numa_topology[best_node]
979+
980+
self.numa_node = best_node
981+
982+
# Allocate cores: first 4 for server, rest for benchmark
983+
if len(node_cores) >= 8:
984+
self.infra_cores = f"{node_cores[0]}-{node_cores[3]}"
985+
self.benchmark_cores = f"{node_cores[4]}-{node_cores[-1]}"
986+
elif len(node_cores) > 4:
987+
self.infra_cores = f"{node_cores[0]}-{node_cores[3]}"
988+
self.benchmark_cores = f"{node_cores[4]}-{node_cores[-1]}"
989+
else:
990+
# Not enough cores, share them
991+
self.infra_cores = f"{node_cores[0]}-{node_cores[-1]}"
992+
self.benchmark_cores = self.infra_cores
993+
print(f"WARNING: Only {len(node_cores)} cores on NUMA node {best_node}, "
994+
f"benchmark shares cores with server")
995+
996+
print(f"NUMA node {best_node} selected")
997+
print(f"Core allocation: Server={self.infra_cores}, Benchmark={self.benchmark_cores}")
998+
936999
def _find_java_jar(self) -> Path:
9371000
"""Find the benchmark JAR file dynamically."""
9381001
target_dir = self.resp_bench_dir / "java/target"
@@ -991,15 +1054,15 @@ def _pin_server_processes(self):
9911054
try:
9921055
pid = int(pid_file.read_text().strip())
9931056
result = subprocess.run(
994-
["taskset", "-cp", self.INFRA_CORES, str(pid)],
1057+
["taskset", "-cp", self.infra_cores, str(pid)],
9951058
capture_output=True, text=True)
9961059
if result.returncode == 0:
9971060
pinned += 1
9981061
else:
9991062
print(f" Warning: Failed to pin PID {pid}: {result.stderr}")
10001063
except (ValueError, FileNotFoundError) as e:
10011064
print(f" Warning: Could not read PID from {pid_file}: {e}")
1002-
print(f"Pinned {pinned} server processes to cores {self.INFRA_CORES}")
1065+
print(f"Pinned {pinned} server processes to cores {self.infra_cores}")
10031066

10041067
def start_infrastructure(self):
10051068
if self.skip_infra:
@@ -1012,7 +1075,7 @@ def start_infrastructure(self):
10121075
stop_target = "server-cluster-stop" if is_cluster else "server-standalone-stop"
10131076
port = self._get_server_port()
10141077

1015-
print(f"Starting Valkey {mode_name} infrastructure on cores {self.INFRA_CORES}...")
1078+
print(f"Starting Valkey {mode_name} infrastructure...")
10161079
subprocess.run(["make", stop_target], cwd=self.resp_bench_dir,
10171080
capture_output=True)
10181081
subprocess.run(["pkill", "-f", "valkey-server"], capture_output=True)
@@ -1023,7 +1086,7 @@ def start_infrastructure(self):
10231086
shutil.rmtree(work_dir)
10241087

10251088
result = subprocess.run(
1026-
["taskset", "-c", self.INFRA_CORES, "make", make_target],
1089+
["make", make_target],
10271090
cwd=self.resp_bench_dir, timeout=600)
10281091
if result.returncode != 0:
10291092
raise RuntimeError(
@@ -1060,9 +1123,14 @@ def run_benchmark(self, output_metrics: Path) -> subprocess.Popen:
10601123
server = f"localhost:{port}"
10611124

10621125
cmd = [
1126+
"numactl",
1127+
f"--cpunodebind={self.numa_node}",
1128+
f"--membind={self.numa_node}",
10631129
"taskset", "-c", self.benchmark_cores,
10641130
"java",
10651131
"-XX:+EnableDynamicAgentLoading", # Allow async-profiler to attach
1132+
"-Xms4g", "-Xmx4g", # Fixed heap size to avoid resize pauses
1133+
"-XX:+AlwaysPreTouch", # Pre-fault heap pages at startup
10661134
"-jar", str(self.java_jar),
10671135
"--server", server,
10681136
"--driver", str(self.driver_config_path),
@@ -1074,7 +1142,7 @@ def run_benchmark(self, output_metrics: Path) -> subprocess.Popen:
10741142
if self.resp_bench_commit:
10751143
cmd.extend(["--commit-id", self.resp_bench_commit])
10761144

1077-
print(f"Starting Java benchmark on cores {self.benchmark_cores}")
1145+
print(f"Starting Java benchmark on NUMA node {self.numa_node}, cores {self.benchmark_cores}")
10781146
print(f" Server: {server}")
10791147
print(f" Driver: {self.driver_config_path}")
10801148
print(f" Workload: {self.workload_config_path}")

0 commit comments

Comments
 (0)