|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +# |
| 3 | +# --- BEGIN_HEADER --- |
| 4 | +# |
| 5 | +# test_mig_shared_cmdapi - unit tests for cmdapi helper functions |
| 6 | +# Copyright (C) 2003-2026 The MiG Project by the Science HPC Center at UCPH |
| 7 | +# |
| 8 | +# This file is part of MiG. |
| 9 | +# |
| 10 | +# MiG is free software: you can redistribute it and/or modify |
| 11 | +# it under the terms of the GNU General Public License as published by |
| 12 | +# the Free Software Foundation; either version 2 of the License, or |
| 13 | +# (at your option) any later version. |
| 14 | +# |
| 15 | +# MiG is distributed in the hope that it will be useful, |
| 16 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 17 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 18 | +# GNU General Public License for more details. |
| 19 | +# |
| 20 | +# You should have received a copy of the GNU General Public License |
| 21 | +# along with this program; if not, write to the Free Software |
| 22 | +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, |
| 23 | +# USA. |
| 24 | +# |
| 25 | +# -- END_HEADER --- |
| 26 | +# |
| 27 | + |
| 28 | +"""Unit tests for cmdapi helper module""" |
| 29 | + |
| 30 | +import unittest |
| 31 | + |
| 32 | +# Imports required for the unit test wrapping |
| 33 | + |
| 34 | +# Imports of the code under test |
| 35 | +from mig.shared.cmdapi import ( |
| 36 | + get_flag_map, |
| 37 | + get_command_map, |
| 38 | + get_usage_map, |
| 39 | + legacy_main, |
| 40 | + map_args_to_vars, |
| 41 | + parse_command_args, |
| 42 | +) |
| 43 | + |
| 44 | +# Imports required for the unit tests themselves |
| 45 | +from tests.support import MigTestCase, ensure_dirs_exist, testmain |
| 46 | + |
| 47 | + |
| 48 | +class TestMigSharedCmdapi(MigTestCase): |
| 49 | + """Unit tests for cmdapi helpers""" |
| 50 | + |
| 51 | + def _provide_configuration(self): |
| 52 | + """Return configuration to use""" |
| 53 | + return "testconfig" |
| 54 | + |
| 55 | + def before_each(self): |
| 56 | + """Create test environment for cmdapi tests""" |
| 57 | + ensure_dirs_exist(self.configuration.mig_system_files) |
| 58 | + |
| 59 | + def test_get_command_map_basic(self): |
| 60 | + """Test basic command map structure""" |
| 61 | + command_map = get_command_map(self.configuration) |
| 62 | + self.assertIsInstance(command_map, dict) |
| 63 | + self.assertIn('cp', command_map) |
| 64 | + self.assertIn('mv', command_map) |
| 65 | + self.assertIn('rm', command_map) |
| 66 | + self.assertIn('mkdir', command_map) |
| 67 | + self.assertIn('du', command_map) |
| 68 | + |
| 69 | + def test_get_command_map_with_jobs_enabled(self): |
| 70 | + """Test command map includes job commands when enabled""" |
| 71 | + self.configuration.site_enable_jobs = True |
| 72 | + command_map = get_command_map(self.configuration) |
| 73 | + self.assertIn('submit', command_map) |
| 74 | + self.assertIn('canceljob', command_map) |
| 75 | + self.assertIn('resubmit', command_map) |
| 76 | + self.assertIn('jobaction', command_map) |
| 77 | + self.assertIn('liveio', command_map) |
| 78 | + |
| 79 | + def test_get_command_map_with_sharelinks_enabled(self): |
| 80 | + """Test command map includes sharelink commands when enabled""" |
| 81 | + self.configuration.site_enable_sharelinks = True |
| 82 | + command_map = get_command_map(self.configuration) |
| 83 | + self.assertIn('delsharelink', command_map) |
| 84 | + self.assertIn('importsharelink', command_map) |
| 85 | + |
| 86 | + def test_get_command_map_with_transfers_enabled(self): |
| 87 | + """Test command map includes transfer commands when enabled""" |
| 88 | + self.configuration.site_enable_transfers = True |
| 89 | + command_map = get_command_map(self.configuration) |
| 90 | + self.assertIn('datatransfer', command_map) |
| 91 | + |
| 92 | + def test_get_command_map_with_freeze_enabled(self): |
| 93 | + """Test command map includes freeze commands when enabled""" |
| 94 | + self.configuration.site_enable_freeze = True |
| 95 | + command_map = get_command_map(self.configuration) |
| 96 | + self.assertIn('createbackup', command_map) |
| 97 | + self.assertIn('deletebackup', command_map) |
| 98 | + self.assertIn('addfreezedata', command_map) |
| 99 | + self.assertIn('importfreeze', command_map) |
| 100 | + |
| 101 | + def test_get_command_map_with_crontab_enabled(self): |
| 102 | + """Test command map includes crontab commands when enabled""" |
| 103 | + self.configuration.site_enable_crontab = True |
| 104 | + command_map = get_command_map(self.configuration) |
| 105 | + self.assertIn('crontab', command_map) |
| 106 | + |
| 107 | + def test_get_command_map(self): |
| 108 | + """Test that get_command_map returns expected command definitions""" |
| 109 | + cmd_map = get_command_map(self.configuration) |
| 110 | + # Only a subset is relevant for basic tests |
| 111 | + expected_subset = { |
| 112 | + 'pack': ['src', 'dst'], |
| 113 | + 'unpack': ['src', 'dst'], |
| 114 | + 'zip': ['src', 'dst'], |
| 115 | + 'unzip': ['src', 'dst'], |
| 116 | + 'tar': ['src', 'dst'], |
| 117 | + 'untar': ['src', 'dst'], |
| 118 | + 'cp': ['src', 'dst'], |
| 119 | + 'mv': ['src', 'dst'], |
| 120 | + 'rm': ['path'], |
| 121 | + 'du': ['path', 'dst'], |
| 122 | + 'rmdir': ['path'], |
| 123 | + 'truncate': ['path'], |
| 124 | + 'touch': ['path'], |
| 125 | + 'mkdir': ['path'], |
| 126 | + 'chksum': ['hash_algo', 'path', 'dst', 'max_chunks'], |
| 127 | + 'mqueue': ['queue', 'action', 'msg_id', 'msg'], |
| 128 | + } |
| 129 | + for cmd, args in expected_subset.items(): |
| 130 | + self.assertIn(cmd, cmd_map) |
| 131 | + self.assertEqual(cmd_map[cmd][:len(args)], args) |
| 132 | + |
| 133 | + def test_get_flag_map_structure(self): |
| 134 | + """Test flag map structure""" |
| 135 | + flag_map = get_flag_map(self.configuration) |
| 136 | + self.assertIsInstance(flag_map, dict) |
| 137 | + self.assertIn('cp', flag_map) |
| 138 | + self.assertIn('rm', flag_map) |
| 139 | + self.assertIn('du', flag_map) |
| 140 | + self.assertIn('mkdir', flag_map) |
| 141 | + self.assertIn('rmdir', flag_map) |
| 142 | + |
| 143 | + def test_get_flag_map_values(self): |
| 144 | + """Test flag map values""" |
| 145 | + flag_map = get_flag_map(self.configuration) |
| 146 | + self.assertEqual(flag_map['cp'], ['r', 'f']) |
| 147 | + self.assertEqual(flag_map['rm'], ['r', 'f']) |
| 148 | + self.assertEqual(flag_map['du'], ['s']) |
| 149 | + self.assertEqual(flag_map['mkdir'], ['p']) |
| 150 | + self.assertEqual(flag_map['rmdir'], ['p']) |
| 151 | + |
| 152 | + def test_get_flag_map(self): |
| 153 | + """Test that get_flag_map returns expected flag definitions""" |
| 154 | + flags = get_flag_map(self.configuration) |
| 155 | + expected = { |
| 156 | + 'cp': ['r', 'f'], |
| 157 | + 'rm': ['r', 'f'], |
| 158 | + 'du': ['s'], |
| 159 | + 'mkdir': ['p'], |
| 160 | + 'rmdir': ['p'], |
| 161 | + 'importsharelink': ['r', 'f'], |
| 162 | + 'importfreeze': ['r', 'f'], |
| 163 | + } |
| 164 | + self.assertEqual(flags, expected) |
| 165 | + |
| 166 | + def test_get_usage_map_structure(self): |
| 167 | + """Test usage map structure""" |
| 168 | + usage_map = get_usage_map(self.configuration) |
| 169 | + self.assertIsInstance(usage_map, dict) |
| 170 | + self.assertIn('cp', usage_map) |
| 171 | + self.assertIn('mv', usage_map) |
| 172 | + self.assertIn('rm', usage_map) |
| 173 | + self.assertIn('mkdir', usage_map) |
| 174 | + self.assertIn('du', usage_map) |
| 175 | + |
| 176 | + def test_get_usage_map_values(self): |
| 177 | + """Test usage map values""" |
| 178 | + usage_map = get_usage_map(self.configuration) |
| 179 | + self.assertEqual(usage_map['cp'], 'cp [-r] [-f] SRC [SRC ..] DST') |
| 180 | + self.assertEqual(usage_map['mv'], 'mv SRC [SRC ..] DST') |
| 181 | + self.assertEqual(usage_map['rm'], 'rm [-r] [-f] PATH [PATH ..]') |
| 182 | + self.assertEqual(usage_map['mkdir'], 'mkdir [-p] PATH [PATH ..]') |
| 183 | + self.assertEqual(usage_map['du'], 'du [-s] PATH [PATH ..] DST') |
| 184 | + |
| 185 | + def test_get_usage_map(self): |
| 186 | + """Test that get_usage_map builds usage strings correctly""" |
| 187 | + usage = get_usage_map(self.configuration) |
| 188 | + # Check a known command |
| 189 | + self.assertIn('cp', usage) |
| 190 | + self.assertIn('[-r]', usage['cp']) |
| 191 | + self.assertIn('SRC [SRC ..] DST', usage['cp']) |
| 192 | + |
| 193 | + def test_map_args_to_vars_variable_length(self): |
| 194 | + """Test that map_args_to_vars expands variable length arguments""" |
| 195 | + var_list = ['src', 'dst'] |
| 196 | + arg_list = ['a.txt', 'b.txt', 'c.txt'] |
| 197 | + result = map_args_to_vars(var_list, arg_list) |
| 198 | + self.assertEqual( |
| 199 | + result, {'src': ['a.txt', 'b.txt'], 'dst': ['c.txt']} |
| 200 | + ) |
| 201 | + |
| 202 | + def test_map_args_to_vars_exact_match(self): |
| 203 | + """Test map_args_to_vars with exact number of arguments""" |
| 204 | + var_list = ['src', 'dst'] |
| 205 | + arg_list = ['a.txt', 'b.txt'] |
| 206 | + result = map_args_to_vars(var_list, arg_list) |
| 207 | + self.assertEqual(result, {'src': ['a.txt'], 'dst': ['b.txt']}) |
| 208 | + |
| 209 | + def test_parse_command_args_basic(self): |
| 210 | + """Test that parse_command_args parses a simple command correctly""" |
| 211 | + cmd_list = ['cp', 'srcfile', 'dstfile'] |
| 212 | + backend, args_dict = parse_command_args(self.configuration, cmd_list) |
| 213 | + self.assertEqual(backend, 'cp') |
| 214 | + self.assertEqual(args_dict.get('src'), ['srcfile']) |
| 215 | + self.assertEqual(args_dict.get('dst'), ['dstfile']) |
| 216 | + |
| 217 | + def test_parse_command_args_with_flags(self): |
| 218 | + """Test that parse_command_args handles flags correctly""" |
| 219 | + cmd_list = ['cp', '-r', 'srcdir', 'dstdir'] |
| 220 | + backend, args_dict = parse_command_args(self.configuration, cmd_list) |
| 221 | + self.assertEqual(backend, 'cp') |
| 222 | + self.assertIn('flags', args_dict) |
| 223 | + self.assertEqual(args_dict['flags'], ['r']) |
| 224 | + |
| 225 | + def test_parse_command_args_with_multiple_flags(self): |
| 226 | + """Test that parse_command_args handles multiple combined flags""" |
| 227 | + cmd_list = ['cp', '-rf', 'srcdir', 'dstdir'] |
| 228 | + backend, args_dict = parse_command_args(self.configuration, cmd_list) |
| 229 | + self.assertEqual(backend, 'cp') |
| 230 | + self.assertIn('flags', args_dict) |
| 231 | + self.assertEqual(args_dict['flags'], ['rf']) |
| 232 | + |
| 233 | + def test_parse_command_args_unsupported(self): |
| 234 | + """Test that parse_command_args raises on unsupported command""" |
| 235 | + cmd_list = ['unknown_cmd', 'arg1'] |
| 236 | + with self.assertRaises(ValueError) as cm: |
| 237 | + parse_command_args(self.configuration, cmd_list) |
| 238 | + self.assertIn('unsupported command', str(cm.exception)) |
| 239 | + |
| 240 | + def test_parse_command_args_delsharelink_no_flags_entry(self): |
| 241 | + """Regression: ensure commands without flags don't produce flags key""" |
| 242 | + self.configuration.site_enable_sharelinks = True |
| 243 | + cmd_list = ['delsharelink', 'share123'] |
| 244 | + backend, args_dict = parse_command_args(self.configuration, cmd_list) |
| 245 | + self.assertEqual(backend, 'delsharelink') |
| 246 | + self.assertNotIn('flags', args_dict) |
| 247 | + self.assertEqual(args_dict.get('share_id'), ['share123']) |
| 248 | + |
| 249 | + def test_parse_command_args_canceljob_no_flags_entry(self): |
| 250 | + """Regression: ensure commands without flags don't produce flags key""" |
| 251 | + self.configuration.site_enable_jobs = True |
| 252 | + cmd_list = ['canceljob', 'job123'] |
| 253 | + backend, args_dict = parse_command_args(self.configuration, cmd_list) |
| 254 | + self.assertEqual(backend, 'canceljob') |
| 255 | + self.assertNotIn('flags', args_dict) |
| 256 | + self.assertEqual(args_dict.get('job_id'), ['job123']) |
| 257 | + |
| 258 | + def test_parse_command_args_datatransfer_no_flags_entry(self): |
| 259 | + """Regression: ensure commands without flags don't produce flags key""" |
| 260 | + self.configuration.site_enable_transfers = True |
| 261 | + cmd_list = ['datatransfer', 'transfer123'] |
| 262 | + backend, args_dict = parse_command_args(self.configuration, cmd_list) |
| 263 | + self.assertEqual(backend, 'datatransfer') |
| 264 | + self.assertNotIn('flags', args_dict) |
| 265 | + self.assertEqual(args_dict.get('transfer_id'), ['transfer123']) |
| 266 | + |
| 267 | + def test_parse_command_args_deletebackup_no_flags_entry(self): |
| 268 | + """Regression: ensure commands without flags don't produce flags key""" |
| 269 | + self.configuration.site_enable_freeze = True |
| 270 | + cmd_list = ['deletebackup', 'backup123'] |
| 271 | + backend, args_dict = parse_command_args(self.configuration, cmd_list) |
| 272 | + self.assertEqual(backend, 'deletebackup') |
| 273 | + self.assertNotIn('flags', args_dict) |
| 274 | + self.assertEqual(args_dict.get('freeze_id'), ['backup123']) |
| 275 | + |
| 276 | + def test_parse_command_args_crontab_no_flags_entry(self): |
| 277 | + """Regression: ensure commands without flags don't produce flags key""" |
| 278 | + self.configuration.site_enable_crontab = True |
| 279 | + cmd_list = ['crontab', 'transfer123', 'reschedule'] |
| 280 | + backend, args_dict = parse_command_args(self.configuration, cmd_list) |
| 281 | + self.assertEqual(backend, 'crontab') |
| 282 | + self.assertNotIn('flags', args_dict) |
| 283 | + self.assertEqual(args_dict.get('action'), ['reschedule']) |
| 284 | + |
| 285 | + def test_parse_command_args_mqueue_no_flags_entry(self): |
| 286 | + """Regression: ensure commands without flags don't produce flags key""" |
| 287 | + self.configuration.site_enable_jobs = True |
| 288 | + cmd_list = ['mqueue', 'testqueue', 'msgaction', 'msgid', 'test msg'] |
| 289 | + backend, args_dict = parse_command_args(self.configuration, cmd_list) |
| 290 | + self.assertEqual(backend, 'mqueue') |
| 291 | + self.assertNotIn('flags', args_dict) |
| 292 | + self.assertEqual(args_dict.get('queue'), ['testqueue']) |
| 293 | + self.assertEqual(args_dict.get('msg'), ['test msg']) |
| 294 | + |
| 295 | + |
| 296 | +class TestMigSharedCmdapi__legacy_main(MigTestCase): |
| 297 | + """Unit tests for legacy cmdapi self-checks""" |
| 298 | + |
| 299 | + def test_existing_main(self): |
| 300 | + """Run the legacy self-tests directly in module""" |
| 301 | + |
| 302 | + def raise_on_error_exit(exit_code): |
| 303 | + if exit_code != 0: |
| 304 | + if raise_on_error_exit.last_print is not None: |
| 305 | + identifying_message = raise_on_error_exit.last_print |
| 306 | + else: |
| 307 | + identifying_message = "unknown" |
| 308 | + raise AssertionError( |
| 309 | + "failure in unittest/testcore: %s" % (identifying_message,) |
| 310 | + ) |
| 311 | + |
| 312 | + raise_on_error_exit.last_print = None |
| 313 | + |
| 314 | + def record_last_print(value): |
| 315 | + """Keep track of printed output""" |
| 316 | + raise_on_error_exit.last_print = value |
| 317 | + |
| 318 | + legacy_main(_exit=raise_on_error_exit, _print=record_last_print) |
| 319 | + |
| 320 | + |
| 321 | +if __name__ == '__main__': |
| 322 | + testmain() |
0 commit comments