@@ -964,37 +964,48 @@ def _parse_cpulist(self, cpulist: str) -> list:
964964
def _setup_numa_aware_cores(self) -> None:
    """
    Detect NUMA topology and allocate cores across NUMA nodes.

    When at least two NUMA nodes have CPUs, the server gets every core of
    the lowest-numbered such node and the benchmark gets every core of the
    next one (nodes 0 and 1 on a typical two-socket machine). With a single
    usable node, the first core goes to the server and the remaining cores
    to the benchmark; if there is only one core it is shared.

    Sets ``self.infra_numa_node``, ``self.benchmark_numa_node``,
    ``self.infra_cores`` and ``self.benchmark_cores``. The core attributes
    are cpulist strings (e.g. ``"0-3,8-11"``) that are passed to
    ``taskset -c`` elsewhere in this class.

    Raises:
        RuntimeError: If the detected topology has no node with any core.
    """

    def format_cpulist(cores):
        """Compress core IDs into a cpulist string, e.g. [0, 1, 2, 8] -> '0-2,8'."""
        ordered = sorted(set(cores))
        ranges = []
        start = prev = ordered[0]
        for core in ordered[1:]:
            if core != prev + 1:
                ranges.append((start, prev))
                start = core
            prev = core
        ranges.append((start, prev))
        return ",".join(
            str(lo) if lo == hi else f"{lo}-{hi}" for lo, hi in ranges
        )

    # Assumes _get_numa_topology() returns {node_id: [core_id, ...]} with
    # integer IDs -- TODO confirm against its implementation.
    numa_topology = self._get_numa_topology()

    print(f"NUMA topology detected: {len(numa_topology)} node(s)")
    for node_id, cores in numa_topology.items():
        if cores:
            print(f"  Node {node_id}: {len(cores)} cores ({min(cores)}-{max(cores)})")
        else:
            # Memory-only nodes have no CPUs; min()/max() would raise on them.
            print(f"  Node {node_id}: 0 cores")

    # Only nodes that actually own CPUs can host a process. Node IDs are not
    # guaranteed to be 0 and 1, so select by sorted order instead of by key.
    usable_nodes = sorted(n for n, cores in numa_topology.items() if cores)
    if not usable_nodes:
        raise RuntimeError("No NUMA node with CPU cores detected")

    if len(usable_nodes) >= 2:
        # Two or more NUMA nodes: server on the first, benchmark on the second
        self.infra_numa_node, self.benchmark_numa_node = usable_nodes[:2]

        # Use all cores on each node. A plain "first-last" range would be
        # wrong when core IDs interleave between nodes, so emit exact lists.
        self.infra_cores = format_cpulist(numa_topology[self.infra_numa_node])
        self.benchmark_cores = format_cpulist(
            numa_topology[self.benchmark_numa_node]
        )

        print("Split NUMA allocation:")
        print(f"  Server: NUMA node {self.infra_numa_node}, cores {self.infra_cores}")
        print(f"  Benchmark: NUMA node {self.benchmark_numa_node}, cores {self.benchmark_cores}")
        return

    # Single NUMA node: 1 core for server, rest for benchmark
    only_node = usable_nodes[0]
    node_cores = sorted(set(numa_topology[only_node]))
    self.infra_numa_node = only_node
    self.benchmark_numa_node = only_node

    self.infra_cores = str(node_cores[0])
    if len(node_cores) >= 2:
        self.benchmark_cores = format_cpulist(node_cores[1:])
    else:
        # Only 1 core, share it
        self.benchmark_cores = self.infra_cores
        print("WARNING: Only 1 core, benchmark shares core with server")

    print(f"Single NUMA node {self.infra_numa_node}:")
    print(f"  Server cores: {self.infra_cores}")
    print(f"  Benchmark cores: {self.benchmark_cores}")
