diff --git a/config/chassis_modules.py b/config/chassis_modules.py index 4f71ada8028..724d446834d 100755 --- a/config/chassis_modules.py +++ b/config/chassis_modules.py @@ -5,9 +5,9 @@ import re import subprocess import utilities_common.cli as clicommon -from utilities_common.chassis import is_smartswitch, get_all_dpus +from utilities_common.chassis import is_smartswitch, is_bmc, get_all_dpus from utilities_common.module import ModuleHelper -from datetime import datetime, timedelta, timezone +from datetime import timedelta TIMEOUT_SECS = 10 TRANSITION_TIMEOUT = timedelta(seconds=240) # 4 minutes @@ -60,6 +60,9 @@ def get_config_module_state(db, chassis_module_name): if not fvs: if is_smartswitch(): return 'down' + elif is_bmc() and chassis_module_name.startswith("SWITCH-HOST"): + # On BMC, SWITCH-HOST default is 'down' to keep it powered off on boot + return 'down' else: return 'up' else: @@ -141,12 +144,16 @@ def fabric_module_set_admin_status(db, chassis_module_name, state): type=click.Choice(get_all_dpus(), case_sensitive=False) if is_smartswitch() else str ) def shutdown_chassis_module(db, chassis_module_name): - """Chassis-module shutdown of module""" + """Shutdown chassis module (sets admin_status to down; default for SWITCH-HOST on BMC)""" config_db = db.cfgdb ctx = click.get_current_context() - if not chassis_module_name.startswith(("SUPERVISOR", "LINE-CARD", "FABRIC-CARD", "DPU")): - ctx.fail("'module_name' has to begin with 'SUPERVISOR', 'LINE-CARD', 'FABRIC-CARD', or 'DPU'") + allowed_prefixes = ("SUPERVISOR", "LINE-CARD", "FABRIC-CARD", "DPU") + if is_bmc(): + allowed_prefixes += ("SWITCH-HOST",) + if not chassis_module_name.startswith(allowed_prefixes): + allowed_prefixes_str = "', '".join(allowed_prefixes) + ctx.fail(f"'module_name' has to begin with '{allowed_prefixes_str}'") if get_config_module_state(db, chassis_module_name) == 'down': click.echo(f"Module {chassis_module_name} is already in down state") @@ -163,6 +170,10 @@ def shutdown_chassis_module(db, chassis_module_name): 'admin_status': 'down', } config_db.set_entry('CHASSIS_MODULE', chassis_module_name, fvs) + elif is_bmc() and chassis_module_name.startswith("SWITCH-HOST"): + click.echo(f"Shutting down chassis module {chassis_module_name}") + # Use mod_entry to preserve power_on_delay and graceful_shutdown_timeout in the same entry + config_db.mod_entry('CHASSIS_MODULE', chassis_module_name, {'admin_status': 'down'}) else: click.echo(f"Shutting down chassis module {chassis_module_name}") config_db.set_entry('CHASSIS_MODULE', chassis_module_name, {'admin_status': 'down'}) @@ -186,8 +197,12 @@ def startup_chassis_module(db, chassis_module_name): config_db = db.cfgdb ctx = click.get_current_context() - if not chassis_module_name.startswith(("SUPERVISOR", "LINE-CARD", "FABRIC-CARD", "DPU")): - ctx.fail("'module_name' has to begin with 'SUPERVISOR', 'LINE-CARD', 'FABRIC-CARD', or 'DPU'") + allowed_prefixes = ("SUPERVISOR", "LINE-CARD", "FABRIC-CARD", "DPU") + if is_bmc(): + allowed_prefixes += ("SWITCH-HOST",) + if not chassis_module_name.startswith(allowed_prefixes): + allowed_prefixes_str = "', '".join(allowed_prefixes) + ctx.fail(f"'module_name' has to begin with '{allowed_prefixes_str}'") return if get_config_module_state(db, chassis_module_name) == 'up': @@ -205,6 +220,10 @@ def startup_chassis_module(db, chassis_module_name): 'admin_status': 'up', } config_db.set_entry('CHASSIS_MODULE', chassis_module_name, fvs) + elif is_bmc() and chassis_module_name.startswith("SWITCH-HOST"): + click.echo(f"Starting up chassis module {chassis_module_name}") + # Use mod_entry to preserve power_on_delay and graceful_shutdown_timeout in the same entry + config_db.mod_entry('CHASSIS_MODULE', chassis_module_name, {'admin_status': 'up'}) else: click.echo(f"Starting up chassis module {chassis_module_name}") config_db.set_entry('CHASSIS_MODULE', chassis_module_name, None) @@ -212,3 +231,46 @@ def startup_chassis_module(db, chassis_module_name): if chassis_module_name.startswith("FABRIC-CARD"): if not check_config_module_state_with_timeout(ctx, db, chassis_module_name, 'up'): fabric_module_set_admin_status(db, chassis_module_name, 'up') + + +if is_bmc(): + + # + # 'power-on-delay' subcommand ('config chassis modules power-on-delay ...') + # + @modules.command('power-on-delay') + @clicommon.pass_db + @click.argument('chassis_module_name', metavar='', required=True) + @click.argument('seconds', metavar='', required=True, type=click.IntRange(min=0)) + def set_power_on_delay(db, chassis_module_name, seconds): + """Configure delay (secs) BMC waits before powering on Switch-Host (default: 0)""" + ctx = click.get_current_context() + + if not chassis_module_name.startswith("SWITCH-HOST"): + ctx.fail("'power-on-delay' is only applicable to SWITCH-HOST modules") + + config_db = db.cfgdb + fvs = config_db.get_entry('CHASSIS_MODULE', chassis_module_name) or {} + fvs['power_on_delay'] = str(seconds) + config_db.set_entry('CHASSIS_MODULE', chassis_module_name, fvs) + click.echo(f"Power-on-delay for {chassis_module_name} set to {seconds} seconds") + + # + # 'shutdown-timeout' subcommand ('config chassis modules shutdown-timeout ...') + # + @modules.command('shutdown-timeout') + @clicommon.pass_db + @click.argument('chassis_module_name', metavar='', required=True) + @click.argument('seconds', metavar='', required=True, type=click.IntRange(min=0)) + def set_graceful_shutdown_timeout(db, chassis_module_name, seconds): + """Configure graceful-shutdown timeout (secs) before BMC forces power-off (0: immediate, default: 120)""" + ctx = click.get_current_context() + + if not chassis_module_name.startswith("SWITCH-HOST"): + ctx.fail("'shutdown-timeout' is only applicable to SWITCH-HOST modules") + + config_db = db.cfgdb + fvs = config_db.get_entry('CHASSIS_MODULE', chassis_module_name) or {} + fvs['graceful_shutdown_timeout'] = str(seconds) + config_db.set_entry('CHASSIS_MODULE', chassis_module_name, fvs) + click.echo(f"Shutdown-timeout for {chassis_module_name} set to {seconds} seconds") diff --git a/config/liquid_cool.py b/config/liquid_cool.py new file mode 100644 index 00000000000..76bfc367e2b --- /dev/null +++ b/config/liquid_cool.py @@ -0,0 +1,50 @@ +import click +import utilities_common.cli as clicommon + +LEAK_CONTROL_POLICY_TABLE = 'LEAK_CONTROL_POLICY' +LEAK_CONTROL_POLICY_KEY = 'policy' + +VALID_POLICIES = ('system', 'rack_mgr') +VALID_SEVERITIES = ('critical', 'minor') +VALID_ACTIONS = ('syslog_only', 'graceful_shutdown', 'power_off') + +POLICY_FIELD_MAP = { + 'system': 'system_leak_policy', + 'rack_mgr': 'rack_mgr_leak_policy', +} + +ACTION_FIELD_MAP = { + ('system', 'critical'): 'system_critical_leak_action', + ('system', 'minor'): 'system_minor_leak_action', + ('rack_mgr', 'critical'): 'rack_mgr_critical_alert_action', + ('rack_mgr', 'minor'): 'rack_mgr_minor_alert_action', +} + + +@click.group('liquid-cool') +def liquid_cool(): + """Liquid cooling configuration commands""" + pass + + +@liquid_cool.command('leak-control') +@clicommon.pass_db +@click.argument('policy_type', metavar='[system|rack_mgr]', type=click.Choice(VALID_POLICIES)) +@click.argument('state', metavar='[enabled|disabled]', type=click.Choice(['enabled', 'disabled'])) +def leak_control(db, policy_type, state): + """Enable or disable system/rack-manager leak policy enforcement""" + field = POLICY_FIELD_MAP[policy_type] + db.cfgdb.mod_entry(LEAK_CONTROL_POLICY_TABLE, LEAK_CONTROL_POLICY_KEY, {field: state}) + click.echo(f"Leak control policy for '{policy_type}' set to '{state}'") + + +@liquid_cool.command('leak-action') +@clicommon.pass_db +@click.argument('policy_type', metavar='[system|rack_mgr]', type=click.Choice(VALID_POLICIES)) +@click.argument('severity', metavar='[critical|minor]', type=click.Choice(VALID_SEVERITIES)) +@click.argument('action', metavar='[syslog_only|graceful_shutdown|power_off]', type=click.Choice(VALID_ACTIONS)) +def leak_action(db, policy_type, severity, action): + """Configure the action taken when a critical/minor leak event is detected""" + field = ACTION_FIELD_MAP[(policy_type, severity)] + db.cfgdb.mod_entry(LEAK_CONTROL_POLICY_TABLE, LEAK_CONTROL_POLICY_KEY, {field: action}) + click.echo(f"Leak action for '{policy_type}' '{severity}' events set to '{action}'") diff --git a/config/main.py b/config/main.py index b304b9a7eac..9b136cb8019 100644 --- a/config/main.py +++ b/config/main.py @@ -51,6 +51,7 @@ from . import aaa from . import bmc from . import chassis_modules +from . import liquid_cool from . import console from . import feature from . import fabric @@ -1756,6 +1757,7 @@ def config(ctx): config.add_command(aaa.radius) config.add_command(bmc.bmc) config.add_command(chassis_modules.chassis) +config.add_command(liquid_cool.liquid_cool) config.add_command(console.console) config.add_command(fabric.fabric) config.add_command(feature.feature) diff --git a/show/chassis_modules.py b/show/chassis_modules.py index d7c74fc9a6f..48110ff24d0 100644 --- a/show/chassis_modules.py +++ b/show/chassis_modules.py @@ -2,7 +2,8 @@ from natsort import natsorted from tabulate import tabulate from swsscommon.swsscommon import SonicV2Connector -from utilities_common.chassis import is_smartswitch +from utilities_common.chassis import is_smartswitch, is_bmc +from utilities_common.module import ModuleHelper, NOT_AVAILABLE from sonic_platform_base.module_base import ModuleBase import utilities_common.cli as clicommon @@ -51,6 +52,19 @@ def status(db, chassis_module_name): print('Key {} not found in {} table'.format(key_pattern, CHASSIS_MODULE_INFO_TABLE)) return + # On BMC, oper_status is read directly from the platform API. + # ModuleHelper.__init__ does not raise on chassis load failure; it logs and keeps + # platform_chassis=None. Treat that as unavailable so we don't emit per-module + # errors in the loop — just fall back to STATE_DB silently. + module_helper = None + if is_bmc(): + try: + helper = ModuleHelper() + if helper.platform_chassis: + module_helper = helper + except Exception: + pass + table = [] for key in natsorted(keys): key_list = key.split('|') @@ -66,6 +80,12 @@ def status(db, chassis_module_name): oper_status = data_dict.get(CHASSIS_MODULE_INFO_OPERSTATUS_FIELD, ModuleBase.MODULE_STATUS_EMPTY) serial = data_dict.get(CHASSIS_MODULE_INFO_SERIAL_FIELD, 'N/A') + # On BMC, prefer oper_status from platform API; fall back to STATE_DB if unavailable + if module_helper is not None: + platform_oper_status = module_helper.get_module_oper_status(key_list[1]) + if platform_oper_status != NOT_AVAILABLE: + oper_status = platform_oper_status + # Determine admin_status if is_smartswitch(): admin_status = 'down' diff --git a/show/platform.py b/show/platform.py index e930a712994..693d5a6dc4c 100644 --- a/show/platform.py +++ b/show/platform.py @@ -4,6 +4,7 @@ import sys import click +from tabulate import tabulate import utilities_common.cli as clicommon from sonic_py_common import device_info @@ -39,6 +40,7 @@ def try_get(platform, attr, fallback): return chassis_info + # # 'platform' group ("show platform ...") # @@ -306,15 +308,115 @@ def firmware(args): sys.exit(e.returncode) -# 'leakage' subcommand ("show platform leakage status") +LEAK_CONTROL_POLICY_TABLE = 'LEAK_CONTROL_POLICY' +LEAK_CONTROL_POLICY_KEY = 'policy' +RACK_MANAGER_ALERT_TABLE = 'RACK_MANAGER_ALERT' +LEAK_PROFILE_TABLE = 'LEAK_PROFILE' +LIQUID_COOLING_INFO_TABLE = 'LIQUID_COOLING_INFO' + + +def _get_state_db(): + from swsscommon.swsscommon import SonicV2Connector + state_db = SonicV2Connector(host="127.0.0.1") + state_db.connect(state_db.STATE_DB) + return state_db + + +# 'leak' group ("show platform leak ...") @platform.group() -def leakage(): - """Show platform leakage information""" +def leak(): + """Show liquid cooling leak information""" pass -@leakage.command() -def status(): - """Show platform leakage status""" - cmd = ["leakageshow"] - clicommon.run_command(cmd) +@leak.command('control-policy') +def leak_control_policy(): + """Show leak control policy configuration""" + try: + from utilities_common.db import Db + db = Db() + entry = db.cfgdb.get_entry(LEAK_CONTROL_POLICY_TABLE, LEAK_CONTROL_POLICY_KEY) + click.echo(" system_leak_policy : {}".format(entry.get('system_leak_policy', 'enabled'))) + critical_action = entry.get('system_critical_leak_action', 'power_off') + click.echo(" system_critical_leak_action : {}".format(critical_action)) + click.echo(" system_minor_leak_action : {}".format(entry.get('system_minor_leak_action', 'syslog_only'))) + click.echo(" rack_mgr_leak_policy : {}".format(entry.get('rack_mgr_leak_policy', 'enabled'))) + rack_critical_action = entry.get('rack_mgr_critical_alert_action', 'syslog_only') + click.echo(" rack_mgr_critical_alert_action : {}".format(rack_critical_action)) + rack_minor_action = entry.get('rack_mgr_minor_alert_action', 'syslog_only') + click.echo(" rack_mgr_minor_alert_action : {}".format(rack_minor_action)) + except Exception as e: + click.echo(f"Error: Failed to retrieve leak control policy: {e}", err=True) + + +@leak.group('rack-manager') +def leak_rack_manager(): + """Show rack-manager leak information""" + pass + + +@leak_rack_manager.command('alerts') +def leak_rack_manager_alerts(): + """Show rack-manager alerts""" + try: + state_db = _get_state_db() + keys = state_db.keys(state_db.STATE_DB, f"{RACK_MANAGER_ALERT_TABLE}|*") or [] + header = ['Alert', 'Severity', 'Timestamp'] + rows = [] + for key in sorted(keys): + alert_name = key.split('|', 1)[1] + data = state_db.get_all(state_db.STATE_DB, key) or {} + severity = data.get('severity', data.get('leak', 'N/A')) + timestamp = data.get('timestamp', 'N/A') + rows.append((alert_name, severity, timestamp)) + if rows: + click.echo(tabulate(rows, header, tablefmt='simple')) + else: + click.echo("No rack-manager alerts found") + except Exception as e: + click.echo(f"Error: Failed to retrieve rack-manager leak alerts: {e}", err=True) + + +@leak.command('profiles') +def leak_profiles(): + """Show leak sensor profiles""" + try: + from utilities_common.db import Db + db = Db() + keys = db.cfgdb.get_keys(LEAK_PROFILE_TABLE) or [] + header = ['Sensor-Type', 'Max-Minor-Duration-Sec'] + rows = [] + for sensor_type in sorted(keys): + entry = db.cfgdb.get_entry(LEAK_PROFILE_TABLE, sensor_type) + max_dur = entry.get('max_minor_duration_sec', 'N/A') + rows.append((sensor_type, max_dur)) + if rows: + click.echo(tabulate(rows, header, tablefmt='simple')) + else: + click.echo("No leak profiles found") + except Exception as e: + click.echo(f"Error: Failed to retrieve leak sensor profiles: {e}", err=True) + + +@leak.command('status') +def leak_status(): + """Show leak sensor status""" + try: + state_db = _get_state_db() + keys = state_db.keys(state_db.STATE_DB, f"{LIQUID_COOLING_INFO_TABLE}|*") or [] + header = ['Name', 'Leak', 'Leak-sensor-status', 'leak-sensor-type', 'leak-severity'] + rows = [] + for key in sorted(keys): + data = state_db.get_all(state_db.STATE_DB, key) or {} + name = data.get('name', key.split('|', 1)[1]) + leaking = data.get('leaking', 'N/A') + sensor_status = data.get('leak_sensor_status', 'N/A') + sensor_type = data.get('type', 'N/A') + severity = data.get('leak_severity', 'N/A') if leaking.upper() in ('YES', 'TRUE') else 'NA' + rows.append((name, leaking, sensor_status, sensor_type, severity)) + if rows: + click.echo(tabulate(rows, header, tablefmt='simple')) + else: + click.echo("No leak sensor data found") + except Exception as e: + click.echo(f"Error: Failed to retrieve leak sensor status: {e}", err=True) diff --git a/tests/chassis_modules_test.py b/tests/chassis_modules_test.py index 07408bfd3ec..e0fb374dc5e 100755 --- a/tests/chassis_modules_test.py +++ b/tests/chassis_modules_test.py @@ -570,3 +570,298 @@ def test_startup_module_already_up(self): @classmethod def teardown_class(cls): print("TEARDOWN") + os.environ["UTILITIES_UNIT_TESTING"] = "0" + + +class TestChassisModuleTimingConfig(object): + """Tests for 'config chassis modules power-on-delay' and 'shutdown-timeout'""" + + @classmethod + def setup_class(cls): + print("SETUP") + os.environ["UTILITIES_UNIT_TESTING"] = "1" + import importlib + import config.chassis_modules as cm + with mock.patch('utilities_common.chassis.is_bmc', new=lambda: True): + importlib.reload(cm) + cls.modules = cm.modules + + def test_power_on_delay_switch_host(self): + runner = CliRunner() + db = Db() + result = runner.invoke( + self.modules.commands["power-on-delay"], + ["SWITCH-HOST", "300"], + obj=db + ) + print(result.output) + assert result.exit_code == 0 + assert "300" in result.output + entry = db.cfgdb.get_entry("CHASSIS_MODULE", "SWITCH-HOST") + assert entry.get("power_on_delay") == "300" + + def test_power_on_delay_zero(self): + runner = CliRunner() + db = Db() + result = runner.invoke( + self.modules.commands["power-on-delay"], + ["SWITCH-HOST", "0"], + obj=db + ) + print(result.output) + assert result.exit_code == 0 + entry = db.cfgdb.get_entry("CHASSIS_MODULE", "SWITCH-HOST") + assert entry.get("power_on_delay") == "0" + + def test_power_on_delay_negative_rejected(self): + """Verify negative values are invalid; default is 0 per HLD (admin_status=down keeps Switch-Host off).""" + runner = CliRunner() + db = Db() + result = runner.invoke( + self.modules.commands["power-on-delay"], + ["SWITCH-HOST", "-1"], + obj=db + ) + print(result.output) + assert result.exit_code != 0 + + def test_power_on_delay_non_switch_host_rejected(self): + runner = CliRunner() + db = Db() + result = runner.invoke( + self.modules.commands["power-on-delay"], + ["LINE-CARD0", "300"], + obj=db + ) + print(result.output) + assert result.exit_code != 0 + assert "SWITCH-HOST" in result.output + + def test_shutdown_timeout_switch_host(self): + runner = CliRunner() + db = Db() + result = runner.invoke( + self.modules.commands["shutdown-timeout"], + ["SWITCH-HOST", "120"], + obj=db + ) + print(result.output) + assert result.exit_code == 0 + assert "120" in result.output + entry = db.cfgdb.get_entry("CHASSIS_MODULE", "SWITCH-HOST") + assert entry.get("graceful_shutdown_timeout") == "120" + + def test_shutdown_timeout_zero_immediate_poweroff(self): + runner = CliRunner() + db = Db() + result = runner.invoke( + self.modules.commands["shutdown-timeout"], + ["SWITCH-HOST", "0"], + obj=db + ) + print(result.output) + assert result.exit_code == 0 + entry = db.cfgdb.get_entry("CHASSIS_MODULE", "SWITCH-HOST") + assert entry.get("graceful_shutdown_timeout") == "0" + + def test_shutdown_timeout_negative_rejected(self): + runner = CliRunner() + db = Db() + result = runner.invoke( + self.modules.commands["shutdown-timeout"], + ["SWITCH-HOST", "-5"], + obj=db + ) + print(result.output) + assert result.exit_code != 0 + + def test_shutdown_timeout_non_switch_host_rejected(self): + runner = CliRunner() + db = Db() + result = runner.invoke( + self.modules.commands["shutdown-timeout"], + ["FABRIC-CARD0", "120"], + obj=db + ) + print(result.output) + assert result.exit_code != 0 + assert "SWITCH-HOST" in result.output + + def test_both_fields_preserved_in_db(self): + """Setting one field does not overwrite the other.""" + runner = CliRunner() + db = Db() + runner.invoke( + self.modules.commands["power-on-delay"], + ["SWITCH-HOST", "60"], + obj=db + ) + runner.invoke( + self.modules.commands["shutdown-timeout"], + ["SWITCH-HOST", "30"], + obj=db + ) + entry = db.cfgdb.get_entry("CHASSIS_MODULE", "SWITCH-HOST") + assert entry.get("power_on_delay") == "60" + assert entry.get("graceful_shutdown_timeout") == "30" + + @classmethod + def teardown_class(cls): + print("TEARDOWN") + os.environ["UTILITIES_UNIT_TESTING"] = "0" + + +class TestChassisModuleBMCStartupShutdown(object): + """Tests for startup/shutdown of SWITCH-HOST on BMC: + - default admin_status is 'down' when no entry exists + - startup/shutdown use mod_entry to preserve power_on_delay and graceful_shutdown_timeout + """ + + @classmethod + def setup_class(cls): + print("SETUP") + os.environ["UTILITIES_UNIT_TESTING"] = "1" + + def test_default_state_is_down_on_bmc(self): + """SWITCH-HOST has no entry in CONFIG_DB; on BMC the default must be 'down'.""" + from config.chassis_modules import get_config_module_state + db = Db() + with mock.patch('config.chassis_modules.is_bmc', return_value=True): + state = get_config_module_state(db, "SWITCH-HOST") + assert state == 'down' + + def test_startup_switch_host_preserves_timing_fields(self): + """startup SWITCH-HOST on BMC must not erase power_on_delay or graceful_shutdown_timeout.""" + runner = CliRunner() + db = Db() + db.cfgdb.mod_entry('CHASSIS_MODULE', 'SWITCH-HOST', { + 'admin_status': 'down', + 'power_on_delay': '300', + 'graceful_shutdown_timeout': '120', + }) + with mock.patch('config.chassis_modules.is_bmc', return_value=True): + result = runner.invoke( + config.config.commands["chassis"].commands["modules"].commands["startup"], + ["SWITCH-HOST"], + obj=db + ) + print(result.output) + assert result.exit_code == 0 + entry = db.cfgdb.get_entry("CHASSIS_MODULE", "SWITCH-HOST") + assert entry.get("admin_status") == "up" + assert entry.get("power_on_delay") == "300", "power_on_delay must be preserved" + assert entry.get("graceful_shutdown_timeout") == "120", "graceful_shutdown_timeout must be preserved" + + def test_shutdown_switch_host_preserves_timing_fields(self): + """shutdown SWITCH-HOST on BMC must not erase power_on_delay or graceful_shutdown_timeout.""" + runner = CliRunner() + db = Db() + db.cfgdb.mod_entry('CHASSIS_MODULE', 'SWITCH-HOST', { + 'admin_status': 'up', + 'power_on_delay': '60', + 'graceful_shutdown_timeout': '30', + }) + with mock.patch('config.chassis_modules.is_bmc', return_value=True): + result = runner.invoke( + config.config.commands["chassis"].commands["modules"].commands["shutdown"], + ["SWITCH-HOST"], + obj=db + ) + print(result.output) + assert result.exit_code == 0 + entry = db.cfgdb.get_entry("CHASSIS_MODULE", "SWITCH-HOST") + assert entry.get("admin_status") == "down" + assert entry.get("power_on_delay") == "60", "power_on_delay must be preserved" + assert entry.get("graceful_shutdown_timeout") == "30", "graceful_shutdown_timeout must be preserved" + + def test_startup_switch_host_already_up_is_noop(self): + """startup SWITCH-HOST when already up should print a message and return.""" + runner = CliRunner() + db = Db() + db.cfgdb.mod_entry('CHASSIS_MODULE', 'SWITCH-HOST', {'admin_status': 'up'}) + with mock.patch('config.chassis_modules.is_bmc', return_value=True): + result = runner.invoke( + config.config.commands["chassis"].commands["modules"].commands["startup"], + ["SWITCH-HOST"], + obj=db + ) + print(result.output) + assert result.exit_code == 0 + assert "already" in result.output + + def test_shutdown_switch_host_already_down_is_noop(self): + """shutdown SWITCH-HOST when already down (no entry) should print a message and return.""" + runner = CliRunner() + db = Db() + with mock.patch('config.chassis_modules.is_bmc', return_value=True): + result = runner.invoke( + config.config.commands["chassis"].commands["modules"].commands["shutdown"], + ["SWITCH-HOST"], + obj=db + ) + print(result.output) + assert result.exit_code == 0 + assert "already" in result.output + + def test_show_status_bmc_uses_platform_oper_status(self): + """On BMC, oper_status should come from platform API when available.""" + runner = CliRunner() + mock_module_helper = mock.MagicMock() + mock_module_helper.get_module_oper_status.return_value = "Online" + + with mock.patch('show.chassis_modules.is_bmc', return_value=True), \ + mock.patch('show.chassis_modules.ModuleHelper', return_value=mock_module_helper): + result = runner.invoke( + show.cli.commands["chassis"].commands["modules"].commands["status"], + ["LINE-CARD0"] + ) + print(result.output) + assert result.exit_code == 0 + assert "Online" in result.output + + def test_show_status_bmc_falls_back_to_db_when_platform_unavailable(self): + """On BMC, if platform API returns N/A, fall back to STATE_DB oper_status.""" + runner = CliRunner() + mock_module_helper = mock.MagicMock() + mock_module_helper.get_module_oper_status.return_value = "N/A" + + with mock.patch('show.chassis_modules.is_bmc', return_value=True), \ + mock.patch('show.chassis_modules.ModuleHelper', return_value=mock_module_helper): + result = runner.invoke( + show.cli.commands["chassis"].commands["modules"].commands["status"], + ["LINE-CARD0"] + ) + print(result.output) + assert result.exit_code == 0 + # DB value for LINE-CARD0 is "Empty"; should be preserved when platform returns N/A + assert "Empty" in result.output + + def test_show_status_non_bmc_uses_db_oper_status(self): + """On non-BMC, oper_status should always come from STATE_DB.""" + runner = CliRunner() + with mock.patch('show.chassis_modules.is_bmc', return_value=False): + result = runner.invoke( + show.cli.commands["chassis"].commands["modules"].commands["status"], + ["LINE-CARD0"] + ) + print(result.output) + assert result.exit_code == 0 + assert "Empty" in result.output + + def test_show_status_bmc_module_helper_init_failure_falls_back_to_db(self): + """On BMC, if ModuleHelper init raises, gracefully fall back to STATE_DB.""" + runner = CliRunner() + with mock.patch('show.chassis_modules.is_bmc', return_value=True), \ + mock.patch('show.chassis_modules.ModuleHelper', side_effect=Exception("init failed")): + result = runner.invoke( + show.cli.commands["chassis"].commands["modules"].commands["status"], + ["LINE-CARD0"] + ) + print(result.output) + assert result.exit_code == 0 + assert "Empty" in result.output + + @classmethod + def teardown_class(cls): + print("TEARDOWN") + os.environ["UTILITIES_UNIT_TESTING"] = "0" diff --git a/tests/leakage_status_test.py b/tests/leakage_status_test.py index 30601590c8d..b4b772474ab 100644 --- a/tests/leakage_status_test.py +++ b/tests/leakage_status_test.py @@ -1,33 +1,63 @@ import sys import os from click.testing import CliRunner -import show.main as show +from unittest.mock import patch, MagicMock +import show.platform as platform_show test_path = os.path.dirname(os.path.abspath(__file__)) modules_path = os.path.dirname(test_path) -scripts_path = os.path.join(modules_path, "scripts") sys.path.insert(0, modules_path) -class TestFan(object): +def _make_state_db(table_data): + state_db = MagicMock() + state_db.STATE_DB = 'STATE_DB' + + def _keys(db_id, pattern): + prefix = pattern.rstrip('*') + return [k for k in table_data if k.startswith(prefix)] + + def _get_all(db_id, key): + return table_data.get(key) + + state_db.keys.side_effect = _keys + state_db.get_all.side_effect = _get_all + return state_db + + +class TestLeakStatus(object): @classmethod def setup_class(cls): print("SETUP") os.environ["UTILITIES_UNIT_TESTING"] = "1" - def test_show_platform_leakage_status(self): + def test_show_platform_leak_status(self): runner = CliRunner() - result = runner.invoke(show.cli.commands["platform"].commands["leakage"].commands["status"]) - print(result.output) - expected = """\ - Name Leaking --------- --------- -leakage1 No -leakage2 No -leakage3 Yes -""" - - assert result.output == expected + table_data = { + 'LIQUID_COOLING_INFO|leakage_sensors1': { + 'name': 'leakage1', 'leaking': 'No', + 'leak_sensor_status': 'OK', 'type': 'rope', 'leak_severity': 'N/A', + }, + 'LIQUID_COOLING_INFO|leakage_sensors2': { + 'name': 'leakage2', 'leaking': 'No', + 'leak_sensor_status': 'OK', 'type': 'spot', 'leak_severity': 'N/A', + }, + 'LIQUID_COOLING_INFO|leakage_sensors3': { + 'name': 'leakage3', 'leaking': 'Yes', + 'leak_sensor_status': 'OK', 'type': 'rope', 'leak_severity': 'MINOR', + }, + } + mock_state_db = _make_state_db(table_data) + + with patch('show.platform._get_state_db', return_value=mock_state_db): + result = runner.invoke( + platform_show.platform.commands["leak"].commands["status"] + ) + assert result.exit_code == 0 + assert 'leakage1' in result.output + assert 'leakage2' in result.output + assert 'leakage3' in result.output + assert 'Yes' in result.output @classmethod def teardown_class(cls): diff --git a/tests/liquid_cool_test.py b/tests/liquid_cool_test.py new file mode 100644 index 00000000000..80ef8b84aee --- /dev/null +++ b/tests/liquid_cool_test.py @@ -0,0 +1,152 @@ +import os +import sys + +from click.testing import CliRunner +from unittest.mock import MagicMock + +test_path = os.path.dirname(os.path.abspath(__file__)) +modules_path = os.path.dirname(test_path) +sys.path.insert(0, modules_path) + +from config.liquid_cool import liquid_cool # noqa: E402 + + +def _make_mock_db(cfgdb=None): + """Return a MagicMock with spec=Db and a controlled cfgdb attribute.""" + from utilities_common.db import Db + db = MagicMock(spec=Db) + db.cfgdb = cfgdb if cfgdb is not None else MagicMock() + return db + + +class TestLeakControl(object): + """Tests for 'config liquid_cool leak-control' command""" + + @classmethod + def setup_class(cls): + print("SETUP") + + def _invoke(self, args, cfgdb=None): + runner = CliRunner() + db = _make_mock_db(cfgdb) + return runner.invoke(liquid_cool, args, obj=db) + + def test_leak_control_system_enabled(self): + mock_cfgdb = MagicMock() + result = self._invoke(['leak-control', 'system', 'enabled'], mock_cfgdb) + assert result.exit_code == 0, result.output + assert "system" in result.output + assert "enabled" in result.output + mock_cfgdb.mod_entry.assert_called_once_with( + 'LEAK_CONTROL_POLICY', 'policy', {'system_leak_policy': 'enabled'} + ) + + def test_leak_control_system_disabled(self): + mock_cfgdb = MagicMock() + result = self._invoke(['leak-control', 'system', 'disabled'], mock_cfgdb) + assert result.exit_code == 0, result.output + assert "disabled" in result.output + mock_cfgdb.mod_entry.assert_called_once_with( + 'LEAK_CONTROL_POLICY', 'policy', {'system_leak_policy': 'disabled'} + ) + + def test_leak_control_rack_mgr_enabled(self): + mock_cfgdb = MagicMock() + result = self._invoke(['leak-control', 'rack_mgr', 'enabled'], mock_cfgdb) + assert result.exit_code == 0, result.output + mock_cfgdb.mod_entry.assert_called_once_with( + 'LEAK_CONTROL_POLICY', 'policy', {'rack_mgr_leak_policy': 'enabled'} + ) + + def test_leak_control_rack_mgr_disabled(self): + mock_cfgdb = MagicMock() + result = self._invoke(['leak-control', 'rack_mgr', 'disabled'], mock_cfgdb) + assert result.exit_code == 0, result.output + mock_cfgdb.mod_entry.assert_called_once_with( + 'LEAK_CONTROL_POLICY', 'policy', {'rack_mgr_leak_policy': 'disabled'} + ) + + def test_leak_control_invalid_policy_type(self): + result = self._invoke(['leak-control', 'invalid_type', 'enabled']) + assert result.exit_code != 0 + + def test_leak_control_invalid_state(self): + result = self._invoke(['leak-control', 'system', 'invalid_state']) + assert result.exit_code != 0 + + def test_leak_control_missing_args(self): + result = self._invoke(['leak-control']) + assert result.exit_code != 0 + + +class TestLeakAction(object): + """Tests for 'config liquid_cool leak-action' command""" + + @classmethod + def setup_class(cls): + print("SETUP") + + def _invoke(self, args, cfgdb=None): + runner = CliRunner() + db = _make_mock_db(cfgdb) + return runner.invoke(liquid_cool, args, obj=db) + + def test_leak_action_system_critical_power_off(self): + mock_cfgdb = MagicMock() + result = self._invoke(['leak-action', 'system', 'critical', 'power_off'], mock_cfgdb) + assert result.exit_code == 0, result.output + mock_cfgdb.mod_entry.assert_called_once_with( + 'LEAK_CONTROL_POLICY', 'policy', {'system_critical_leak_action': 'power_off'} + ) + + def test_leak_action_system_critical_graceful_shutdown(self): + mock_cfgdb = MagicMock() + result = self._invoke(['leak-action', 'system', 'critical', 'graceful_shutdown'], mock_cfgdb) + assert result.exit_code == 0, result.output + mock_cfgdb.mod_entry.assert_called_once_with( + 'LEAK_CONTROL_POLICY', 'policy', {'system_critical_leak_action': 'graceful_shutdown'} + ) + + def test_leak_action_system_critical_syslog_only(self): + mock_cfgdb = MagicMock() + result = self._invoke(['leak-action', 'system', 'critical', 'syslog_only'], mock_cfgdb) + assert result.exit_code == 0, result.output + mock_cfgdb.mod_entry.assert_called_once_with( + 'LEAK_CONTROL_POLICY', 'policy', {'system_critical_leak_action': 'syslog_only'} + ) + + def test_leak_action_system_minor_syslog_only(self): + mock_cfgdb = MagicMock() + result = self._invoke(['leak-action', 'system', 'minor', 'syslog_only'], mock_cfgdb) + assert result.exit_code == 0, result.output + mock_cfgdb.mod_entry.assert_called_once_with( + 'LEAK_CONTROL_POLICY', 'policy', {'system_minor_leak_action': 'syslog_only'} + ) + + def test_leak_action_rack_mgr_critical_syslog_only(self): + mock_cfgdb = MagicMock() + result = self._invoke(['leak-action', 'rack_mgr', 'critical', 'syslog_only'], mock_cfgdb) + assert result.exit_code == 0, result.output + mock_cfgdb.mod_entry.assert_called_once_with( + 'LEAK_CONTROL_POLICY', 'policy', {'rack_mgr_critical_alert_action': 'syslog_only'} + ) + + def test_leak_action_rack_mgr_minor_syslog_only(self): + mock_cfgdb = MagicMock() + result = self._invoke(['leak-action', 'rack_mgr', 'minor', 'syslog_only'], mock_cfgdb) + assert result.exit_code == 0, result.output + mock_cfgdb.mod_entry.assert_called_once_with( + 'LEAK_CONTROL_POLICY', 'policy', {'rack_mgr_minor_alert_action': 'syslog_only'} + ) + + def test_leak_action_invalid_action(self): + result = self._invoke(['leak-action', 'system', 'critical', 'invalid']) + assert result.exit_code != 0 + + def test_leak_action_invalid_severity(self): + result = self._invoke(['leak-action', 'system', 'invalid', 'power_off']) + assert result.exit_code != 0 + + def test_leak_action_missing_args(self): + result = self._invoke(['leak-action', 'system', 'critical']) + assert result.exit_code != 0 diff --git a/tests/show_platform_leak_test.py b/tests/show_platform_leak_test.py new file mode 100644 index 00000000000..4993cf5b0f8 --- /dev/null +++ b/tests/show_platform_leak_test.py @@ -0,0 +1,238 @@ +import os +import sys + +from click.testing import CliRunner +from unittest.mock import MagicMock, patch + +test_path = os.path.dirname(os.path.abspath(__file__)) +modules_path = os.path.dirname(test_path) +sys.path.insert(0, modules_path) + +import show.platform as show_platform # noqa: E402 + + +def _make_state_db(table_data): + """Build a mock SonicV2Connector with STATE_DB data keyed by full Redis keys.""" + state_db = MagicMock() + state_db.STATE_DB = 'STATE_DB' + + def _keys(db_id, pattern): + prefix = pattern.rstrip('*') + return [k for k in table_data if k.startswith(prefix)] + + def _get_all(db_id, key): + return table_data.get(key) + + state_db.keys.side_effect = _keys + state_db.get_all.side_effect = _get_all + return state_db + + +class TestShowPlatformLeakControlPolicy(object): + """Tests for 'show platform leak control-policy'""" + + @classmethod + def setup_class(cls): + print("SETUP") + + def test_leak_control_policy_with_data(self): + runner = CliRunner() + mock_cfgdb = MagicMock() + mock_cfgdb.get_entry.return_value = { + 'system_leak_policy': 'enabled', + 'system_critical_leak_action': 'power_off', + 'system_minor_leak_action': 'syslog_only', + 'rack_mgr_leak_policy': 'disabled', + 'rack_mgr_critical_alert_action': 'syslog_only', + 'rack_mgr_minor_alert_action': 'syslog_only', + } + mock_db = MagicMock() + mock_db.cfgdb = mock_cfgdb + + with patch('utilities_common.db.Db', return_value=mock_db): + result = runner.invoke( + show_platform.platform.commands['leak'].commands['control-policy'] + ) + assert result.exit_code == 0 + assert 'system_leak_policy' in result.output + assert 'enabled' in result.output + assert 'rack_mgr_leak_policy' in result.output + assert 'disabled' in result.output + assert 'power_off' in result.output + + def test_leak_control_policy_empty_db(self): + runner = CliRunner() + mock_cfgdb = MagicMock() + mock_cfgdb.get_entry.return_value = {} + mock_db = MagicMock() + mock_db.cfgdb = mock_cfgdb + + with patch('utilities_common.db.Db', return_value=mock_db): + result = runner.invoke( + show_platform.platform.commands['leak'].commands['control-policy'] + ) + assert result.exit_code == 0 + # Defaults should be shown + assert 'enabled' in result.output + assert 'power_off' in result.output + assert 'syslog_only' in result.output + + +class TestShowPlatformLeakRackManagerAlerts(object): + """Tests for 'show platform leak rack-manager alerts'""" + + @classmethod + def setup_class(cls): + print("SETUP") + + def test_rack_manager_alerts_with_data(self): + runner = CliRunner() + table_data = { + 'RACK_MANAGER_ALERT|Inlet_liquid_temperature': { + 'severity': 'NORMAL', + 'timestamp': '2026-03-25 22:10:00', + }, + 'RACK_MANAGER_ALERT|Rack_level_leak': { + 'severity': 'CRITICAL', + 'timestamp': '2026-03-25 22:11:00', + }, + } + mock_state_db = _make_state_db(table_data) + + with patch('show.platform._get_state_db', return_value=mock_state_db): + result = runner.invoke( + show_platform.platform.commands['leak'].commands['rack-manager'].commands['alerts'] + ) + assert result.exit_code == 0 + assert 'Inlet_liquid_temperature' in result.output + assert 'NORMAL' in result.output + assert 'Rack_level_leak' in result.output + assert 'CRITICAL' in result.output + + def test_rack_manager_alerts_empty(self): + runner = CliRunner() + mock_state_db = _make_state_db({}) + + with patch('show.platform._get_state_db', return_value=mock_state_db): + result = runner.invoke( + show_platform.platform.commands['leak'].commands['rack-manager'].commands['alerts'] + ) + assert result.exit_code == 0 + assert 'No rack-manager alerts found' in result.output + + +class TestShowPlatformLeakProfiles(object): + """Tests for 'show platform leak profiles'""" + + @classmethod + def setup_class(cls): + print("SETUP") + + def test_leak_profiles_with_data(self): + runner = CliRunner() + mock_cfgdb = MagicMock() + mock_cfgdb.get_keys.return_value = ['rope', 'spot', 'flex_pcb'] + mock_cfgdb.get_entry.side_effect = lambda t, k: { + 'rope': {'max_minor_duration_sec': '300'}, + 'spot': {'max_minor_duration_sec': '600'}, + 'flex_pcb': {'max_minor_duration_sec': '180'}, + }[k] + mock_db = MagicMock() + mock_db.cfgdb = mock_cfgdb + + with patch('utilities_common.db.Db', return_value=mock_db): + result = runner.invoke( + show_platform.platform.commands['leak'].commands['profiles'] + ) + assert result.exit_code == 0 + assert 'rope' in result.output + assert '300' in result.output + assert 'spot' in result.output + assert 'flex_pcb' in result.output + assert '180' in result.output + + def test_leak_profiles_empty(self): + runner = CliRunner() + mock_cfgdb = MagicMock() + mock_cfgdb.get_keys.return_value = [] + mock_db = MagicMock() + mock_db.cfgdb = mock_cfgdb + + with patch('utilities_common.db.Db', return_value=mock_db): + result = runner.invoke( + show_platform.platform.commands['leak'].commands['profiles'] + ) + assert result.exit_code == 0 + assert 'No leak profiles found' in result.output + + +class TestShowPlatformLeakStatus(object): + """Tests for 'show platform leak status'""" + + @classmethod + def setup_class(cls): + print("SETUP") + + def test_leak_status_with_data(self): + runner = CliRunner() + table_data = { + 'LIQUID_COOLING_INFO|leakage_sensors1': { + 'name': 'leak_sensors1', + 'leaking': 'YES', + 'leak_sensor_status': 'OK', + 'type': 'rope', + 'leak_severity': 'MINOR', + }, + 'LIQUID_COOLING_INFO|leakage_sensors2': { + 'name': 'leak_sensors2', + 'leaking': 'NO', + 'leak_sensor_status': 'FAULTY', + 'type': 'spot', + 'leak_severity': 'N/A', + }, + } + mock_state_db = _make_state_db(table_data) + + with patch('show.platform._get_state_db', return_value=mock_state_db): + result = runner.invoke( + show_platform.platform.commands['leak'].commands['status'] + ) + assert result.exit_code == 0 + assert 'leak_sensors1' in result.output + assert 'YES' in result.output + assert 'MINOR' in result.output + assert 'leak_sensors2' in result.output + # leaking=NO → severity shown as NA + assert 'NA' in result.output + + def test_leak_status_critical_sensor(self): + runner = CliRunner() + table_data = { + 'LIQUID_COOLING_INFO|leakage_sensorsX': { + 'name': 'leak_sensorsX', + 'leaking': 'Yes', + 'leak_sensor_status': 'OK', + 'type': 'flex_pcb', + 'leak_severity': 'CRITICAL', + }, + } + mock_state_db = _make_state_db(table_data) + + with patch('show.platform._get_state_db', return_value=mock_state_db): + result = runner.invoke( + show_platform.platform.commands['leak'].commands['status'] + ) + assert result.exit_code == 0 + assert 'CRITICAL' in result.output + assert 'flex_pcb' in result.output + + def test_leak_status_empty(self): + runner = CliRunner() + mock_state_db = _make_state_db({}) + + with patch('show.platform._get_state_db', return_value=mock_state_db): + result = runner.invoke( + show_platform.platform.commands['leak'].commands['status'] + ) + assert result.exit_code == 0 + assert 'No leak sensor data found' in result.output diff --git a/tests/test_module.py b/tests/test_module.py index ea68542a296..590451f88b3 100644 --- a/tests/test_module.py +++ b/tests/test_module.py @@ -297,3 +297,64 @@ def test_get_module_state_transition_method_not_found(self, result = module_helper.get_module_state_transition("DPU1") assert result is False mock_log_error.assert_called_once_with("Get module state transition method not found in platform chassis") + + def test_get_module_oper_status_success(self, mock_load_platform_chassis, mock_try_get_args): + mock_try_get_args.side_effect = [1, "Online"] + mock_module = mock.MagicMock() + mock_module.get_oper_status.return_value = "Online" + module_helper.platform_chassis.get_module.return_value = mock_module + + result = module_helper.get_module_oper_status("LINE-CARD0") + assert result == "Online" + + def test_get_module_oper_status_invalid_index(self, mock_load_platform_chassis, + mock_try_get_args, mock_log_error): + mock_try_get_args.return_value = INVALID_MODULE_INDEX + + result = module_helper.get_module_oper_status("LINE-CARD0") + assert result == "N/A" + mock_log_error.assert_called_once_with("Unable to get module-index for LINE-CARD0") + + def test_get_module_oper_status_no_chassis(self, mock_load_platform_chassis, mock_log_error): + original = module_helper.platform_chassis + module_helper.platform_chassis = None + + result = module_helper.get_module_oper_status("LINE-CARD0") + assert result == "N/A" + mock_log_error.assert_called_once_with( + "Platform chassis not loaded, cannot get oper_status for LINE-CARD0" + ) + + module_helper.platform_chassis = original + + def test_get_module_oper_status_none_module(self, mock_load_platform_chassis, + mock_try_get_args, mock_log_error): + mock_try_get_args.return_value = 1 + module_helper.platform_chassis.get_module.return_value = None + + result = module_helper.get_module_oper_status("LINE-CARD0") + assert result == "N/A" + mock_log_error.assert_called_once_with("Unable to get module object for LINE-CARD0") + + def test_get_module_oper_status_method_not_found(self, mock_load_platform_chassis, + mock_try_get_args, mock_log_error): + mock_try_get_args.return_value = 1 + mock_module = object() + module_helper.platform_chassis.get_module.return_value = mock_module + + result = module_helper.get_module_oper_status("LINE-CARD0") + assert result == "N/A" + mock_log_error.assert_called_once_with( + "get_oper_status method not found for module LINE-CARD0" + ) + + def test_get_module_oper_status_not_implemented(self, mock_load_platform_chassis, + mock_try_get_args): + # First call: module_index; second call: get_oper_status raises NotImplementedError → N/A + mock_try_get_args.side_effect = [1, "N/A"] + mock_module = mock.MagicMock() + mock_module.get_oper_status.side_effect = NotImplementedError + module_helper.platform_chassis.get_module.return_value = mock_module + + result = module_helper.get_module_oper_status("LINE-CARD0") + assert result == "N/A" diff --git a/utilities_common/chassis.py b/utilities_common/chassis.py index c95562bb554..9164dcf8f11 100644 --- a/utilities_common/chassis.py +++ b/utilities_common/chassis.py @@ -18,6 +18,10 @@ def get_chassis_local_interfaces(): return lst +def is_bmc(): + return hasattr(device_info, 'is_switch_bmc') and device_info.is_switch_bmc() + + def is_smartswitch(): return hasattr(device_info, 'is_smartswitch') and device_info.is_smartswitch() diff --git a/utilities_common/module.py b/utilities_common/module.py index 7d00f2fd049..b487dd2f831 100644 --- a/utilities_common/module.py +++ b/utilities_common/module.py @@ -220,3 +220,35 @@ def get_module_state_transition(self, module_name): log.log_info("Getting state_transition_in_progress flag for module {}...".format(module_name)) return self.try_get_args(self.platform_chassis.get_module(module_index).get_module_state_transition, module_name, default=False) + + def get_module_oper_status(self, module_name): + """ + Get the operational status of the specified module via the platform API. + + Args: + module_name (str): The name of the module. + Returns: + str: The operational status string (e.g., "Online", "Offline", "Empty"), + or NOT_AVAILABLE if it cannot be retrieved. + """ + if not self.platform_chassis: + log.log_error("Platform chassis not loaded, cannot get oper_status for {}".format(module_name)) + return NOT_AVAILABLE + + module_name = module_name.upper() + module_index = self.try_get_args(self.platform_chassis.get_module_index, module_name, + default=INVALID_MODULE_INDEX) + if module_index < 0: + log.log_error("Unable to get module-index for {}".format(module_name)) + return NOT_AVAILABLE + + module = self.platform_chassis.get_module(module_index) + if module is None: + log.log_error("Unable to get module object for {}".format(module_name)) + return NOT_AVAILABLE + + if not hasattr(module, 'get_oper_status'): + log.log_error("get_oper_status method not found for module {}".format(module_name)) + return NOT_AVAILABLE + + return self.try_get_args(module.get_oper_status, default=NOT_AVAILABLE)