@@ -42,6 +42,9 @@ class WorkloadConfig:
4242 # Whether to include user id in request header
4343 enable_user_id : bool
4444
45+ # Whether strictly cap active sessions at num_users
46+ enforce_strict_concurrent_users : bool = False
47+
4548
4649@dataclass
4750class UserConfig :
@@ -457,6 +460,10 @@ def __init__(
457460 if self .use_sharegpt :
458461 self ._load_sharegpt_data ()
459462
463+ self .enforce_strict_concurrent_users = (
464+ workload_config .enforce_strict_concurrent_users
465+ )
466+
460467 def _load_sharegpt_data (self ):
461468 with open ("ShareGPT.json" , "r" , encoding = "utf-8" ) as file :
462469 self .sharegpt_data = json .load (file )
@@ -499,15 +506,28 @@ def _remove_finished_sessions(self):
499506 self .session_summaries .append (session .summary ())
500507 self .sessions = [s for s in self .sessions if not s .finished ]
501508
509+ def _can_join_user (self , timestamp : float ) -> bool :
510+ # No new user session if gap_between_users time interval not meets
511+ if timestamp - self .last_user_join <= self .gap_between_users :
512+ return False
513+
514+ # No user seession if active user count is less than configured
515+ if (
516+ self .enforce_strict_concurrent_users
517+ and len (self .sessions ) >= self .workload_config .num_users
518+ ):
519+ return False
520+ return True
521+
502522 def step (self , timestamp : float , executor : RequestExecutor ):
503523 if self .need_ramp_up :
504524 self ._ramp_up (timestamp , self .ramp_up_time )
505525
506526 if self .start_time is None :
507527 self .start_time = timestamp
508528
509- # New user session only joins when meets time interval and active user count is less than configured
510- if timestamp - self .last_user_join > self . gap_between_users and len ( self . sessions ) < self . workload_config . num_users :
529+ # Check if can join new user session
530+ if self ._can_join_user ( timestamp ) :
511531 new_session = self ._create_user_session ()
512532 if new_session is not None :
513533 self .last_user_join = timestamp
@@ -704,6 +724,11 @@ def parse_arguments() -> WorkloadConfig:
704724 parser .add_argument (
705725 "--sharegpt" , action = "store_true" , help = "Whether to use ShareGPT dataset"
706726 )
727+ parser .add_argument (
728+ "--enforce-strict-concurrent-users" ,
729+ action = "store_true" ,
730+ help = "Strictly enforce concurrent users count to match --num-users" ,
731+ )
707732 args = parser .parse_args ()
708733 return args
709734
@@ -750,6 +775,7 @@ def main():
750775 qps = args .qps ,
751776 model = args .model ,
752777 enable_user_id = args .request_with_user_id ,
778+ enforce_strict_concurrent_users = args .enforce_strict_concurrent_users ,
753779 )
754780
755781 manager = UserSessionManager (
0 commit comments