9981009
9991010 def _find_java_jar (self ) -> Path :
10001011 """Find the benchmark JAR file dynamically."""
@@ -1047,22 +1058,27 @@ def _get_server_port(self) -> int:
10471058 return 7379 if self ._is_cluster_mode () else 6379
10481059
def _pin_server_processes(self) -> None:
    """Pin all running valkey-server processes to designated cores and NUMA node.

    Reads every ``*.pid`` file under ``<resp_bench_dir>/work``. For each PID
    it restricts CPU affinity to ``self.infra_cores`` with ``taskset`` and
    then asks ``migratepages`` to move the process's memory to
    ``self.infra_numa_node``.

    The whole operation is best-effort: unreadable PID files, missing tools
    and non-zero exit codes each print a warning and the loop continues.
    Only successful ``taskset`` calls are counted in the final summary.
    """

    def run_tool(cmd, pid, action):
        """Run cmd; return True on success, otherwise warn and return False."""
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except FileNotFoundError:
            # The executable itself is missing. Report that explicitly so it
            # is not mistaken for a PID-file problem.
            print(f"  Warning: '{cmd[0]}' not found, cannot {action} PID {pid}")
            return False
        if result.returncode != 0:
            print(f"  Warning: Failed to {action} PID {pid}: {result.stderr.strip()}")
            return False
        return True

    work_dir = self.resp_bench_dir / "work"
    pinned = 0
    for pid_file in work_dir.glob("*.pid"):
        # Keep the try body limited to reading/parsing the PID file so that
        # tool failures below are never reported as PID-file errors.
        try:
            pid = int(pid_file.read_text().strip())
        except (ValueError, OSError) as e:
            print(f"  Warning: Could not read PID from {pid_file}: {e}")
            continue

        # Pin CPU to infra cores
        if run_tool(["taskset", "-cp", self.infra_cores, str(pid)], pid, "pin"):
            pinned += 1

        # Migrate memory to infra NUMA node (attempted even if pinning failed,
        # matching the previous behaviour, but failures are now reported).
        run_tool(
            ["migratepages", str(pid), "all", str(self.infra_numa_node)],
            pid,
            "migrate memory of",
        )

    print(f"Pinned {pinned} server processes to NUMA node {self.infra_numa_node}, cores {self.infra_cores}")
10661082
10671083 def start_infrastructure (self ):
10681084 if self .skip_infra :
@@ -1075,7 +1091,7 @@ def start_infrastructure(self):
10751091 stop_target = "server-cluster-stop" if is_cluster else "server-standalone-stop"
10761092 port = self ._get_server_port ()
10771093
1078- print (f"Starting Valkey { mode_name } infrastructure..." )
1094+ print (f"Starting Valkey { mode_name } infrastructure on NUMA node { self . infra_numa_node } ..." )
10791095 subprocess .run (["make" , stop_target ], cwd = self .resp_bench_dir ,
10801096 capture_output = True )
10811097 subprocess .run (["pkill" , "-f" , "valkey-server" ], capture_output = True )
@@ -1086,7 +1102,8 @@ def start_infrastructure(self):
10861102 shutil .rmtree (work_dir )
10871103
10881104 result = subprocess .run (
1089- ["make" , make_target ],
1105+ ["numactl" , f"--cpunodebind={ self .infra_numa_node } " ,
1106+ f"--membind={ self .infra_numa_node } " , "make" , make_target ],
10901107 cwd = self .resp_bench_dir , timeout = 600 )
10911108 if result .returncode != 0 :
10921109 raise RuntimeError (
@@ -1124,8 +1141,8 @@ def run_benchmark(self, output_metrics: Path) -> subprocess.Popen:
11241141
11251142 cmd = [
11261143 "numactl" ,
1127- f"--cpunodebind={ self .numa_node } " ,
1128- f"--membind={ self .numa_node } " ,
1144+ f"--cpunodebind={ self .benchmark_numa_node } " ,
1145+ f"--membind={ self .benchmark_numa_node } " ,
11291146 "taskset" , "-c" , self .benchmark_cores ,
11301147 "java" ,
11311148 "-XX:+EnableDynamicAgentLoading" , # Allow async-profiler to attach
@@ -1140,7 +1157,7 @@ def run_benchmark(self, output_metrics: Path) -> subprocess.Popen:
11401157 if self .resp_bench_commit :
11411158 cmd .extend (["--commit-id" , self .resp_bench_commit ])
11421159
1143- print (f"Starting Java benchmark on NUMA node { self .numa_node } , cores { self .benchmark_cores } " )
1160+ print (f"Starting Java benchmark on NUMA node { self .benchmark_numa_node } , cores { self .benchmark_cores } " )
11441161 print (f" Server: { server } " )
11451162 print (f" Driver: { self .driver_config_path } " )
11461163 print (f" Workload: { self .workload_config_path } " )
0 commit comments