@@ -885,7 +885,6 @@ def parse_perf_stat(filepath: Path, hardware_available: bool) -> dict:
885885class BenchmarkOrchestrator :
886886 """Main benchmark orchestration class"""
887887
888- INFRA_CORES = "0-3"
889888 AWS_REGION = "us-east-1"
890889
891890 def __init__ (self , resp_bench_dir : Path , resp_bench_commit : str , output_file : Path ,
@@ -919,20 +918,89 @@ def __init__(self, resp_bench_dir: Path, resp_bench_commit: str, output_file: Pa
919918 self .variance_control = VarianceControl ()
920919 self .variance_control .setup (network_delay_ms = network_delay_ms )
921920
922- # Calculate core allocation after SMT is disabled
923- cpu_count = os .cpu_count () or 8
924- if cpu_count > 4 :
925- self .benchmark_cores = f"4-{ cpu_count - 1 } "
926- else :
927- self .benchmark_cores = "0-3"
928- print (f"WARNING: Only { cpu_count } cores, benchmark shares "
929- f"cores with server" )
930- print (f"Core allocation: Server={ self .INFRA_CORES } , "
931- f"Benchmark={ self .benchmark_cores } " )
921+ # Detect NUMA topology and allocate cores accordingly
922+ self ._setup_numa_aware_cores ()
932923
933924 # Java JAR path - find the shaded JAR dynamically
934925 self .java_jar = self ._find_java_jar ()
935926
927+ def _get_numa_topology (self ) -> dict :
928+ """
929+ Detect NUMA topology from /sys filesystem.
930+ Returns dict: {node_id: [list of cpu cores]}
931+ """
932+ numa_nodes = {}
933+ numa_base = Path ("/sys/devices/system/node" )
934+
935+ if not numa_base .exists ():
936+ print (" ⚠ NUMA topology not available, assuming single node" )
937+ cpu_count = os .cpu_count () or 8
938+ return {0 : list (range (cpu_count ))}
939+
940+ for node_dir in sorted (numa_base .glob ("node[0-9]*" )):
941+ node_id = int (node_dir .name .replace ("node" , "" ))
942+ cpulist_file = node_dir / "cpulist"
943+ if cpulist_file .exists ():
944+ cpulist_str = cpulist_file .read_text ().strip ()
945+ cores = self ._parse_cpulist (cpulist_str )
946+ numa_nodes [node_id ] = cores
947+
948+ if not numa_nodes :
949+ cpu_count = os .cpu_count () or 8
950+ return {0 : list (range (cpu_count ))}
951+
952+ return numa_nodes
953+
954+ def _parse_cpulist (self , cpulist : str ) -> list :
955+ """Parse CPU list format like '0-3,8-11' into [0,1,2,3,8,9,10,11]"""
956+ cores = []
957+ for part in cpulist .split ("," ):
958+ if "-" in part :
959+ start , end = part .split ("-" )
960+ cores .extend (range (int (start ), int (end ) + 1 ))
961+ else :
962+ cores .append (int (part ))
963+ return sorted (cores )
964+
965+ def _setup_numa_aware_cores (self ):
966+ """
967+ Detect NUMA topology and allocate cores across NUMA nodes.
968+ Server runs on node 0, benchmark runs on node 1 (if available).
969+ """
970+ numa_topology = self ._get_numa_topology ()
971+
972+ print (f"NUMA topology detected: { len (numa_topology )} node(s)" )
973+ for node_id , cores in numa_topology .items ():
974+ print (f" Node { node_id } : { len (cores )} cores ({ min (cores )} -{ max (cores )} )" )
975+
976+ if len (numa_topology ) >= 2 :
977+ # Two or more NUMA nodes: server on node 0, benchmark on node 1
978+ node0_cores = numa_topology [0 ]
979+ node1_cores = numa_topology [1 ]
980+
981+ self .infra_numa_node = 0
982+ self .benchmark_numa_node = 1
983+
984+ # Use all cores on each node
985+ self .infra_cores = f"{ node0_cores [0 ]} -{ node0_cores [- 1 ]} "
986+ self .benchmark_cores = f"{ node1_cores [0 ]} -{ node1_cores [- 1 ]} "
987+
988+ print (f"Split NUMA allocation:" )
989+ print (f" Server: NUMA node { self .infra_numa_node } , cores { self .infra_cores } " )
990+ print (f" Benchmark: NUMA node { self .benchmark_numa_node } , cores { self .benchmark_cores } " )
991+ else :
992+ # Single NUMA node: all cores shared by server and benchmark
993+ node_cores = numa_topology [0 ]
994+ self .infra_numa_node = 0
995+ self .benchmark_numa_node = 0
996+
997+ all_cores = f"{ node_cores [0 ]} -{ node_cores [- 1 ]} "
998+ self .infra_cores = all_cores
999+ self .benchmark_cores = all_cores
1000+
1001+ print (f"Single NUMA node { self .infra_numa_node } : all cores shared" )
1002+ print (f" Cores: { all_cores } " )
1003+
9361004 def _find_java_jar (self ) -> Path :
9371005 """Find the benchmark JAR file dynamically."""
9381006 target_dir = self .resp_bench_dir / "java/target"
@@ -984,22 +1052,27 @@ def _get_server_port(self) -> int:
9841052 return 7379 if self ._is_cluster_mode () else 6379
9851053
9861054 def _pin_server_processes (self ):
987- """Pin all running valkey-server processes to designated cores."""
1055+ """Pin all running valkey-server processes to designated cores and NUMA node ."""
9881056 work_dir = self .resp_bench_dir / "work"
9891057 pinned = 0
9901058 for pid_file in work_dir .glob ("*.pid" ):
9911059 try :
9921060 pid = int (pid_file .read_text ().strip ())
1061+ # Pin CPU to infra cores
9931062 result = subprocess .run (
994- ["taskset" , "-cp" , self .INFRA_CORES , str (pid )],
1063+ ["taskset" , "-cp" , self .infra_cores , str (pid )],
9951064 capture_output = True , text = True )
9961065 if result .returncode == 0 :
9971066 pinned += 1
9981067 else :
9991068 print (f" Warning: Failed to pin PID { pid } : { result .stderr } " )
1069+ # Migrate memory to infra NUMA node
1070+ subprocess .run (
1071+ ["migratepages" , str (pid ), "all" , str (self .infra_numa_node )],
1072+ capture_output = True , text = True )
10001073 except (ValueError , FileNotFoundError ) as e :
10011074 print (f" Warning: Could not read PID from { pid_file } : { e } " )
1002- print (f"Pinned { pinned } server processes to cores { self .INFRA_CORES } " )
1075+ print (f"Pinned { pinned } server processes to NUMA node { self . infra_numa_node } , cores { self .infra_cores } " )
10031076
10041077 def start_infrastructure (self ):
10051078 if self .skip_infra :
@@ -1012,7 +1085,7 @@ def start_infrastructure(self):
10121085 stop_target = "server-cluster-stop" if is_cluster else "server-standalone-stop"
10131086 port = self ._get_server_port ()
10141087
1015- print (f"Starting Valkey { mode_name } infrastructure on cores { self .INFRA_CORES } ..." )
1088+ print (f"Starting Valkey { mode_name } infrastructure on NUMA node { self .infra_numa_node } ..." )
10161089 subprocess .run (["make" , stop_target ], cwd = self .resp_bench_dir ,
10171090 capture_output = True )
10181091 subprocess .run (["pkill" , "-f" , "valkey-server" ], capture_output = True )
@@ -1023,7 +1096,8 @@ def start_infrastructure(self):
10231096 shutil .rmtree (work_dir )
10241097
10251098 result = subprocess .run (
1026- ["taskset" , "-c" , self .INFRA_CORES , "make" , make_target ],
1099+ ["numactl" , f"--cpunodebind={ self .infra_numa_node } " ,
1100+ f"--membind={ self .infra_numa_node } " , "make" , make_target ],
10271101 cwd = self .resp_bench_dir , timeout = 600 )
10281102 if result .returncode != 0 :
10291103 raise RuntimeError (
@@ -1060,6 +1134,9 @@ def run_benchmark(self, output_metrics: Path) -> subprocess.Popen:
10601134 server = f"localhost:{ port } "
10611135
10621136 cmd = [
1137+ "numactl" ,
1138+ f"--cpunodebind={ self .benchmark_numa_node } " ,
1139+ f"--membind={ self .benchmark_numa_node } " ,
10631140 "taskset" , "-c" , self .benchmark_cores ,
10641141 "java" ,
10651142 "-XX:+EnableDynamicAgentLoading" , # Allow async-profiler to attach
@@ -1074,7 +1151,7 @@ def run_benchmark(self, output_metrics: Path) -> subprocess.Popen:
10741151 if self .resp_bench_commit :
10751152 cmd .extend (["--commit-id" , self .resp_bench_commit ])
10761153
1077- print (f"Starting Java benchmark on cores { self .benchmark_cores } " )
1154+ print (f"Starting Java benchmark on NUMA node { self . benchmark_numa_node } , cores { self .benchmark_cores } " )
10781155 print (f" Server: { server } " )
10791156 print (f" Driver: { self .driver_config_path } " )
10801157 print (f" Workload: { self .workload_config_path } " )
0 commit comments