diff --git a/bindings/sysman/python/source/cts_tests/conftest.py b/bindings/sysman/python/source/cts_tests/conftest.py new file mode 100644 index 00000000..a7d1ea60 --- /dev/null +++ b/bindings/sysman/python/source/cts_tests/conftest.py @@ -0,0 +1,74 @@ +## +# Copyright (C) 2026 Intel Corporation +# +# SPDX-License-Identifier: MIT +# +## + +import os +import sys +from ctypes import byref, c_uint32 + +import pytest + +CTS_TESTS_DIR = os.path.dirname(os.path.abspath(__file__)) +SOURCE_DIR = os.path.abspath(os.path.join(CTS_TESTS_DIR, "..")) + +if SOURCE_DIR not in sys.path: + sys.path.insert(0, SOURCE_DIR) + + +@pytest.fixture(scope="session") +def pyzes_module(): + try: + import pyzes as pz + except (ImportError, OSError) as exc: + pytest.skip(f"pyzes is not available for CTS validation: {exc}") + + return pz + + +@pytest.fixture(scope="session") +def sysman_devices(pyzes_module): + pz = pyzes_module + + rc = pz.zesInit(0) + if rc != pz.ZE_RESULT_SUCCESS: + pytest.skip(f"zesInit failed with ze_result_t={rc}") + + driver_count = c_uint32(0) + rc = pz.zesDriverGet(byref(driver_count), None) + if rc != pz.ZE_RESULT_SUCCESS: + pytest.skip(f"zesDriverGet(count) failed with ze_result_t={rc}") + + if driver_count.value == 0: + pytest.skip("No Sysman drivers available") + + DriverArray = pz.zes_driver_handle_t * driver_count.value + drivers = DriverArray() + rc = pz.zesDriverGet(byref(driver_count), drivers) + if rc != pz.ZE_RESULT_SUCCESS: + pytest.skip(f"zesDriverGet(handles) failed with ze_result_t={rc}") + + devices = [] + for driver in drivers: + device_count = c_uint32(0) + rc = pz.zesDeviceGet(driver, byref(device_count), None) + if rc != pz.ZE_RESULT_SUCCESS: + continue + + if device_count.value == 0: + continue + + DeviceArray = pz.zes_device_handle_t * device_count.value + driver_devices = DeviceArray() + rc = pz.zesDeviceGet(driver, byref(device_count), driver_devices) + if rc != pz.ZE_RESULT_SUCCESS: + continue + + devices.extend(driver_devices) + + if not devices: + pytest.skip("No Sysman devices available") + + return devices diff --git a/bindings/sysman/python/source/cts_tests/test_power_cts.py b/bindings/sysman/python/source/cts_tests/test_power_cts.py new file mode 100644 index 00000000..ceea5126 --- /dev/null +++ b/bindings/sysman/python/source/cts_tests/test_power_cts.py @@ -0,0 +1,388 @@ +## +# Copyright (C) 2026 Intel Corporation +# +# SPDX-License-Identifier: MIT +# +## + +import os +from ctypes import byref, c_uint32 + +import pytest + + +class TestPowerCts: + """Pytest fixture class""" + + @pytest.fixture(autouse=True) + def _set_up_fixture(self, pyzes_module, sysman_devices): + self.pz = pyzes_module + self.devices = sysman_devices + + def _get_power_handles(self, device): + power_count = c_uint32(0) + rc = self.pz.zesDeviceEnumPowerDomains(device, byref(power_count), None) + assert ( + rc == self.pz.ZE_RESULT_SUCCESS + ), f"zesDeviceEnumPowerDomains(count) failed with ze_result_t={rc}" + + if power_count.value == 0: + return [] + + PowerArray = self.pz.zes_pwr_handle_t * power_count.value + power_handles = PowerArray() + rc = self.pz.zesDeviceEnumPowerDomains( + device, byref(power_count), power_handles + ) + assert ( + rc == self.pz.ZE_RESULT_SUCCESS + ), f"zesDeviceEnumPowerDomains(handles) failed with ze_result_t={rc}" + + return list(power_handles) + + def _get_power_properties(self, power_handle): + default_limit = self.pz.zes_power_limit_ext_desc_t() + ext_properties = self.pz.zes_power_ext_properties_t() + ext_properties.stype = self.pz.ZES_STRUCTURE_TYPE_POWER_EXT_PROPERTIES + ext_properties.pNext = None + ext_properties.defaultLimit = self.pz.pointer(default_limit) + + properties = self.pz.zes_power_properties_t() + properties.stype = self.pz.ZES_STRUCTURE_TYPE_POWER_PROPERTIES + properties.pNext = self.pz.cast( + self.pz.pointer(ext_properties), self.pz.c_void_p + ) + + rc = self.pz.zesPowerGetProperties(power_handle, byref(properties)) + assert ( + rc == self.pz.ZE_RESULT_SUCCESS + ), f"zesPowerGetProperties failed with ze_result_t={rc}" + + return properties, ext_properties, default_limit + + def _copy_power_limit_descriptor(self, source_descriptor): + copied_descriptor = self.pz.zes_power_limit_ext_desc_t() + copied_descriptor.stype = self.pz.ZES_STRUCTURE_TYPE_POWER_LIMIT_EXT_DESC + copied_descriptor.pNext = None + copied_descriptor.level = source_descriptor.level + copied_descriptor.source = source_descriptor.source + copied_descriptor.limitUnit = source_descriptor.limitUnit + copied_descriptor.enabledStateLocked = source_descriptor.enabledStateLocked + copied_descriptor.enabled = source_descriptor.enabled + copied_descriptor.intervalValueLocked = source_descriptor.intervalValueLocked + copied_descriptor.interval = source_descriptor.interval + copied_descriptor.limitValueLocked = source_descriptor.limitValueLocked + copied_descriptor.limit = source_descriptor.limit + return copied_descriptor + + def _get_power_limits_ext_descriptors(self, power_handle): + limit_count = c_uint32(0) + rc = self.pz.zesPowerGetLimitsExt(power_handle, byref(limit_count), None) + if rc == self.pz.ZE_RESULT_ERROR_UNSUPPORTED_FEATURE: + return None + + assert ( + rc == self.pz.ZE_RESULT_SUCCESS + ), f"zesPowerGetLimitsExt(count) failed with ze_result_t={rc}" + + if limit_count.value == 0: + return [] + + LimitsArray = self.pz.zes_power_limit_ext_desc_t * limit_count.value + limit_descriptors = LimitsArray() + + for index in range(limit_count.value): + limit_descriptors[index].stype = ( + self.pz.ZES_STRUCTURE_TYPE_POWER_LIMIT_EXT_DESC + ) + limit_descriptors[index].pNext = None + + rc = self.pz.zesPowerGetLimitsExt( + power_handle, byref(limit_count), limit_descriptors + ) + assert ( + rc == self.pz.ZE_RESULT_SUCCESS + ), f"zesPowerGetLimitsExt(descriptors) failed with ze_result_t={rc}" + + return [ + self._copy_power_limit_descriptor(descriptor) + for descriptor in limit_descriptors + ] + + def _iter_supported_power_handles(self): + found_power_support = False + + for device in self.devices: + power_handles = self._get_power_handles(device) + if power_handles: + found_power_support = True + for power_handle in power_handles: + yield power_handle + + assert found_power_support, "No power handles found on any of the devices" + + def test_GivenValidDeviceWhenEnumeratingPowerHandlesThenNonZeroCountAndValidPowerHandlesAreReturned( + self, + ): + found_power_support = False + + for device in self.devices: + power_handles = self._get_power_handles(device) + if not power_handles: + continue + + found_power_support = True + for power_handle in power_handles: + assert power_handle.value is not None, "Expected a valid power handle" + + assert found_power_support, "No power handles found on any of the devices" + + def test_GivenValidDeviceWhenRetrievingPowerHandlesThenSimilarHandlesAreReturnedTwice( + self, + ): + found_power_support = False + + for device in self.devices: + initial_handles = self._get_power_handles(device) + later_handles = self._get_power_handles(device) + + if not initial_handles: + continue + + found_power_support = True + assert initial_handles == later_handles + + assert found_power_support, "No power handles found on any of the devices" + + def test_GivenValidDeviceWhenRetrievingPowerHandlesThenActualHandleCountIsUpdatedAndIfRequestedHandlesAreLessThanActualHandleCountThenDesiredNumberOfHandlesAreReturned( + self, + ): + found_power_support = False + + for device in self.devices: + power_count = c_uint32(0) + rc = self.pz.zesDeviceEnumPowerDomains(device, byref(power_count), None) + assert ( + rc == self.pz.ZE_RESULT_SUCCESS + ), f"zesDeviceEnumPowerDomains(count) failed with ze_result_t={rc}" + + if power_count.value == 0: + continue + + found_power_support = True + actual_count = power_count.value + + oversized_count = c_uint32(actual_count + 1) + OversizedArray = self.pz.zes_pwr_handle_t * oversized_count.value + oversized_handles = OversizedArray() + rc = self.pz.zesDeviceEnumPowerDomains( + device, byref(oversized_count), oversized_handles + ) + assert ( + rc == self.pz.ZE_RESULT_SUCCESS + ), f"zesDeviceEnumPowerDomains(oversized) failed with ze_result_t={rc}" + assert oversized_count.value == actual_count + + if actual_count > 1: + reduced_count = c_uint32(actual_count - 1) + ReducedArray = self.pz.zes_pwr_handle_t * reduced_count.value + reduced_handles = ReducedArray() + rc = self.pz.zesDeviceEnumPowerDomains( + device, byref(reduced_count), reduced_handles + ) + assert ( + rc == self.pz.ZE_RESULT_SUCCESS + ), f"zesDeviceEnumPowerDomains(reduced) failed with ze_result_t={rc}" + assert len(reduced_handles) == reduced_count.value + + assert found_power_support, "No power handles found on any of the devices" + + def test_GivenSamePowerHandleWhenRequestingPowerPropertiesThenExpectSamePropertiesTwiceAndLimitsAreInRange( + self, + ): + for power_handle in self._iter_supported_power_handles(): + initial_properties, initial_ext_properties, initial_default_limit = ( + self._get_power_properties(power_handle) + ) + later_properties, later_ext_properties, later_default_limit = ( + self._get_power_properties(power_handle) + ) + + assert initial_properties.onSubdevice == later_properties.onSubdevice + assert initial_properties.subdeviceId == later_properties.subdeviceId + assert initial_properties.canControl == later_properties.canControl + assert ( + initial_properties.isEnergyThresholdSupported + == later_properties.isEnergyThresholdSupported + ) + assert initial_properties.defaultLimit == later_properties.defaultLimit + assert initial_properties.minLimit == later_properties.minLimit + assert initial_properties.maxLimit == later_properties.maxLimit + assert initial_ext_properties.domain == later_ext_properties.domain + assert initial_default_limit.limit == later_default_limit.limit + + if initial_properties.maxLimit != -1: + assert initial_properties.maxLimit > 0 + assert initial_properties.maxLimit >= initial_properties.minLimit + if initial_properties.minLimit != -1: + assert initial_properties.minLimit >= 0 + if initial_properties.defaultLimit != -1: + assert initial_properties.defaultLimit > 0 + + def test_GivenValidPowerHandleThenExpectZesPowerGetEnergyCounterToReturnSuccess( + self, + ): + exercised_any = False + + for power_handle in self._iter_supported_power_handles(): + energy_initial = self.pz.zes_power_energy_counter_t() + rc = self.pz.zesPowerGetEnergyCounter(power_handle, byref(energy_initial)) + if rc == self.pz.ZE_RESULT_ERROR_UNSUPPORTED_FEATURE: + continue + + assert ( + rc == self.pz.ZE_RESULT_SUCCESS + ), f"zesPowerGetEnergyCounter(first) failed with ze_result_t={rc}" + + energy_later = self.pz.zes_power_energy_counter_t() + rc = self.pz.zesPowerGetEnergyCounter(power_handle, byref(energy_later)) + assert ( + rc == self.pz.ZE_RESULT_SUCCESS + ), f"zesPowerGetEnergyCounter(second) failed with ze_result_t={rc}" + + exercised_any = True + assert energy_later.energy >= energy_initial.energy + assert energy_later.timestamp >= energy_initial.timestamp + + if not exercised_any: + pytest.skip( + "zesPowerGetEnergyCounter is unsupported on all available power handles" + ) + + def test_GivenValidRootDevicePowerHandleWhenRequestingPowerLimitsExtThenExpectSameValuesTwice( + self, + ): + exercised_any = False + + for power_handle in self._iter_supported_power_handles(): + properties, _, _ = self._get_power_properties(power_handle) + if properties.onSubdevice: + continue + + first_descriptors = self._get_power_limits_ext_descriptors(power_handle) + if first_descriptors is None: + continue + + second_descriptors = self._get_power_limits_ext_descriptors(power_handle) + exercised_any = True + + assert len(first_descriptors) == len(second_descriptors) + for first_descriptor, second_descriptor in zip( + first_descriptors, second_descriptors + ): + assert first_descriptor.level == second_descriptor.level + assert first_descriptor.source == second_descriptor.source + assert first_descriptor.limitUnit == second_descriptor.limitUnit + assert ( + first_descriptor.enabledStateLocked + == second_descriptor.enabledStateLocked + ) + assert first_descriptor.enabled == second_descriptor.enabled + assert ( + first_descriptor.intervalValueLocked + == second_descriptor.intervalValueLocked + ) + assert first_descriptor.interval == second_descriptor.interval + assert ( + first_descriptor.limitValueLocked + == second_descriptor.limitValueLocked + ) + assert first_descriptor.limit == second_descriptor.limit + + if not exercised_any: + pytest.skip( + "zesPowerGetLimitsExt is unsupported or unavailable on all root-device power handles" + ) + + def test_GivenValidRootPowerHandleWhenSettingMutablePowerLimitDescriptorThenExpectZesPowerSetLimitsExtFollowedByZesPowerGetLimitsExtToMatch( + self, + ): + if getattr(os, "geteuid", None) and os.geteuid() != 0: + pytest.skip("zesPowerSetLimitsExt CTS validation requires root permissions") + + exercised_any = False + + for power_handle in self._iter_supported_power_handles(): + properties, _, _ = self._get_power_properties(power_handle) + if properties.onSubdevice: + continue + + initial_descriptors = self._get_power_limits_ext_descriptors(power_handle) + if initial_descriptors is None or not initial_descriptors: + continue + + mutable_index = None + for index, descriptor in enumerate(initial_descriptors): + if descriptor.limitValueLocked: + continue + if descriptor.limit <= 0: + continue + mutable_index = index + break + + if mutable_index is None: + continue + + modified_descriptors = [ + self._copy_power_limit_descriptor(descriptor) + for descriptor in initial_descriptors + ] + original_descriptor = initial_descriptors[mutable_index] + modified_descriptor = modified_descriptors[mutable_index] + modified_descriptor.limit = max(0, original_descriptor.limit - 1000) + + if modified_descriptor.limit == original_descriptor.limit: + continue + + limit_count = c_uint32(len(modified_descriptors)) + ModifiedArray = self.pz.zes_power_limit_ext_desc_t * len( + modified_descriptors + ) + modified_array = ModifiedArray(*modified_descriptors) + rc = self.pz.zesPowerSetLimitsExt( + power_handle, byref(limit_count), modified_array + ) + if rc == self.pz.ZE_RESULT_ERROR_UNSUPPORTED_FEATURE: + continue + + assert ( + rc == self.pz.ZE_RESULT_SUCCESS + ), f"zesPowerSetLimitsExt(set) failed with ze_result_t={rc}" + + try: + current_descriptors = self._get_power_limits_ext_descriptors( + power_handle + ) + assert current_descriptors is not None + assert ( + current_descriptors[mutable_index].limit + == modified_descriptor.limit + ) + exercised_any = True + finally: + restore_count = c_uint32(len(initial_descriptors)) + RestoreArray = self.pz.zes_power_limit_ext_desc_t * len( + initial_descriptors + ) + restore_array = RestoreArray(*initial_descriptors) + restore_rc = self.pz.zesPowerSetLimitsExt( + power_handle, byref(restore_count), restore_array + ) + assert ( + restore_rc == self.pz.ZE_RESULT_SUCCESS + ), f"zesPowerSetLimitsExt(restore) failed with ze_result_t={restore_rc}" + + if not exercised_any: + pytest.skip( + "No mutable root-device power limit descriptors were available for zesPowerSetLimitsExt" + ) diff --git a/bindings/sysman/python/source/examples/pyzes_black_box_test.py b/bindings/sysman/python/source/examples/pyzes_black_box_test.py index 5c9fce10..1da248e5 100755 --- a/bindings/sysman/python/source/examples/pyzes_black_box_test.py +++ b/bindings/sysman/python/source/examples/pyzes_black_box_test.py @@ -13,6 +13,7 @@ import argparse import os import sys +import time from ctypes import * # Add the source directory to Python path so we can import pyzes @@ -139,6 +140,41 @@ def get_device_type_string(device_type): return type_map.get(device_type, f"UNKNOWN_DEVICE_TYPE_{device_type}") +def get_pci_link_status_string(link_status): + """Convert PCI link status enum to string""" + status_map = { + pz.ZES_PCI_LINK_STATUS_UNKNOWN: "ZES_PCI_LINK_STATUS_UNKNOWN", + pz.ZES_PCI_LINK_STATUS_GOOD: "ZES_PCI_LINK_STATUS_GOOD", + pz.ZES_PCI_LINK_STATUS_QUALITY_ISSUES: "ZES_PCI_LINK_STATUS_QUALITY_ISSUES", + pz.ZES_PCI_LINK_STATUS_STABILITY_ISSUES: "ZES_PCI_LINK_STATUS_STABILITY_ISSUES", + } + return status_map.get(link_status, f"UNKNOWN_PCI_LINK_STATUS_{link_status}") + + +def get_pci_quality_issues_string(quality_issues): + """Convert PCI quality issue flags to string""" + if quality_issues == 0: + return "None" + + issues = [] + if quality_issues & pz.ZES_PCI_LINK_QUAL_ISSUE_FLAG_REPLAYS: + issues.append("REPLAYS") + if quality_issues & pz.ZES_PCI_LINK_QUAL_ISSUE_FLAG_SPEED: + issues.append("SPEED") + return " | ".join(issues) + + +def get_pci_stability_issues_string(stability_issues): + """Convert PCI stability issue flags to string""" + if stability_issues == 0: + return "None" + + issues = [] + if stability_issues & pz.ZES_PCI_LINK_STAB_ISSUE_FLAG_RETRAINING: + issues.append("RETRAINING") + return " | ".join(issues) + + def get_frequency_domain_string(freq_domain): """Convert frequency domain enum to string""" domain_map = { @@ -209,6 +245,86 @@ def get_engine_type_string(engine_type): return type_map.get(engine_type, f"UNKNOWN_ENGINE_TYPE_{engine_type}") +def get_power_domain_string(power_domain): + """Convert power domain enum to string""" + domain_map = { + pz.ZES_POWER_DOMAIN_UNKNOWN: "ZES_POWER_DOMAIN_UNKNOWN", + pz.ZES_POWER_DOMAIN_CARD: "ZES_POWER_DOMAIN_CARD", + pz.ZES_POWER_DOMAIN_PACKAGE: "ZES_POWER_DOMAIN_PACKAGE", + pz.ZES_POWER_DOMAIN_STACK: "ZES_POWER_DOMAIN_STACK", + pz.ZES_POWER_DOMAIN_MEMORY: "ZES_POWER_DOMAIN_MEMORY", + pz.ZES_POWER_DOMAIN_GPU: "ZES_POWER_DOMAIN_GPU", + } + return domain_map.get(power_domain, f"UNKNOWN_POWER_DOMAIN_{power_domain}") + + +def get_power_level_string(power_level): + """Convert power level enum to string""" + level_map = { + pz.ZES_POWER_LEVEL_UNKNOWN: "ZES_POWER_LEVEL_UNKNOWN", + pz.ZES_POWER_LEVEL_SUSTAINED: "ZES_POWER_LEVEL_SUSTAINED", + pz.ZES_POWER_LEVEL_BURST: "ZES_POWER_LEVEL_BURST", + pz.ZES_POWER_LEVEL_PEAK: "ZES_POWER_LEVEL_PEAK", + pz.ZES_POWER_LEVEL_INSTANTANEOUS: "ZES_POWER_LEVEL_INSTANTANEOUS", + } + return level_map.get(power_level, f"UNKNOWN_POWER_LEVEL_{power_level}") + + +def get_power_source_string(power_source): + """Convert power source enum to string""" + source_map = { + pz.ZES_POWER_SOURCE_ANY: "ZES_POWER_SOURCE_ANY", + pz.ZES_POWER_SOURCE_MAINS: "ZES_POWER_SOURCE_MAINS", + pz.ZES_POWER_SOURCE_BATTERY: "ZES_POWER_SOURCE_BATTERY", + } + return source_map.get(power_source, f"UNKNOWN_POWER_SOURCE_{power_source}") + + +def get_limit_unit_string(limit_unit): + """Convert power limit unit enum to string""" + unit_map = { + pz.ZES_LIMIT_UNIT_UNKNOWN: "ZES_LIMIT_UNIT_UNKNOWN", + pz.ZES_LIMIT_UNIT_CURRENT: "ZES_LIMIT_UNIT_CURRENT", + pz.ZES_LIMIT_UNIT_POWER: "ZES_LIMIT_UNIT_POWER", + } + return unit_map.get(limit_unit, f"UNKNOWN_LIMIT_UNIT_{limit_unit}") + + +def get_ecc_state_string(ecc_state): + """Convert ECC state enum to string""" + state_map = { + pz.ZES_DEVICE_ECC_STATE_UNAVAILABLE: "ZES_DEVICE_ECC_STATE_UNAVAILABLE", + pz.ZES_DEVICE_ECC_STATE_ENABLED: "ZES_DEVICE_ECC_STATE_ENABLED", + pz.ZES_DEVICE_ECC_STATE_DISABLED: "ZES_DEVICE_ECC_STATE_DISABLED", + } + return state_map.get(ecc_state, f"UNKNOWN_ECC_STATE_{ecc_state}") + + +def get_device_action_string(action): + """Convert device action enum to string""" + action_map = { + pz.ZES_DEVICE_ACTION_NONE: "ZES_DEVICE_ACTION_NONE", + pz.ZES_DEVICE_ACTION_WARM_CARD_RESET: "ZES_DEVICE_ACTION_WARM_CARD_RESET", + pz.ZES_DEVICE_ACTION_COLD_CARD_RESET: "ZES_DEVICE_ACTION_COLD_CARD_RESET", + pz.ZES_DEVICE_ACTION_COLD_SYSTEM_REBOOT: "ZES_DEVICE_ACTION_COLD_SYSTEM_REBOOT", + } + return action_map.get(action, f"UNKNOWN_DEVICE_ACTION_{action}") + + +def is_root_user(): + """Return whether the current user has root privileges on platforms that support it""" + geteuid = getattr(os, "geteuid", None) + return bool(geteuid and geteuid() == 0) + + +def check_rc_allow_action_required(label, rc): + """Accept ZE_RESULT_SUCCESS and ZE_RESULT_WARNING_ACTION_REQUIRED""" + if rc in (pz.ZE_RESULT_SUCCESS, pz.ZE_RESULT_WARNING_ACTION_REQUIRED): + return True + print(f"ERROR: {label} failed with ze_result_t={rc}") + return False + + def initialize_sysman_and_get_devices(): """Initialize Sysman and enumerate drivers/devices. Returns (drivers, driver_count, devices, device_count).""" if not initialize_sysman(): @@ -460,6 +576,165 @@ def test_global_operation(driver_handle, device_handle, device_index): return True +def test_pci_module(device_handle, device_index): + """Test PCI properties, state, and stats operations""" + print(f"\n---- Device {device_index} PCI Test ----") + + properties = pz.zes_pci_properties_t() + properties.stype = pz.ZES_STRUCTURE_TYPE_PCI_PROPERTIES + properties.pNext = None + + rc = pz.zesDevicePciGetProperties(device_handle, byref(properties)) + if not check_rc(f"zesDevicePciGetProperties(device {device_index})", rc): + return False + + print_verbose("PCI Properties:") + print_verbose(f" Domain: 0x{properties.address.domain:X}") + print_verbose(f" Bus: 0x{properties.address.bus:X}") + print_verbose(f" Device: 0x{properties.address.device:X}") + print_verbose(f" Function: 0x{properties.address.function:X}") + print_verbose(f" Max Gen: {properties.maxSpeed.gen}") + print_verbose(f" Max Width: {properties.maxSpeed.width}") + print_verbose(f" Max Bandwidth: {properties.maxSpeed.maxBandwidth}") + print_verbose( + f" Have Bandwidth Counters: {bool(properties.haveBandwidthCounters)}" + ) + print_verbose(f" Have Packet Counters: {bool(properties.havePacketCounters)}") + print_verbose(f" Have Replay Counters: {bool(properties.haveReplayCounters)}") + + state = pz.zes_pci_state_t() + state.stype = pz.ZES_STRUCTURE_TYPE_PCI_STATE + state.pNext = None + + rc = pz.zesDevicePciGetState(device_handle, byref(state)) + if not check_rc(f"zesDevicePciGetState(device {device_index})", rc): + return False + + print_verbose("PCI State:") + print_verbose(f" Link Status: {get_pci_link_status_string(state.status)}") + print_verbose( + f" Quality Issues: {get_pci_quality_issues_string(state.qualityIssues)}" + ) + print_verbose( + f" Stability Issues: {get_pci_stability_issues_string(state.stabilityIssues)}" + ) + print_verbose(f" Current Gen: {state.speed.gen}") + print_verbose(f" Current Width: {state.speed.width}") + print_verbose(f" Current Max Bandwidth: {state.speed.maxBandwidth}") + + stats = pz.zes_pci_stats_t() + rc = pz.zesDevicePciGetStats(device_handle, byref(stats)) + if not check_rc(f"zesDevicePciGetStats(device {device_index})", rc): + return False + + print_verbose("PCI Stats:") + if properties.haveReplayCounters: + print_verbose(f" Replay Counter: {stats.replayCounter}") + if properties.havePacketCounters: + print_verbose(f" Packet Counter: {stats.packetCounter}") + if properties.haveBandwidthCounters: + print_verbose(f" RX Counter: {stats.rxCounter}") + print_verbose(f" TX Counter: {stats.txCounter}") + print_verbose(f" Timestamp: {stats.timestamp}") + print_verbose(f" Current Gen: {stats.speed.gen}") + print_verbose(f" Current Width: {stats.speed.width}") + print_verbose(f" Current Max Bandwidth: {stats.speed.maxBandwidth}") + + return True + + +def test_ecc_module(device_handle, device_index): + """Test ECC availability, configurability, state, and set-state operations""" + print(f"\n---- Device {device_index} ECC Test ----") + + ecc_available = pz.ze_bool_t(0) + rc = pz.zesDeviceEccAvailable(device_handle, byref(ecc_available)) + if not check_rc(f"zesDeviceEccAvailable(device {device_index})", rc): + return False + + print_verbose(f"ECC Available: {bool(ecc_available.value)}") + if not ecc_available.value: + print_verbose("ECC is not available on this device") + return True + + ecc_configurable = pz.ze_bool_t(0) + rc = pz.zesDeviceEccConfigurable(device_handle, byref(ecc_configurable)) + if not check_rc(f"zesDeviceEccConfigurable(device {device_index})", rc): + return False + + print_verbose(f"ECC Configurable: {bool(ecc_configurable.value)}") + + get_state = pz.zes_device_ecc_properties_t() + get_state.stype = pz.ZES_STRUCTURE_TYPE_DEVICE_ECC_PROPERTIES + get_state.pNext = None + + rc = pz.zesDeviceGetEccState(device_handle, byref(get_state)) + if not check_rc(f"zesDeviceGetEccState(device {device_index})", rc): + return False + + print_verbose("ECC State:") + print_verbose(f" Current State: {get_ecc_state_string(get_state.currentState)}") + print_verbose(f" Pending State: {get_ecc_state_string(get_state.pendingState)}") + print_verbose( + f" Pending Action: {get_device_action_string(get_state.pendingAction)}" + ) + + if not ecc_configurable.value: + print_verbose("ECC is not configurable on this device") + return True + + if not is_root_user(): + print_verbose( + "Skipping zesDeviceSetEccState test due to insufficient permissions" + ) + return True + + restore_state = get_state.pendingState + if restore_state not in ( + pz.ZES_DEVICE_ECC_STATE_ENABLED, + pz.ZES_DEVICE_ECC_STATE_DISABLED, + ): + restore_state = get_state.currentState + + if restore_state == pz.ZES_DEVICE_ECC_STATE_ENABLED: + test_state = pz.ZES_DEVICE_ECC_STATE_DISABLED + else: + test_state = pz.ZES_DEVICE_ECC_STATE_ENABLED + + new_state = pz.zes_device_ecc_desc_t() + new_state.stype = pz.ZES_STRUCTURE_TYPE_DEVICE_ECC_DESC + new_state.pNext = None + new_state.state = test_state + + set_state = pz.zes_device_ecc_properties_t() + set_state.stype = pz.ZES_STRUCTURE_TYPE_DEVICE_ECC_PROPERTIES + set_state.pNext = None + + rc = pz.zesDeviceSetEccState(device_handle, byref(new_state), byref(set_state)) + if not check_rc_allow_action_required( + f"zesDeviceSetEccState(device {device_index}, test)", rc + ): + return False + + print_verbose("ECC Set State Result:") + print_verbose(f" Current State: {get_ecc_state_string(set_state.currentState)}") + print_verbose(f" Pending State: {get_ecc_state_string(set_state.pendingState)}") + print_verbose( + f" Pending Action: {get_device_action_string(set_state.pendingAction)}" + ) + + if restore_state != test_state: + new_state.state = restore_state + rc = pz.zesDeviceSetEccState(device_handle, byref(new_state), byref(set_state)) + if not check_rc_allow_action_required( + f"zesDeviceSetEccState(device {device_index}, restore)", rc + ): + return False + print_verbose("ECC configuration restored to original state") + + return True + + def test_device_processes(device_handle, device_index): """Test device processes state""" print(f"\n---- Device {device_index} Processes Test ----") @@ -645,7 +920,7 @@ def test_memory_modules(device_handle, device_index): def test_power_module(device_handle, device_index): - """Test power domain enumeration and energy counter operations""" + """Test power domain enumeration, properties, energy-derived power, and power limit extension operations""" print(f"\n---- Device {device_index} Power Domains Test ----") # Get power domain count @@ -672,37 +947,130 @@ def test_power_module(device_handle, device_index): for i in range(power_count.value): print_verbose(f"\n Power Domain {i}:") - # Test power energy counter - energy_counter = pz.zes_power_energy_counter_t() + default_limit = pz.zes_power_limit_ext_desc_t() + ext_properties = pz.zes_power_ext_properties_t() + ext_properties.stype = pz.ZES_STRUCTURE_TYPE_POWER_EXT_PROPERTIES + ext_properties.pNext = None + ext_properties.defaultLimit = pointer(default_limit) + + properties = pz.zes_power_properties_t() + properties.stype = pz.ZES_STRUCTURE_TYPE_POWER_PROPERTIES + properties.pNext = cast(pointer(ext_properties), c_void_p) - rc = pz.zesPowerGetEnergyCounter(power_handles[i], byref(energy_counter)) - if not check_rc(f"zesPowerGetEnergyCounter(power {i})", rc): + rc = pz.zesPowerGetProperties(power_handles[i], byref(properties)) + if not check_rc(f"zesPowerGetProperties(power {i})", rc): continue - print_verbose(" Energy Counter:") - print_verbose(f" Energy: {energy_counter.energy}") - print_verbose(f" Timestamp: {energy_counter.timestamp} microseconds") + print_verbose(" Power Properties:") + print_verbose(f" On Subdevice: {bool(properties.onSubdevice)}") + if properties.onSubdevice: + print_verbose(f" Subdevice ID: {properties.subdeviceId}") + print_verbose(f" Can Control: {bool(properties.canControl)}") + print_verbose( + f" Energy Threshold Supported: {bool(properties.isEnergyThresholdSupported)}" + ) + print_verbose(f" Default Limit: {properties.defaultLimit}") + print_verbose(f" Min Limit: {properties.minLimit}") + print_verbose(f" Max Limit: {properties.maxLimit}") + print_verbose(f" Domain: {get_power_domain_string(ext_properties.domain)}") + + limit_descs = None + limit_count = c_uint32(0) + if properties.onSubdevice: + print_verbose( + " Skipping power limit APIs for subdevice-scoped power domain" + ) + else: + rc = pz.zesPowerGetLimitsExt(power_handles[i], byref(limit_count), None) + if not check_rc(f"zesPowerGetLimitsExt(power {i}, count)", rc): + continue - # Take a second reading after a small delay - import time + print_verbose(f" Power Limit Descriptor Count: {limit_count.value}") - time.sleep(0.01) # 10ms delay + if limit_count.value > 0: + PowerLimitArray = pz.zes_power_limit_ext_desc_t * limit_count.value + limit_descs = PowerLimitArray() - energy_counter2 = pz.zes_power_energy_counter_t() - ret2 = pz.zesPowerGetEnergyCounter(power_handles[i], byref(energy_counter2)) - if ret2 == pz.ZE_RESULT_SUCCESS: - energy_delta = energy_counter2.energy - energy_counter.energy - time_delta = energy_counter2.timestamp - energy_counter.timestamp - if time_delta > 0: - print_verbose( - f" Energy Delta: {energy_delta} over {time_delta} microseconds" + for limit_index in range(limit_count.value): + limit_descs[limit_index].stype = ( + pz.ZES_STRUCTURE_TYPE_POWER_LIMIT_EXT_DESC + ) + limit_descs[limit_index].pNext = None + + rc = pz.zesPowerGetLimitsExt( + power_handles[i], byref(limit_count), limit_descs ) + if not check_rc(f"zesPowerGetLimitsExt(power {i}, descriptors)", rc): + continue + + print_verbose(" Power Limit Descriptors:") + for limit_index in range(limit_count.value): + limit_desc = limit_descs[limit_index] + print_verbose(f" Descriptor {limit_index}:") + print_verbose( + f" Level: {get_power_level_string(limit_desc.level)}" + ) + print_verbose( + f" Source: {get_power_source_string(limit_desc.source)}" + ) + print_verbose( + f" Limit Unit: {get_limit_unit_string(limit_desc.limitUnit)}" + ) + print_verbose( + f" Enabled State Locked: {bool(limit_desc.enabledStateLocked)}" + ) + print_verbose(f" Enabled: {bool(limit_desc.enabled)}") + print_verbose( + f" Interval Value Locked: {bool(limit_desc.intervalValueLocked)}" + ) + print_verbose(f" Interval: {limit_desc.interval}") + print_verbose( + f" Limit Value Locked: {bool(limit_desc.limitValueLocked)}" + ) + print_verbose(f" Limit: {limit_desc.limit}") + + energy_counter1 = pz.zes_power_energy_counter_t() + rc = pz.zesPowerGetEnergyCounter(power_handles[i], byref(energy_counter1)) + if not check_rc(f"zesPowerGetEnergyCounter(power {i}, first)", rc): + continue + + time.sleep(1) + + energy_counter2 = pz.zes_power_energy_counter_t() + rc = pz.zesPowerGetEnergyCounter(power_handles[i], byref(energy_counter2)) + if not check_rc(f"zesPowerGetEnergyCounter(power {i}, second)", rc): + continue + + energy_delta = energy_counter2.energy - energy_counter1.energy + time_delta = energy_counter2.timestamp - energy_counter1.timestamp + if time_delta > 0: + power_watt = energy_delta / time_delta + device_scope = "subDevice" if properties.onSubdevice else "rootDevice" + print_verbose(f" Current Power: {power_watt:.6f} W for {device_scope}") + else: + print_verbose(" Current Power: unavailable due to zero delta time") + + if properties.onSubdevice or limit_descs is None: + continue + + if not is_root_user(): + print_verbose( + " Skipping zesPowerSetLimitsExt due to insufficient permissions" + ) + continue + + set_count = c_uint32(limit_count.value) + rc = pz.zesPowerSetLimitsExt(power_handles[i], byref(set_count), limit_descs) + if not check_rc(f"zesPowerSetLimitsExt(power {i})", rc): + return False + + print_verbose(" Set power limit successfully") return True def test_frequency_domains(device_handle, device_index): - """Test frequency domain enumeration and state operations""" + """Test frequency domain enumeration and state, range, and clock operations""" print(f"\n---- Device {device_index} Frequency Domains Test ----") # Get frequency domain count @@ -733,6 +1101,60 @@ def test_frequency_domains(device_handle, device_index): for i in range(freq_count.value): print_verbose(f"\n Frequency Domain {i}:") + freq_properties = pz.zes_freq_properties_t() + freq_properties.stype = pz.ZES_STRUCTURE_TYPE_FREQ_PROPERTIES + freq_properties.pNext = None + + rc = pz.zesFrequencyGetProperties(freq_handles[i], byref(freq_properties)) + if not check_rc(f"zesFrequencyGetProperties(frequency {i})", rc): + continue + + print_verbose(" Frequency Properties:") + print_verbose( + f" Type: {get_frequency_domain_string(freq_properties.type)}" + ) + print_verbose(f" Can Control: {bool(freq_properties.canControl)}") + print_verbose( + " Throttle Event Supported: " + f"{bool(freq_properties.isThrottleEventSupported)}" + ) + print_verbose(f" Minimum Frequency: {freq_properties.min:.1f} MHz") + print_verbose(f" Maximum Frequency: {freq_properties.max:.1f} MHz") + if freq_properties.onSubdevice: + print_verbose(f" Subdevice ID: {freq_properties.subdeviceId}") + + available_clock_count = c_uint32(0) + rc = pz.zesFrequencyGetAvailableClocks( + freq_handles[i], byref(available_clock_count), None + ) + if not check_rc(f"zesFrequencyGetAvailableClocks(frequency {i}, count)", rc): + continue + + available_clocks = None + if available_clock_count.value > 0: + AvailableClockArray = c_double * available_clock_count.value + available_clocks = AvailableClockArray() + rc = pz.zesFrequencyGetAvailableClocks( + freq_handles[i], byref(available_clock_count), available_clocks + ) + if not check_rc( + f"zesFrequencyGetAvailableClocks(frequency {i}, clocks)", rc + ): + continue + + print_verbose(" Available Clocks:") + for clock_index in range(available_clock_count.value): + print_verbose(f" {available_clocks[clock_index]:.1f} MHz") + + freq_range = pz.zes_freq_range_t() + rc = pz.zesFrequencyGetRange(freq_handles[i], byref(freq_range)) + if not check_rc(f"zesFrequencyGetRange(frequency {i})", rc): + continue + + print_verbose(" Frequency Range:") + print_verbose(f" Min: {freq_range.min:.1f} MHz") + print_verbose(f" Max: {freq_range.max:.1f} MHz") + # Test frequency state freq_state = pz.zes_freq_state_t() freq_state.stype = pz.ZES_STRUCTURE_TYPE_FREQ_STATE @@ -772,6 +1194,49 @@ def test_frequency_domains(device_handle, device_index): f" Throttle Reasons: {get_throttle_reasons_string(freq_state.throttleReasons)}" ) + if available_clocks is None or available_clock_count.value == 0: + continue + + if not is_root_user(): + print_verbose( + " Skipping zesFrequencySetRange test due to insufficient permissions" + ) + continue + + original_range = pz.zes_freq_range_t() + original_range.min = freq_range.min + original_range.max = freq_range.max + + test_range = pz.zes_freq_range_t() + test_range.min = available_clocks[0] + test_range.max = available_clocks[0] + + rc = pz.zesFrequencySetRange(freq_handles[i], byref(test_range)) + if not check_rc(f"zesFrequencySetRange(frequency {i}, test)", rc): + return False + + verify_range = pz.zes_freq_range_t() + rc = pz.zesFrequencyGetRange(freq_handles[i], byref(verify_range)) + if not check_rc(f"zesFrequencyGetRange(frequency {i}, verify)", rc): + return False + + print_verbose(" Frequency Range After Set:") + print_verbose(f" Min: {verify_range.min:.1f} MHz") + print_verbose(f" Max: {verify_range.max:.1f} MHz") + + rc = pz.zesFrequencySetRange(freq_handles[i], byref(original_range)) + if not check_rc(f"zesFrequencySetRange(frequency {i}, restore)", rc): + return False + + restored_range = pz.zes_freq_range_t() + rc = pz.zesFrequencyGetRange(freq_handles[i], byref(restored_range)) + if not check_rc(f"zesFrequencyGetRange(frequency {i}, restored)", rc): + return False + + print_verbose(" Frequency Range Restored:") + print_verbose(f" Min: {restored_range.min:.1f} MHz") + print_verbose(f" Max: {restored_range.max:.1f} MHz") + return True @@ -901,6 +1366,12 @@ def run_all_tests(): # Test global device operations (properties and processes) test_global_operation(drivers[driver_idx], devices[device_idx], device_idx) + # Test PCI module + test_pci_module(devices[device_idx], device_idx) + + # Test ECC module + test_ecc_module(devices[device_idx], device_idx) + # Test memory modules test_memory_modules(devices[device_idx], device_idx) @@ -928,7 +1399,9 @@ def main(): %(prog)s -a # Run all tests %(prog)s -m # Memory tests only %(prog)s -g # Global operations (device properties and processes) only - %(prog)s -p # Power tests only + %(prog)s -p # PCI tests only + %(prog)s -C # ECC tests only + %(prog)s -o # Power tests only %(prog)s -f # Frequency tests only %(prog)s -t # Temperature tests only %(prog)s -e # Engine tests only @@ -947,8 +1420,10 @@ def main(): help="Run only global operations (device properties and processes)", ) parser.add_argument( - "-p", "--power", action="store_true", help="Run only power-related tests" + "-o", "--power", action="store_true", help="Run only power-related tests" ) + parser.add_argument("-p", "--pci", action="store_true", help="Run only PCI tests") + parser.add_argument("-C", "--ecc", action="store_true", help="Run only ECC tests") parser.add_argument( "-f", "--frequency", @@ -974,6 +1449,8 @@ def main(): specific_test = ( args.memory or getattr(args, "global", False) + or args.pci + or args.ecc or args.power or args.frequency or args.temperature @@ -1009,6 +1486,12 @@ def main(): if getattr(args, "global", False): test_global_operation(drivers[0], devices[device_idx], device_idx) + if args.pci: + test_pci_module(devices[device_idx], device_idx) + + if args.ecc: + test_ecc_module(devices[device_idx], device_idx) + if args.memory: test_memory_modules(devices[device_idx], device_idx) diff --git a/bindings/sysman/python/source/pyzes.py b/bindings/sysman/python/source/pyzes.py index 6b8c5097..5fcccc1a 100644 --- a/bindings/sysman/python/source/pyzes.py +++ b/bindings/sysman/python/source/pyzes.py @@ -67,7 +67,7 @@ def _LoadZeLibrary(): # load the library libName = "ze_loader" if sys.platform.startswith("linux"): - libName = "/usr/lib/x86_64-linux-gnu/lib" + libName + ".so.1" + libName = "/usr/lib/x86_64-linux-gnu/lib" + libName + ".so" else: # Try multiple common locations for Windows Intel GPU drivers possible_paths = [ @@ -224,6 +224,26 @@ class zes_engine_handle_t(c_void_p): ZES_POWER_DOMAIN_GPU = 5 ZES_POWER_DOMAIN_FORCE_UINT32 = 0x7FFFFFFF +zes_power_level_t = c_int32 +ZES_POWER_LEVEL_UNKNOWN = 0 +ZES_POWER_LEVEL_SUSTAINED = 1 +ZES_POWER_LEVEL_BURST = 2 +ZES_POWER_LEVEL_PEAK = 3 +ZES_POWER_LEVEL_INSTANTANEOUS = 4 +ZES_POWER_LEVEL_FORCE_UINT32 = 0x7FFFFFFF + +zes_power_source_t = c_int32 +ZES_POWER_SOURCE_ANY = 0 +ZES_POWER_SOURCE_MAINS = 1 +ZES_POWER_SOURCE_BATTERY = 2 +ZES_POWER_SOURCE_FORCE_UINT32 = 0x7FFFFFFF + +zes_limit_unit_t = c_int32 +ZES_LIMIT_UNIT_UNKNOWN = 0 +ZES_LIMIT_UNIT_CURRENT = 1 +ZES_LIMIT_UNIT_POWER = 2 +ZES_LIMIT_UNIT_FORCE_UINT32 = 0x7FFFFFFF + ## Frequency domain enums ## zes_freq_domain_t = c_int32 ZES_FREQ_DOMAIN_GPU = 0 @@ -340,6 +360,13 @@ class zes_engine_handle_t(c_void_p): # Structure type enum values ZES_STRUCTURE_TYPE_DEVICE_PROPERTIES = 0x1 +ZES_STRUCTURE_TYPE_PCI_PROPERTIES = 0x2 +ZES_STRUCTURE_TYPE_POWER_PROPERTIES = 0xD +ZES_STRUCTURE_TYPE_PCI_STATE = 0x17 +ZES_STRUCTURE_TYPE_DEVICE_ECC_DESC = 0x25 +ZES_STRUCTURE_TYPE_DEVICE_ECC_PROPERTIES = 0x26 +ZES_STRUCTURE_TYPE_POWER_LIMIT_EXT_DESC = 0x27 +ZES_STRUCTURE_TYPE_POWER_EXT_PROPERTIES = 0x28 ZES_STRUCTURE_TYPE_PROCESS_STATE = 0x16 ZES_STRUCTURE_TYPE_DEVICE_EXT_PROPERTIES = 0x2D # from zes_structure_type_t ZES_STRUCTURE_TYPE_SUBDEVICE_EXP_PROPERTIES = ( @@ -415,6 +442,117 @@ class zes_process_state_t(_PrintableStructure): _fmt_ = {"memSize": "%d bytes", "sharedSize": "%d bytes"} +## PCI structures ## +class zes_pci_address_t(_PrintableStructure): + _fields_ = [ + ("domain", c_uint32), + ("bus", c_uint32), + ("device", c_uint32), + ("function", c_uint32), + ] + + +class zes_pci_speed_t(_PrintableStructure): + _fields_ = [ + ("gen", c_int32), + ("width", c_int32), + ("maxBandwidth", c_int64), + ] + _fmt_ = {"maxBandwidth": "%d"} + + +class zes_pci_properties_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), + ("pNext", c_void_p), + ("address", zes_pci_address_t), + ("maxSpeed", zes_pci_speed_t), + ("haveBandwidthCounters", ze_bool_t), + ("havePacketCounters", ze_bool_t), + ("haveReplayCounters", ze_bool_t), + ] + + +zes_pci_link_status_t = c_int32 +ZES_PCI_LINK_STATUS_UNKNOWN = 0 +ZES_PCI_LINK_STATUS_GOOD = 1 +ZES_PCI_LINK_STATUS_QUALITY_ISSUES = 2 +ZES_PCI_LINK_STATUS_STABILITY_ISSUES = 3 +ZES_PCI_LINK_STATUS_FORCE_UINT32 = 0x7FFFFFFF + +zes_pci_link_qual_issue_flags_t = c_uint32 +ZES_PCI_LINK_QUAL_ISSUE_FLAG_REPLAYS = 1 << 0 +ZES_PCI_LINK_QUAL_ISSUE_FLAG_SPEED = 1 << 1 +ZES_PCI_LINK_QUAL_ISSUE_FLAG_FORCE_UINT32 = 0x7FFFFFFF + +zes_pci_link_stab_issue_flags_t = c_uint32 +ZES_PCI_LINK_STAB_ISSUE_FLAG_RETRAINING = 1 << 0 +ZES_PCI_LINK_STAB_ISSUE_FLAG_FORCE_UINT32 = 0x7FFFFFFF + + +class zes_pci_state_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), + ("pNext", c_void_p), + ("status", zes_pci_link_status_t), + ("qualityIssues", zes_pci_link_qual_issue_flags_t), + ("stabilityIssues", zes_pci_link_stab_issue_flags_t), + ("speed", zes_pci_speed_t), + ] + _fmt_ = {"qualityIssues": "0x%08X", "stabilityIssues": "0x%08X"} + + +class zes_pci_stats_t(_PrintableStructure): + _fields_ = [ + ("timestamp", c_uint64), + ("replayCounter", c_uint64), + ("packetCounter", c_uint64), + ("rxCounter", c_uint64), + ("txCounter", c_uint64), + ("speed", zes_pci_speed_t), + ] + _fmt_ = { + "timestamp": "%d", + "replayCounter": "%d", + "packetCounter": "%d", + "rxCounter": "%d", + "txCounter": "%d", + } + + +## ECC enums and structures ## +zes_device_ecc_state_t = c_int32 +ZES_DEVICE_ECC_STATE_UNAVAILABLE = 0 +ZES_DEVICE_ECC_STATE_ENABLED = 1 +ZES_DEVICE_ECC_STATE_DISABLED = 2 +ZES_DEVICE_ECC_STATE_FORCE_UINT32 = 0x7FFFFFFF + +zes_device_action_t = c_int32 +ZES_DEVICE_ACTION_NONE = 0 +ZES_DEVICE_ACTION_WARM_CARD_RESET = 1 +ZES_DEVICE_ACTION_COLD_CARD_RESET = 2 +ZES_DEVICE_ACTION_COLD_SYSTEM_REBOOT = 3 +ZES_DEVICE_ACTION_FORCE_UINT32 = 0x7FFFFFFF + + +class zes_device_ecc_desc_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), + ("pNext", c_void_p), + ("state", zes_device_ecc_state_t), + ] + + +class zes_device_ecc_properties_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), + ("pNext", c_void_p), + ("currentState", zes_device_ecc_state_t), + ("pendingState", zes_device_ecc_state_t), + ("pendingAction", zes_device_action_t), + ] + + ## Sysman zes_uuid_t ## class zes_uuid_t(_PrintableStructure): _fields_ = [("id", c_ubyte * ZES_MAX_UUID_SIZE)] @@ -487,6 +625,45 @@ class zes_mem_bandwidth_t(_PrintableStructure): ## Power structures ## +class zes_power_properties_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), + ("pNext", c_void_p), + ("onSubdevice", ze_bool_t), + ("subdeviceId", c_uint32), + ("canControl", ze_bool_t), + ("isEnergyThresholdSupported", ze_bool_t), + ("defaultLimit", c_int32), + ("minLimit", c_int32), + ("maxLimit", c_int32), + ] + + +class zes_power_limit_ext_desc_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), + ("pNext", c_void_p), + ("level", zes_power_level_t), + ("source", zes_power_source_t), + ("limitUnit", zes_limit_unit_t), + ("enabledStateLocked", ze_bool_t), + ("enabled", ze_bool_t), + ("intervalValueLocked", ze_bool_t), + ("interval", c_int32), + ("limitValueLocked", ze_bool_t), + ("limit", c_int32), + ] + + +class zes_power_ext_properties_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), + ("pNext", c_void_p), + ("domain", zes_power_domain_t), + ("defaultLimit", POINTER(zes_power_limit_ext_desc_t)), + ] + + class zes_power_energy_counter_t(_PrintableStructure): _fields_ = [ ("energy", c_uint64), # monotonic energy counter in microjoules @@ -496,6 +673,29 @@ class zes_power_energy_counter_t(_PrintableStructure): ## Frequency structures ## +class zes_freq_properties_t(_PrintableStructure): + _fields_ = [ + ("stype", c_int32), + ("pNext", c_void_p), + ("type", zes_freq_domain_t), + ("onSubdevice", ze_bool_t), + ("subdeviceId", c_uint32), + ("canControl", ze_bool_t), + ("isThrottleEventSupported", ze_bool_t), + ("min", c_double), + ("max", c_double), + ] + _fmt_ = {"min": "%.1f MHz", "max": "%.1f MHz"} + + +class zes_freq_range_t(_PrintableStructure): + _fields_ = [ + ("min", c_double), + ("max", c_double), + ] + _fmt_ = {"min": "%.1f MHz", "max": "%.1f MHz"} + + class zes_freq_state_t(_PrintableStructure): _fields_ = [ ("stype", c_int32), # ZES_STRUCTURE_TYPE_FREQ_STATE @@ -517,6 +717,14 @@ class zes_freq_state_t(_PrintableStructure): } +class zes_freq_throttle_time_t(_PrintableStructure): + _fields_ = [ + ("throttleTime", c_uint64), + ("timestamp", c_uint64), + ] + _fmt_ = {"throttleTime": "%d", "timestamp": "%d microseconds"} + + ## Temperature structures ## class zes_temp_properties_t(_PrintableStructure): _fields_ = [ @@ -652,6 +860,135 @@ def zesDeviceGetProperties(hDevice, pProperties): return retVal +## PCI management functions ## +def zesDevicePciGetProperties(hDevice, pProperties): + """Wraps API: + ze_result_t zesDevicePciGetProperties(zes_device_handle_t hDevice, zes_pci_properties_t* pProperties) + + Parameters: + hDevice: device handle + pProperties: POINTER(zes_pci_properties_t) - PCI properties structure to fill + Returns: + ze_result_t - return code only, properties are filled into pProperties + """ + funcPtr = getFunctionPointerList("zesDevicePciGetProperties") + funcPtr.argtypes = [zes_device_handle_t, POINTER(zes_pci_properties_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pProperties) + return retVal + + +def zesDevicePciGetState(hDevice, pState): + """Wraps API: + ze_result_t zesDevicePciGetState(zes_device_handle_t hDevice, zes_pci_state_t* pState) + + Parameters: + hDevice: device handle + pState: POINTER(zes_pci_state_t) - PCI state structure to fill + Returns: + ze_result_t - return code only, state is filled into pState + """ + funcPtr = getFunctionPointerList("zesDevicePciGetState") + funcPtr.argtypes = [zes_device_handle_t, POINTER(zes_pci_state_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pState) + return retVal + + +def zesDevicePciGetStats(hDevice, pStats): + """Wraps API: + ze_result_t zesDevicePciGetStats(zes_device_handle_t hDevice, zes_pci_stats_t* pStats) + + Parameters: + hDevice: device handle + pStats: POINTER(zes_pci_stats_t) - PCI stats structure to fill + Returns: + ze_result_t - return code only, stats are filled into pStats + """ + funcPtr = getFunctionPointerList("zesDevicePciGetStats") + funcPtr.argtypes = [zes_device_handle_t, POINTER(zes_pci_stats_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pStats) + return retVal + + +## ECC management functions ## +def zesDeviceEccAvailable(hDevice, pAvailable): + """Wraps API: + ze_result_t zesDeviceEccAvailable(zes_device_handle_t hDevice, ze_bool_t* pAvailable) + + Parameters: + hDevice: device handle + pAvailable: POINTER(ze_bool_t) - ECC availability flag to fill + Returns: + ze_result_t - return code only, availability is filled into pAvailable + """ + funcPtr = getFunctionPointerList("zesDeviceEccAvailable") + funcPtr.argtypes = [zes_device_handle_t, POINTER(ze_bool_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pAvailable) + return retVal + + +def zesDeviceEccConfigurable(hDevice, pConfigurable): + """Wraps API: + ze_result_t zesDeviceEccConfigurable(zes_device_handle_t hDevice, ze_bool_t* pConfigurable) + + Parameters: + hDevice: device handle + pConfigurable: POINTER(ze_bool_t) - ECC configurability flag to fill + Returns: + ze_result_t - return code only, configurability is filled into pConfigurable + """ + funcPtr = getFunctionPointerList("zesDeviceEccConfigurable") + funcPtr.argtypes = [zes_device_handle_t, POINTER(ze_bool_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pConfigurable) + return retVal + + +def zesDeviceGetEccState(hDevice, pState): + """Wraps API: + ze_result_t zesDeviceGetEccState(zes_device_handle_t hDevice, zes_device_ecc_properties_t* pState) + + Parameters: + hDevice: device handle + pState: POINTER(zes_device_ecc_properties_t) - ECC state structure to fill + Returns: + ze_result_t - return code only, ECC state is filled into pState + """ + funcPtr = getFunctionPointerList("zesDeviceGetEccState") + funcPtr.argtypes = [zes_device_handle_t, POINTER(zes_device_ecc_properties_t)] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, pState) + return retVal + + +def zesDeviceSetEccState(hDevice, newState, pState): + """Wraps API: + ze_result_t zesDeviceSetEccState( + zes_device_handle_t hDevice, + const zes_device_ecc_desc_t* newState, + zes_device_ecc_properties_t* pState) + + Parameters: + hDevice: device handle + newState: POINTER(zes_device_ecc_desc_t) - desired ECC state descriptor + pState: POINTER(zes_device_ecc_properties_t) - resulting ECC state structure to fill + Returns: + ze_result_t - return code only, ECC state is filled into pState + """ + funcPtr = getFunctionPointerList("zesDeviceSetEccState") + funcPtr.argtypes = [ + zes_device_handle_t, + POINTER(zes_device_ecc_desc_t), + POINTER(zes_device_ecc_properties_t), + ] + funcPtr.restype = ze_result_t + retVal = funcPtr(hDevice, newState, pState) + return retVal + + def zesDeviceGetSubDevicePropertiesExp(hDevice, pCount, pSubdeviceProps): """Wraps API: ze_result_t zesDeviceGetSubDevicePropertiesExp( @@ -856,6 +1193,78 @@ def zesPowerGetEnergyCounter(hPower, pEnergy): return retVal +def zesPowerGetProperties(hPower, pProperties): + """Wraps API: + ze_result_t zesPowerGetProperties( + zes_pwr_handle_t hPower, + zes_power_properties_t* pProperties) + + Parameters: + hPower: power handle + pProperties: POINTER(zes_power_properties_t) - power properties structure to fill + Returns: + ze_result_t - return code only, power properties are filled into pProperties + """ + funcPtr = getFunctionPointerList("zesPowerGetProperties") + funcPtr.argtypes = [zes_pwr_handle_t, POINTER(zes_power_properties_t)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hPower, pProperties) + return retVal + + +def zesPowerGetLimitsExt(hPower, pCount, pSustained): + """Wraps API: + ze_result_t zesPowerGetLimitsExt( + zes_pwr_handle_t hPower, + uint32_t* pCount, + zes_power_limit_ext_desc_t* pSustained) + + Parameters: + hPower: power handle + pCount: POINTER(c_uint32) + pSustained: POINTER(zes_power_limit_ext_desc_t) or None + Returns: + ze_result_t - return code only, power limit descriptors are filled into pSustained + """ + funcPtr = getFunctionPointerList("zesPowerGetLimitsExt") + funcPtr.argtypes = [ + zes_pwr_handle_t, + POINTER(c_uint32), + POINTER(zes_power_limit_ext_desc_t), + ] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hPower, pCount, pSustained) + return retVal + + +def zesPowerSetLimitsExt(hPower, pCount, pSustained): + """Wraps API: + ze_result_t zesPowerSetLimitsExt( + zes_pwr_handle_t hPower, + uint32_t* pCount, + zes_power_limit_ext_desc_t* pSustained) + + Parameters: + hPower: power handle + pCount: POINTER(c_uint32) + pSustained: POINTER(zes_power_limit_ext_desc_t) or None + Returns: + ze_result_t - return code only + """ + funcPtr = getFunctionPointerList("zesPowerSetLimitsExt") + funcPtr.argtypes = [ + zes_pwr_handle_t, + POINTER(c_uint32), + POINTER(zes_power_limit_ext_desc_t), + ] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hPower, pCount, pSustained) + return retVal + + ## Frequency module functions ## def zesDeviceEnumFrequencyDomains(hDevice, pCount, phFrequency): """Wraps API: @@ -880,6 +1289,26 @@ def zesDeviceEnumFrequencyDomains(hDevice, pCount, phFrequency): return retVal +def zesFrequencyGetProperties(hFrequency, pProperties): + """Wraps API: + ze_result_t zesFrequencyGetProperties( + zes_freq_handle_t hFrequency, + zes_freq_properties_t* pProperties) + + Parameters: + hFrequency: frequency handle + pProperties: POINTER(zes_freq_properties_t) - properties structure to fill + Returns: + ze_result_t - return code only, properties are filled into pProperties + """ + funcPtr = getFunctionPointerList("zesFrequencyGetProperties") + funcPtr.argtypes = [zes_freq_handle_t, POINTER(zes_freq_properties_t)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hFrequency, pProperties) + return retVal + + def zesFrequencyGetState(hFrequency, pState): """Wraps API: ze_result_t zesFrequencyGetState( @@ -900,6 +1329,88 @@ def zesFrequencyGetState(hFrequency, pState): return retVal +def zesFrequencyGetAvailableClocks(hFrequency, pCount, phFrequency): + """Wraps API: + ze_result_t zesFrequencyGetAvailableClocks( + zes_freq_handle_t hFrequency, + uint32_t* pCount, + double* phFrequency) + + Parameters: + hFrequency: frequency handle + pCount: POINTER(c_uint32) + phFrequency: POINTER(c_double) or None + Returns: + ze_result_t - return code only, available clocks are filled into phFrequency + """ + funcPtr = getFunctionPointerList("zesFrequencyGetAvailableClocks") + funcPtr.argtypes = [zes_freq_handle_t, POINTER(c_uint32), POINTER(c_double)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hFrequency, pCount, phFrequency) + return retVal + + +def zesFrequencyGetRange(hFrequency, pLimits): + """Wraps API: + ze_result_t zesFrequencyGetRange( + zes_freq_handle_t hFrequency, + zes_freq_range_t* pLimits) + + Parameters: + hFrequency: frequency handle + pLimits: POINTER(zes_freq_range_t) + Returns: + ze_result_t - return code only, frequency range is filled into pLimits + """ + funcPtr = getFunctionPointerList("zesFrequencyGetRange") + funcPtr.argtypes = [zes_freq_handle_t, POINTER(zes_freq_range_t)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hFrequency, pLimits) + return retVal + + +def zesFrequencySetRange(hFrequency, pLimits): + """Wraps API: + ze_result_t zesFrequencySetRange( + zes_freq_handle_t hFrequency, + const zes_freq_range_t* pLimits) + + Parameters: + hFrequency: frequency handle + pLimits: POINTER(zes_freq_range_t) + Returns: + ze_result_t - return code only + """ + funcPtr = getFunctionPointerList("zesFrequencySetRange") + funcPtr.argtypes = [zes_freq_handle_t, POINTER(zes_freq_range_t)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hFrequency, pLimits) + return retVal + + +def zesFrequencyGetThrottleTime(hFrequency, pThrottleTime): + """Wraps API: + ze_result_t zesFrequencyGetThrottleTime( + zes_freq_handle_t hFrequency, + zes_freq_throttle_time_t* pThrottleTime) + + Parameters: + hFrequency: frequency handle + pThrottleTime: POINTER(zes_freq_throttle_time_t) + Returns: + ze_result_t - return code only, throttle time is filled into pThrottleTime + """ + funcPtr = getFunctionPointerList("zesFrequencyGetThrottleTime") + funcPtr.argtypes = [zes_freq_handle_t, POINTER(zes_freq_throttle_time_t)] + funcPtr.restype = ze_result_t + + retVal = funcPtr(hFrequency, pThrottleTime) + return retVal + + ## Temperature sensor functions ## def zesDeviceEnumTemperatureSensors(hDevice, pCount, phTemperature): """Wraps API: diff --git a/bindings/sysman/python/test/unit_tests/test_ecc.py b/bindings/sysman/python/test/unit_tests/test_ecc.py new file mode 100644 index 00000000..be2dd391 --- /dev/null +++ b/bindings/sysman/python/test/unit_tests/test_ecc.py @@ -0,0 +1,138 @@ +## +# Copyright (C) 2026 Intel Corporation +# +# SPDX-License-Identifier: MIT +# +## + +import os +import sys +import unittest +from ctypes import * +from unittest.mock import MagicMock, patch + +# Add the source directory to Python path so we can import pyzes +script_dir = os.path.dirname(os.path.abspath(__file__)) +source_dir = os.path.join(script_dir, "..", "..", "source") +source_dir = os.path.abspath(source_dir) +if source_dir not in sys.path: + sys.path.insert(0, source_dir) + + +@patch("pyzes.getFunctionPointerList") +class TestEccFunctions(unittest.TestCase): + def setUp(self): + import pyzes + + self.pyzes = pyzes + + def test_GivenValidDeviceHandleWhenCallingZesDeviceEccAvailableThenCallSucceedsWithAvailability( + self, mock_get_func + ): + mock_available = 1 + + def mock_ecc_available(device_handle, available_ptr): + available_ptr._obj.value = mock_available + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_ecc_available) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + available = self.pyzes.ze_bool_t(0) + + result = self.pyzes.zesDeviceEccAvailable(device_handle, byref(available)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(available.value, mock_available) + mock_get_func.assert_called_with("zesDeviceEccAvailable") + mock_func.assert_called_once() + + def test_GivenValidDeviceHandleWhenCallingZesDeviceEccConfigurableThenCallSucceedsWithConfigurability( + self, mock_get_func + ): + mock_configurable = 1 + + def mock_ecc_configurable(device_handle, configurable_ptr): + configurable_ptr._obj.value = mock_configurable + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_ecc_configurable) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + configurable = self.pyzes.ze_bool_t(0) + + result = self.pyzes.zesDeviceEccConfigurable(device_handle, byref(configurable)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(configurable.value, mock_configurable) + mock_get_func.assert_called_with("zesDeviceEccConfigurable") + mock_func.assert_called_once() + + def test_GivenValidDeviceHandleWhenCallingZesDeviceGetEccStateThenCallSucceedsWithState( + self, mock_get_func + ): + mock_current_state = self.pyzes.ZES_DEVICE_ECC_STATE_ENABLED + mock_pending_state = self.pyzes.ZES_DEVICE_ECC_STATE_DISABLED + mock_pending_action = self.pyzes.ZES_DEVICE_ACTION_WARM_CARD_RESET + + def mock_get_ecc_state(device_handle, state_ptr): + state_ptr._obj.currentState = mock_current_state + state_ptr._obj.pendingState = mock_pending_state + state_ptr._obj.pendingAction = mock_pending_action + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_ecc_state) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + ecc_state = self.pyzes.zes_device_ecc_properties_t() + + result = self.pyzes.zesDeviceGetEccState(device_handle, byref(ecc_state)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(ecc_state.currentState, mock_current_state) + self.assertEqual(ecc_state.pendingState, mock_pending_state) + self.assertEqual(ecc_state.pendingAction, mock_pending_action) + mock_get_func.assert_called_with("zesDeviceGetEccState") + mock_func.assert_called_once() + + def test_GivenValidDeviceHandleWhenCallingZesDeviceSetEccStateThenCallSucceedsWithUpdatedState( + self, mock_get_func + ): + requested_state = self.pyzes.ZES_DEVICE_ECC_STATE_ENABLED + mock_current_state = self.pyzes.ZES_DEVICE_ECC_STATE_DISABLED + mock_pending_state = self.pyzes.ZES_DEVICE_ECC_STATE_ENABLED + mock_pending_action = self.pyzes.ZES_DEVICE_ACTION_COLD_CARD_RESET + + def mock_set_ecc_state(device_handle, new_state_ptr, state_ptr): + self.assertEqual(new_state_ptr._obj.state, requested_state) + state_ptr._obj.currentState = mock_current_state + state_ptr._obj.pendingState = mock_pending_state + state_ptr._obj.pendingAction = mock_pending_action + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_set_ecc_state) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + new_state = self.pyzes.zes_device_ecc_desc_t() + new_state.stype = self.pyzes.ZES_STRUCTURE_TYPE_DEVICE_ECC_DESC + new_state.state = requested_state + ecc_state = self.pyzes.zes_device_ecc_properties_t() + + result = self.pyzes.zesDeviceSetEccState( + device_handle, byref(new_state), byref(ecc_state) + ) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(ecc_state.currentState, mock_current_state) + self.assertEqual(ecc_state.pendingState, mock_pending_state) + self.assertEqual(ecc_state.pendingAction, mock_pending_action) + mock_get_func.assert_called_with("zesDeviceSetEccState") + mock_func.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/bindings/sysman/python/test/unit_tests/test_frequency.py b/bindings/sysman/python/test/unit_tests/test_frequency.py index 2df47743..f9c9c885 100644 --- a/bindings/sysman/python/test/unit_tests/test_frequency.py +++ b/bindings/sysman/python/test/unit_tests/test_frequency.py @@ -21,7 +21,6 @@ @patch("pyzes.getFunctionPointerList") class TestFrequencyFunctions(unittest.TestCase): - def setUp(self): import pyzes @@ -51,6 +50,52 @@ def mock_enum_frequency_domains(device_handle, count_ptr, handles_ptr): mock_get_func.assert_called_with("zesDeviceEnumFrequencyDomains") mock_func.assert_called_once() + def test_GivenValidFrequencyHandleWhenCallingZesFrequencyGetPropertiesThenCallSucceedsWithProperties( + self, mock_get_func + ): + mock_type = self.pyzes.ZES_FREQ_DOMAIN_GPU + mock_on_subdevice = True + mock_subdevice_id = 1 + mock_can_control = True + mock_is_throttle_event_supported = True + mock_min = 300.0 + mock_max = 1800.0 + + def mock_get_properties(frequency_handle, properties_ptr): + properties_ptr._obj.type = mock_type + properties_ptr._obj.onSubdevice = mock_on_subdevice + properties_ptr._obj.subdeviceId = mock_subdevice_id + properties_ptr._obj.canControl = mock_can_control + properties_ptr._obj.isThrottleEventSupported = ( + mock_is_throttle_event_supported + ) + properties_ptr._obj.min = mock_min + properties_ptr._obj.max = mock_max + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_properties) + mock_get_func.return_value = mock_func + + frequency_handle = self.pyzes.zes_freq_handle_t() + properties = self.pyzes.zes_freq_properties_t() + + result = self.pyzes.zesFrequencyGetProperties( + frequency_handle, byref(properties) + ) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(properties.type, mock_type) + self.assertEqual(properties.onSubdevice, mock_on_subdevice) + self.assertEqual(properties.subdeviceId, mock_subdevice_id) + self.assertEqual(properties.canControl, mock_can_control) + self.assertEqual( + properties.isThrottleEventSupported, mock_is_throttle_event_supported + ) + self.assertEqual(properties.min, mock_min) + self.assertEqual(properties.max, mock_max) + mock_get_func.assert_called_with("zesFrequencyGetProperties") + mock_func.assert_called_once() + def test_GivenValidFrequencyHandleWhenCallingZesFrequencyGetStateThenCallSucceedsWithState( self, mock_get_func ): @@ -87,6 +132,109 @@ def mock_get_state(frequency_handle, state_ptr): mock_get_func.assert_called_with("zesFrequencyGetState") mock_func.assert_called_once() + def test_GivenValidFrequencyHandleWhenCallingZesFrequencyGetAvailableClocksThenCallSucceedsWithClockList( + self, mock_get_func + ): + mock_clocks = [300.0, 600.0, 1200.0] + + def mock_get_available_clocks(frequency_handle, count_ptr, clocks_ptr): + count_ptr._obj.value = len(mock_clocks) + if clocks_ptr: + for index, clock in enumerate(mock_clocks): + clocks_ptr[index] = clock + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_available_clocks) + mock_get_func.return_value = mock_func + + frequency_handle = self.pyzes.zes_freq_handle_t() + count = c_uint32(len(mock_clocks)) + clock_array = (c_double * len(mock_clocks))() + + result = self.pyzes.zesFrequencyGetAvailableClocks( + frequency_handle, byref(count), clock_array + ) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(count.value, len(mock_clocks)) + self.assertEqual(list(clock_array), mock_clocks) + mock_get_func.assert_called_with("zesFrequencyGetAvailableClocks") + mock_func.assert_called_once() + + def test_GivenValidFrequencyHandleWhenCallingZesFrequencyGetRangeThenCallSucceedsWithRange( + self, mock_get_func + ): + mock_min = 400.0 + mock_max = 1700.0 + + def mock_get_range(frequency_handle, range_ptr): + range_ptr._obj.min = mock_min + range_ptr._obj.max = mock_max + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_range) + mock_get_func.return_value = mock_func + + frequency_handle = self.pyzes.zes_freq_handle_t() + freq_range = self.pyzes.zes_freq_range_t() + + result = self.pyzes.zesFrequencyGetRange(frequency_handle, byref(freq_range)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(freq_range.min, mock_min) + self.assertEqual(freq_range.max, mock_max) + mock_get_func.assert_called_with("zesFrequencyGetRange") + mock_func.assert_called_once() + + def test_GivenValidFrequencyHandleWhenCallingZesFrequencySetRangeThenCallSucceeds( + self, mock_get_func + ): + def mock_set_range(frequency_handle, range_ptr): + self.assertEqual(range_ptr._obj.min, 500.0) + self.assertEqual(range_ptr._obj.max, 1500.0) + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_set_range) + mock_get_func.return_value = mock_func + + frequency_handle = self.pyzes.zes_freq_handle_t() + freq_range = self.pyzes.zes_freq_range_t() + freq_range.min = 500.0 + freq_range.max = 1500.0 + + result = self.pyzes.zesFrequencySetRange(frequency_handle, byref(freq_range)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + mock_get_func.assert_called_with("zesFrequencySetRange") + mock_func.assert_called_once() + + def test_GivenValidFrequencyHandleWhenCallingZesFrequencyGetThrottleTimeThenCallSucceedsWithThrottleTime( + self, mock_get_func + ): + mock_throttle_time = 12345 + mock_timestamp = 67890 + + def mock_get_throttle_time(frequency_handle, throttle_time_ptr): + throttle_time_ptr._obj.throttleTime = mock_throttle_time + throttle_time_ptr._obj.timestamp = mock_timestamp + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_throttle_time) + mock_get_func.return_value = mock_func + + frequency_handle = self.pyzes.zes_freq_handle_t() + throttle_time = self.pyzes.zes_freq_throttle_time_t() + + result = self.pyzes.zesFrequencyGetThrottleTime( + frequency_handle, byref(throttle_time) + ) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(throttle_time.throttleTime, mock_throttle_time) + self.assertEqual(throttle_time.timestamp, mock_timestamp) + mock_get_func.assert_called_with("zesFrequencyGetThrottleTime") + mock_func.assert_called_once() + if __name__ == "__main__": unittest.main() diff --git a/bindings/sysman/python/test/unit_tests/test_global_operations.py b/bindings/sysman/python/test/unit_tests/test_global_operations.py index da6e2460..375c46d4 100644 --- a/bindings/sysman/python/test/unit_tests/test_global_operations.py +++ b/bindings/sysman/python/test/unit_tests/test_global_operations.py @@ -21,7 +21,6 @@ @patch("pyzes.getFunctionPointerList") class TestGlobalOperations(unittest.TestCase): - def setUp(self): import pyzes @@ -102,6 +101,141 @@ def mock_get_properties(device_handle, properties_ptr): mock_get_func.assert_called_with("zesDeviceGetProperties") mock_func.assert_called_once() + def test_GivenValidDeviceHandleWhenCallingZesDevicePciGetPropertiesThenCallSucceedsWithValidProperties( + self, mock_get_func + ): + mock_domain = 0 + mock_bus = 3 + mock_device = 0x1F + mock_function = 0 + mock_gen = 5 + mock_width = 16 + mock_max_bandwidth = 63_015_384_000 + mock_have_bandwidth_counters = 1 + mock_have_packet_counters = 1 + mock_have_replay_counters = 0 + + def mock_get_pci_properties(device_handle, properties_ptr): + properties_ptr._obj.address.domain = mock_domain + properties_ptr._obj.address.bus = mock_bus + properties_ptr._obj.address.device = mock_device + properties_ptr._obj.address.function = mock_function + properties_ptr._obj.maxSpeed.gen = mock_gen + properties_ptr._obj.maxSpeed.width = mock_width + properties_ptr._obj.maxSpeed.maxBandwidth = mock_max_bandwidth + properties_ptr._obj.haveBandwidthCounters = mock_have_bandwidth_counters + properties_ptr._obj.havePacketCounters = mock_have_packet_counters + properties_ptr._obj.haveReplayCounters = mock_have_replay_counters + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_pci_properties) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + pci_props = self.pyzes.zes_pci_properties_t() + + result = self.pyzes.zesDevicePciGetProperties(device_handle, byref(pci_props)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(pci_props.address.domain, mock_domain) + self.assertEqual(pci_props.address.bus, mock_bus) + self.assertEqual(pci_props.address.device, mock_device) + self.assertEqual(pci_props.address.function, mock_function) + self.assertEqual(pci_props.maxSpeed.gen, mock_gen) + self.assertEqual(pci_props.maxSpeed.width, mock_width) + self.assertEqual(pci_props.maxSpeed.maxBandwidth, mock_max_bandwidth) + self.assertEqual(pci_props.haveBandwidthCounters, mock_have_bandwidth_counters) + self.assertEqual(pci_props.havePacketCounters, mock_have_packet_counters) + self.assertEqual(pci_props.haveReplayCounters, mock_have_replay_counters) + + mock_get_func.assert_called_with("zesDevicePciGetProperties") + mock_func.assert_called_once() + + def test_GivenValidDeviceHandleWhenCallingZesDevicePciGetStatsThenCallSucceedsWithValidStats( + self, mock_get_func + ): + mock_timestamp = 123456789 + mock_replay_counter = 7 + mock_packet_counter = 19 + mock_rx_counter = 4096 + mock_tx_counter = 8192 + mock_gen = 4 + mock_width = 8 + mock_max_bandwidth = 16_000_000_000 + + def mock_get_pci_stats(device_handle, stats_ptr): + stats_ptr._obj.timestamp = mock_timestamp + stats_ptr._obj.replayCounter = mock_replay_counter + stats_ptr._obj.packetCounter = mock_packet_counter + stats_ptr._obj.rxCounter = mock_rx_counter + stats_ptr._obj.txCounter = mock_tx_counter + stats_ptr._obj.speed.gen = mock_gen + stats_ptr._obj.speed.width = mock_width + stats_ptr._obj.speed.maxBandwidth = mock_max_bandwidth + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_pci_stats) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + pci_stats = self.pyzes.zes_pci_stats_t() + + result = self.pyzes.zesDevicePciGetStats(device_handle, byref(pci_stats)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(pci_stats.timestamp, mock_timestamp) + self.assertEqual(pci_stats.replayCounter, mock_replay_counter) + self.assertEqual(pci_stats.packetCounter, mock_packet_counter) + self.assertEqual(pci_stats.rxCounter, mock_rx_counter) + self.assertEqual(pci_stats.txCounter, mock_tx_counter) + self.assertEqual(pci_stats.speed.gen, mock_gen) + self.assertEqual(pci_stats.speed.width, mock_width) + self.assertEqual(pci_stats.speed.maxBandwidth, mock_max_bandwidth) + + mock_get_func.assert_called_with("zesDevicePciGetStats") + mock_func.assert_called_once() + + def test_GivenValidDeviceHandleWhenCallingZesDevicePciGetStateThenCallSucceedsWithValidState( + self, mock_get_func + ): + mock_status = self.pyzes.ZES_PCI_LINK_STATUS_QUALITY_ISSUES + mock_quality_issues = ( + self.pyzes.ZES_PCI_LINK_QUAL_ISSUE_FLAG_REPLAYS + | self.pyzes.ZES_PCI_LINK_QUAL_ISSUE_FLAG_SPEED + ) + mock_stability_issues = self.pyzes.ZES_PCI_LINK_STAB_ISSUE_FLAG_RETRAINING + mock_gen = 4 + mock_width = 8 + mock_max_bandwidth = 16_000_000_000 + + def mock_get_pci_state(device_handle, state_ptr): + state_ptr._obj.status = mock_status + state_ptr._obj.qualityIssues = mock_quality_issues + state_ptr._obj.stabilityIssues = mock_stability_issues + state_ptr._obj.speed.gen = mock_gen + state_ptr._obj.speed.width = mock_width + state_ptr._obj.speed.maxBandwidth = mock_max_bandwidth + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_pci_state) + mock_get_func.return_value = mock_func + + device_handle = self.pyzes.zes_device_handle_t() + pci_state = self.pyzes.zes_pci_state_t() + + result = self.pyzes.zesDevicePciGetState(device_handle, byref(pci_state)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(pci_state.status, mock_status) + self.assertEqual(pci_state.qualityIssues, mock_quality_issues) + self.assertEqual(pci_state.stabilityIssues, mock_stability_issues) + self.assertEqual(pci_state.speed.gen, mock_gen) + self.assertEqual(pci_state.speed.width, mock_width) + self.assertEqual(pci_state.speed.maxBandwidth, mock_max_bandwidth) + + mock_get_func.assert_called_with("zesDevicePciGetState") + mock_func.assert_called_once() + def test_GivenValidDeviceHandleWhenCallingZesDeviceProcessesGetStateThenCallSucceedsWithProcessCount( self, mock_get_func ): diff --git a/bindings/sysman/python/test/unit_tests/test_power.py b/bindings/sysman/python/test/unit_tests/test_power.py index b4b98734..a9094205 100644 --- a/bindings/sysman/python/test/unit_tests/test_power.py +++ b/bindings/sysman/python/test/unit_tests/test_power.py @@ -21,7 +21,6 @@ @patch("pyzes.getFunctionPointerList") class TestPowerFunctions(unittest.TestCase): - def setUp(self): import pyzes @@ -74,6 +73,135 @@ def mock_get_energy(power_handle, energy_ptr): mock_get_func.assert_called_with("zesPowerGetEnergyCounter") mock_func.assert_called_once() + def test_GivenValidPowerHandleWhenCallingZesPowerGetPropertiesThenCallSucceedsWithDomainData( + self, mock_get_func + ): + def mock_get_properties(power_handle, properties_ptr): + properties = properties_ptr._obj + properties.onSubdevice = 1 + properties.subdeviceId = 3 + properties.canControl = 1 + properties.isEnergyThresholdSupported = 1 + properties.defaultLimit = 250000 + properties.minLimit = 150000 + properties.maxLimit = 300000 + + ext_properties_ptr = cast( + properties.pNext, POINTER(self.pyzes.zes_power_ext_properties_t) + ) + ext_properties_ptr.contents.domain = self.pyzes.ZES_POWER_DOMAIN_GPU + ext_properties_ptr.contents.defaultLimit.contents.level = ( + self.pyzes.ZES_POWER_LEVEL_SUSTAINED + ) + ext_properties_ptr.contents.defaultLimit.contents.limit = 250000 + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_properties) + mock_get_func.return_value = mock_func + + power_handle = self.pyzes.zes_pwr_handle_t() + default_limit = self.pyzes.zes_power_limit_ext_desc_t() + ext_properties = self.pyzes.zes_power_ext_properties_t() + ext_properties.stype = self.pyzes.ZES_STRUCTURE_TYPE_POWER_EXT_PROPERTIES + ext_properties.defaultLimit = pointer(default_limit) + + properties = self.pyzes.zes_power_properties_t() + properties.stype = self.pyzes.ZES_STRUCTURE_TYPE_POWER_PROPERTIES + properties.pNext = cast(pointer(ext_properties), c_void_p) + + result = self.pyzes.zesPowerGetProperties(power_handle, byref(properties)) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(properties.onSubdevice, 1) + self.assertEqual(properties.subdeviceId, 3) + self.assertEqual(properties.canControl, 1) + self.assertEqual(properties.isEnergyThresholdSupported, 1) + self.assertEqual(properties.defaultLimit, 250000) + self.assertEqual(properties.minLimit, 150000) + self.assertEqual(properties.maxLimit, 300000) + self.assertEqual(ext_properties.domain, self.pyzes.ZES_POWER_DOMAIN_GPU) + self.assertEqual( + ext_properties.defaultLimit.contents.level, + self.pyzes.ZES_POWER_LEVEL_SUSTAINED, + ) + self.assertEqual(ext_properties.defaultLimit.contents.limit, 250000) + mock_get_func.assert_called_with("zesPowerGetProperties") + mock_func.assert_called_once() + + def test_GivenValidPowerHandleWhenCallingZesPowerGetLimitsExtThenCallSucceedsWithDescriptors( + self, mock_get_func + ): + mock_count = 2 + + def mock_get_limits_ext(power_handle, count_ptr, limits_ptr): + count_ptr._obj.value = mock_count + limits_ptr[0].level = self.pyzes.ZES_POWER_LEVEL_SUSTAINED + limits_ptr[0].source = self.pyzes.ZES_POWER_SOURCE_MAINS + limits_ptr[0].limitUnit = self.pyzes.ZES_LIMIT_UNIT_POWER + limits_ptr[0].enabled = 1 + limits_ptr[0].interval = 1000 + limits_ptr[0].limit = 250000 + limits_ptr[1].level = self.pyzes.ZES_POWER_LEVEL_BURST + limits_ptr[1].source = self.pyzes.ZES_POWER_SOURCE_ANY + limits_ptr[1].limitUnit = self.pyzes.ZES_LIMIT_UNIT_POWER + limits_ptr[1].enabled = 0 + limits_ptr[1].interval = 0 + limits_ptr[1].limit = 300000 + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_get_limits_ext) + mock_get_func.return_value = mock_func + + power_handle = self.pyzes.zes_pwr_handle_t() + count = c_uint32(mock_count) + limits = (self.pyzes.zes_power_limit_ext_desc_t * mock_count)() + + result = self.pyzes.zesPowerGetLimitsExt(power_handle, byref(count), limits) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + self.assertEqual(count.value, mock_count) + self.assertEqual(limits[0].level, self.pyzes.ZES_POWER_LEVEL_SUSTAINED) + self.assertEqual(limits[0].source, self.pyzes.ZES_POWER_SOURCE_MAINS) + self.assertEqual(limits[0].limit, 250000) + self.assertEqual(limits[1].level, self.pyzes.ZES_POWER_LEVEL_BURST) + self.assertEqual(limits[1].enabled, 0) + self.assertEqual(limits[1].limit, 300000) + mock_get_func.assert_called_with("zesPowerGetLimitsExt") + mock_func.assert_called_once() + + def test_GivenValidPowerHandleWhenCallingZesPowerSetLimitsExtThenCallSucceeds( + self, mock_get_func + ): + def mock_set_limits_ext(power_handle, count_ptr, limits_ptr): + self.assertEqual(count_ptr._obj.value, 1) + self.assertEqual(limits_ptr[0].level, self.pyzes.ZES_POWER_LEVEL_SUSTAINED) + self.assertEqual(limits_ptr[0].source, self.pyzes.ZES_POWER_SOURCE_MAINS) + self.assertEqual(limits_ptr[0].limitUnit, self.pyzes.ZES_LIMIT_UNIT_POWER) + self.assertEqual(limits_ptr[0].enabled, 1) + self.assertEqual(limits_ptr[0].interval, 2000) + self.assertEqual(limits_ptr[0].limit, 275000) + return self.pyzes.ZE_RESULT_SUCCESS + + mock_func = MagicMock(side_effect=mock_set_limits_ext) + mock_get_func.return_value = mock_func + + power_handle = self.pyzes.zes_pwr_handle_t() + count = c_uint32(1) + limits = (self.pyzes.zes_power_limit_ext_desc_t * 1)() + limits[0].stype = self.pyzes.ZES_STRUCTURE_TYPE_POWER_LIMIT_EXT_DESC + limits[0].level = self.pyzes.ZES_POWER_LEVEL_SUSTAINED + limits[0].source = self.pyzes.ZES_POWER_SOURCE_MAINS + limits[0].limitUnit = self.pyzes.ZES_LIMIT_UNIT_POWER + limits[0].enabled = 1 + limits[0].interval = 2000 + limits[0].limit = 275000 + + result = self.pyzes.zesPowerSetLimitsExt(power_handle, byref(count), limits) + + self.assertEqual(result, self.pyzes.ZE_RESULT_SUCCESS) + mock_get_func.assert_called_with("zesPowerSetLimitsExt") + mock_func.assert_called_once() + if __name__ == "__main__": unittest.main()