The CallbackFunctionExecutor allows chaining a trigger function with a callback function.
This is useful for post-test workflows — for example, run a test, then automatically
generate a report.
from je_load_density import callback_executor
def after_test():
print("Test finished, generating report...")
callback_executor.callback_function(
trigger_function_name="user_test",
callback_function=after_test,
user_detail_dict={"user": "fast_http_user"},
user_count=10,
spawn_rate=5,
test_time=5,
tasks={"get": {"request_url": "http://httpbin.org/get"}},
)- The
trigger_function_nameis looked up in the executor'sevent_dict - The trigger function is executed with the provided
**kwargs - After the trigger function completes, the
callback_functionis called - The return value of the trigger function is returned
| Trigger Name | Function |
|---|---|
user_test |
start_test() — Run a load test |
LD_generate_html |
generate_html() — Generate HTML fragments |
LD_generate_html_report |
generate_html_report() — Generate HTML report file |
LD_generate_json |
generate_json() — Generate JSON data |
LD_generate_json_report |
generate_json_report() — Generate JSON report files |
LD_generate_xml |
generate_xml() — Generate XML strings |
LD_generate_xml_report |
generate_xml_report() — Generate XML report files |
With keyword arguments (default):
def my_callback(report_name, format_type):
print(f"Generating {format_type} report: {report_name}")
callback_executor.callback_function(
trigger_function_name="user_test",
callback_function=my_callback,
callback_function_param={"report_name": "final", "format_type": "html"},
callback_param_method="kwargs",
user_detail_dict={"user": "fast_http_user"},
user_count=10,
spawn_rate=5,
test_time=5,
tasks={"get": {"request_url": "http://httpbin.org/get"}},
)With positional arguments:
def my_callback(arg1, arg2):
print(f"Args: {arg1}, {arg2}")
callback_executor.callback_function(
trigger_function_name="user_test",
callback_function=my_callback,
callback_function_param=["value1", "value2"],
callback_param_method="args",
user_detail_dict={"user": "fast_http_user"},
user_count=10,
spawn_rate=5,
test_time=5,
tasks={"get": {"request_url": "http://httpbin.org/get"}},
)| Parameter | Type | Description |
|---|---|---|
trigger_function_name |
str |
Name of function in event_dict to trigger |
callback_function |
Callable |
Callback function to execute after the trigger |
callback_function_param |
dict or list or None |
Parameters for the callback (dict for kwargs, list for args) |
callback_param_method |
str |
"kwargs" (default) or "args" |
**kwargs |
— | Parameters passed to the trigger function |
CallbackExecutorExceptionis raised if:trigger_function_nameis not found inevent_dictcallback_param_methodis not"kwargs"or"args"