Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions mig/lib/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,12 +557,10 @@
# ret_msg, txt_out))


def main(_exit=sys.exit, _print=print):
def legacy_main(conf, print=print, _exit=sys.exit):
"""Run module self-tests"""
from mig.shared.conf import get_configuration_object

conf = get_configuration_object()
client_id = "/C=DK/ST=NA/L=NA/O=NBI/OU=NA/CN=Jonas Bardino/emailAddress=bardino@nbi.ku.dk"

Check warning on line 563 in mig/lib/events.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

line too long (94 > 80 characters)
now = datetime.datetime.now()
now = now.replace(second=0, microsecond=0)
trigger_rule = {
Expand All @@ -589,7 +587,7 @@
print(" %s: %s" % (key, val))

crontab_lines = [
"* * * * * pack cront-test.txt cron-test-+SCHEDYEAR+-+SCHEDMONTH+-+SCHEDDAY+.zip"

Check warning on line 590 in mig/lib/events.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

line too long (89 > 80 characters)
]
crontab_rules = parse_crontab_contents(conf, client_id, crontab_lines)
cron_times = [
Expand Down Expand Up @@ -621,6 +619,10 @@
"At %s job is %dm in the future for rule" % (timestamp, remain)
)

_exit(0)


if __name__ == "__main__":
main()
from mig.shared.conf import get_configuration_object
conf = get_configuration_object(skip_log=True, disable_auth_log=True)
legacy_main(conf)
11 changes: 6 additions & 5 deletions mig/shared/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
Expand All @@ -20,7 +20,7 @@
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

Check warning on line 23 in mig/shared/base.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

line too long (81 > 80 characters)
#
# -- END_HEADER ---
#
Expand Down Expand Up @@ -258,7 +258,7 @@
(key, val) = field.split(_key_val_sep, 1)
if 'NA' == val:
val = ''
if not key in cert_field_map.values():

Check warning on line 261 in mig/shared/base.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

test for membership should be 'not in'
user_dict[key] = val
else:
for (name, short) in cert_field_order:
Expand Down Expand Up @@ -295,7 +295,7 @@
"""
canonical = {}
for (key, val) in user_dict.items():
if not key in limit_fields:

Check warning on line 298 in mig/shared/base.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

test for membership should be 'not in'
continue
if isinstance(val, basestring):
val = val.strip()
Expand Down Expand Up @@ -450,7 +450,7 @@
return re.sub(r'[^a-zA-Z0-9:/._-]+', '', page_path)


def requested_url_base(environ=None, include_unsafe=False, uri_field='SCRIPT_URI'):

Check warning on line 453 in mig/shared/base.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

line too long (83 > 80 characters)
"""Lookup requested url base from environ or os.environ if not provided.
If the include_unsafe arg is set the result includes potentially unsafe
values without proper filtering and thus MUST be used very carefully,
Expand Down Expand Up @@ -503,7 +503,7 @@
`isinstance(val, unicode)`
and the like since it breaks when combined with python-future and futurize.
"""
return (type(u"") == type(val))

Check warning on line 506 in mig/shared/base.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

do not compare types, for exact checks use `is` / `is not`, for instance checks use `isinstance()`


def force_utf8(val, highlight='', stringify=True):
Expand Down Expand Up @@ -540,7 +540,7 @@
elif isinstance(input_obj, list):
return [force_utf8_rec(i, highlight, stringify) for i in input_obj]
elif isinstance(input_obj, tuple):
return tuple([force_utf8_rec(i, highlight, stringify) for i in input_obj])

Check warning on line 543 in mig/shared/base.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

line too long (82 > 80 characters)
elif is_unicode(input_obj):
return force_utf8(input_obj, highlight, stringify)
else:
Expand Down Expand Up @@ -581,7 +581,7 @@
elif isinstance(input_obj, list):
return [force_unicode_rec(i, highlight, stringify) for i in input_obj]
elif isinstance(input_obj, tuple):
return tuple([force_unicode_rec(i, highlight, stringify) for i in input_obj])

Check warning on line 584 in mig/shared/base.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

line too long (85 > 80 characters)
elif not is_unicode(input_obj):
return force_unicode(input_obj, highlight, stringify)
else:
Expand Down Expand Up @@ -801,7 +801,7 @@
ext_oidc_url = configuration.migserver_https_ext_oidc_url
locations = []
for i in configuration.site_login_methods:
if i == 'migcert' and mig_cert_url and not mig_cert_url in locations:

Check warning on line 804 in mig/shared/base.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

test for membership should be 'not in'
locations.append(mig_cert_url)
elif i == 'extcert' and ext_cert_url and not ext_cert_url in locations:
locations.append(ext_cert_url)
Expand Down Expand Up @@ -951,7 +951,7 @@
return auth_map.get(auth_type, 'UNKNOWN')


def legacy_main(_exit=sys.exit, _print=print):
def legacy_main(configuration, print=print, _exit=sys.exit):
"""Run module self-tests"""
orig_id = '/C=DK/ST=NA/L=NA/O=Ajax Inc/OU=NA/CN=John Doe/emailAddress=john.doe@ajaxinc.org'
client_dir = client_id_dir(orig_id)
Expand Down Expand Up @@ -986,9 +986,6 @@
for path in legal:
print(" %s: %s" % (path, not invisible_path(path)))

from mig.shared.conf import get_configuration_object
configuration = get_configuration_object(
skip_log=True, disable_auth_log=True)
print("check script restrictions:")
for script_name in ['reqoid.py', 'ls.py', 'sharelink.py', 'put']:
(allow, msg) = allow_script(configuration, script_name, '')
Expand Down Expand Up @@ -1035,6 +1032,10 @@
print("Found unsafe page for %r : %s" %
(script_uri, requested_page(include_unsafe=True)))

_exit(0)


if __name__ == '__main__':
legacy_main()
from mig.shared.conf import get_configuration_object
conf = get_configuration_object(skip_log=True, disable_auth_log=True)
legacy_main(conf)
10 changes: 6 additions & 4 deletions mig/shared/pwcrypto.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
Expand Down Expand Up @@ -61,12 +61,12 @@
from cryptography.fernet import Fernet
except ImportError:
# Optional Fernet not available - fail gracefully and check before use
Fernet = None

Check failure on line 64 in mig/shared/pwcrypto.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Cannot assign to a type [misc]
try:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
except ImportError:
# Optional AESGCM not available - fail gracefully and check before use
AESGCM = None

Check failure on line 69 in mig/shared/pwcrypto.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Cannot assign to a type [misc]

from mig.shared.defaults import POLICY_NONE, POLICY_WEAK, POLICY_MEDIUM, \
POLICY_HIGH, POLICY_MODERN, POLICY_CUSTOM, PASSWORD_POLICIES
Expand Down Expand Up @@ -373,7 +373,7 @@
invocation.
"""
_logger = configuration.logger
if cryptography and Fernet:

Check failure on line 376 in mig/shared/pwcrypto.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Function "Fernet" could always be true in boolean context [truthy-function]
key = prepare_fernet_key(configuration, secret)
password = force_utf8(password)
fernet_helper = Fernet(key)
Expand All @@ -387,7 +387,7 @@
def fernet_decrypt_password(configuration, encrypted, secret=keyword_auto):
"""Decrypt Fernet encrypted password"""
_logger = configuration.logger
if cryptography and Fernet:

Check failure on line 390 in mig/shared/pwcrypto.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Function "Fernet" could always be true in boolean context [truthy-function]
key = prepare_fernet_key(configuration, secret)
encrypted = force_utf8(encrypted)
fernet_helper = Fernet(key)
Expand Down Expand Up @@ -508,7 +508,7 @@
_logger = configuration.logger
# Based on complete example of securely encrypting with AES GCM from
# https://cryptography.io/en/latest/hazmat/primitives/symmetric-encryption
if cryptography and AESGCM:

Check failure on line 511 in mig/shared/pwcrypto.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Function "AESGCM" could always be true in boolean context [truthy-function]
init_vector = prepare_aesgcm_iv(configuration, init_vector)
auth_data = prepare_aesgcm_aad(configuration, AAD_PREFIX, aad_stamp)
password = force_utf8(password)
Expand All @@ -534,7 +534,7 @@
_logger = configuration.logger
# Based on complete example of securely decrypting with AES GCM from
# https://cryptography.io/en/latest/hazmat/primitives/symmetric-encryption
if cryptography and AESGCM:

Check failure on line 537 in mig/shared/pwcrypto.py

View workflow job for this annotation

GitHub Actions / Style check python and annotate

Function "AESGCM" could always be true in boolean context [truthy-function]
(iv, crypt, aad) = __aesgcm_unpack_tuple(encrypted, base64_enc)
if init_vector == keyword_auto:
init_vector = iv
Expand Down Expand Up @@ -1028,10 +1028,8 @@
raise ValueError("Failed to generate suitable password!")


def main(_exit=sys.exit, _print=print):
def legacy_main(configuration, print=print, _exit=sys.exit):
"""Run module self-tests"""
from mig.shared.conf import get_configuration_object
configuration = get_configuration_object()
dummy_user = {'distinguished_name': 'Test User', 'password_hash': ''}
pw_tests = (
'', 'abc', 'dbey3h', 'abcdefgh', '12345678', 'test1234',
Expand Down Expand Up @@ -1117,6 +1115,10 @@
print(
"Failed to handle aesgcm static encrypt/decrypt %s : %s" % (pw, exc))

_exit(0)


if __name__ == "__main__":
main()
from mig.shared.conf import get_configuration_object
conf = get_configuration_object(skip_log=True, disable_auth_log=True)
legacy_main(conf)
18 changes: 10 additions & 8 deletions mig/shared/userio.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
Expand Down Expand Up @@ -575,20 +575,18 @@
exc))


def legacy_main(_exit=sys.exit, _print=print):
def legacy_main(configuration, print=print, _exit=sys.exit, _argv=sys.argv):
"""Run module self-tests"""
from mig.shared.base import client_id_dir
from mig.shared.conf import get_configuration_object
from mig.shared.defaults import htaccess_filename
print("Unit testing user I/O")
client_id = "/C=DK/ST=NA/L=NA/O=NBI/OU=NA/CN=Jonas Bardino/emailAddress=bardino@nbi.ku.dk"
sub_dir = '.'
if sys.argv[1:]:
client_id = sys.argv[1]
if sys.argv[2:]:
sub_dir = sys.argv[2]
if _argv[1:]:
client_id = _argv[1]
if _argv[2:]:
sub_dir = _argv[2]
client_dir = client_id_dir(client_id)
configuration = get_configuration_object()
configuration.mig_server_id = "localhost"
tmp_dir = os.path.join("userio-testdir", sub_dir)
real_tmp = os.path.normpath(os.path.join(configuration.user_home,
Expand Down Expand Up @@ -680,6 +678,10 @@
events_path = os.path.join(configuration.events_home, name)
print("%s : %db" % (name, os.stat(events_path).st_size))

_exit(0)


if __name__ == "__main__":
legacy_main()
from mig.shared.conf import get_configuration_object
conf = get_configuration_object(skip_log=True, disable_auth_log=True)
legacy_main(conf)
10 changes: 6 additions & 4 deletions mig/shared/vgrid.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
Expand Down Expand Up @@ -2321,9 +2321,7 @@
return (success, msg)


def legacy_main(_exit=sys.exit, _print=print):
from mig.shared.conf import get_configuration_object
conf = get_configuration_object(skip_log=True, disable_auth_log=True)
def legacy_main(conf, print=print, _exit=sys.exit):
client_id = '/C=DK/CN=John Doe/emailAddress=john@doe.org'
if sys.argv[1:]:
client_id = sys.argv[1]
Expand Down Expand Up @@ -2445,6 +2443,10 @@
print(vgrid_set_members(conf, dummy_vgrid, orig_members))
print(vgrid_members(dummy_vgrid, conf))

_exit(0)


if __name__ == "__main__":
legacy_main()
from mig.shared.conf import get_configuration_object
conf = get_configuration_object(skip_log=True, disable_auth_log=True)
legacy_main(conf)
42 changes: 3 additions & 39 deletions mig/unittest/testcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,48 +36,11 @@
import time
import logging

sys.path.append(os.path.realpath(
os.path.join(os.path.dirname(__file__), "../..")))
from tests.support import MIG_BASE, PY2, is_path_within

from mig.shared.base import client_id_dir, client_dir_id, get_short_id, \
invisible_path, allow_script, brief_list


_TEST_CONF_FILE = os.environ['MIG_CONF']
_TEST_CONF_DIR = os.path.dirname(_TEST_CONF_FILE)
_TEST_CONF_SYMLINK = os.path.join(MIG_BASE, "envhelp/output/testconfs")


def _assert_local_config():
try:
#link_stat = os.lstat(_TEST_CONF_SYMLINK)
#assert stat.S_ISLNK(link_stat.st_mode)
_test_conf_dir = os.path.dirname(_TEST_CONF_DIR)
configdir_stat = os.stat(_test_conf_dir)
assert stat.S_ISDIR(configdir_stat.st_mode)
config = ConfigParser()
config.read([_TEST_CONF_FILE])
return config
except Exception as exc:
raise AssertionError(
'local configuration invalid or missing: %s' % (str(exc),))


def _assert_local_config_global_values(config):
config_global_values = dict(config.items('GLOBAL'))

for path in ('mig_path', 'certs_path', 'state_path'):
path_value = config_global_values.get(path)
if not is_path_within(path_value, start=MIG_BASE):
raise AssertionError('local config contains bad path: %s=%s' % (path, path_value))

return config_global_values


def main(configuration, _exit=sys.exit):
config = _assert_local_config()
config_global_values = _assert_local_config_global_values(config)
def legacy_main(configuration, print=print, _exit=sys.exit):

print("Running unit test on shared core functions ..")

Expand Down Expand Up @@ -188,4 +151,5 @@ def main(configuration, _exit=sys.exit):

if __name__ == "__main__":
from mig.shared.conf import get_configuration_object
main(get_configuration_object())
conf = get_configuration_object(skip_log=True, disable_auth_log=True)
legacy_main(conf)
8 changes: 3 additions & 5 deletions tests/test_mig_lib_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@
# Imports of the code under test
from mig.lib.events import _restore_env, _save_env, at_remain, cron_match, \
get_path_expand_map, get_time_expand_map, load_atjobs, load_crontab
from mig.lib.events import main as events_main
from mig.lib.events import parse_and_save_atjobs, parse_and_save_crontab, \
parse_atjobs, parse_atjobs_contents, parse_crontab, \
parse_crontab_contents, run_cron_command, run_events_command
parse_crontab_contents, run_cron_command, run_events_command, legacy_main
# Imports required for the unit tests themselves
from tests.support import MigTestCase, ensure_dirs_exist

Expand Down Expand Up @@ -3395,15 +3394,14 @@ def raise_on_error_exit(exit_code):
else:
identifying_message = "unknown"
raise AssertionError(
"failure in unittest/testcore: %s" % (identifying_message,)
)
'legacy test failure: %s' % (identifying_message,))

raise_on_error_exit.last_print = None

def record_last_print(value):
raise_on_error_exit.last_print = value

events_main(_exit=raise_on_error_exit, _print=record_last_print)
legacy_main(self.configuration, print=record_last_print, _exit=raise_on_error_exit)


if __name__ == "__main__":
Expand Down
11 changes: 7 additions & 4 deletions tests/test_mig_shared_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1347,25 +1347,28 @@ def test_brief_list_edge_cases(self):
['a', 'b', ' ... shortened ... ', 'f', 'g'])


class TestMigSharedBase__legacy(MigTestCase):
class TestMigSharedBase__legacy_main(MigTestCase):
Comment thread
albu-diku marked this conversation as resolved.
"""Run mig.shared.base legacy self-test"""

def _provide_configuration(self):
return 'testconfig'

# TODO: migrate all legacy self-check functionality into the above?
def test_existing_main(self):
"""Run built-in self-tests and check output"""

def raise_on_error_exit(exit_code):
if exit_code != 0:
if raise_on_error_exit.last_print is not None:
identifying_message = raise_on_error_exit.last_print
else:
identifying_message = 'unknown'
raise AssertionError(
'failure in unittest/testcore: %s' %
(identifying_message,))
'legacy test failure: %s' % (identifying_message,))
raise_on_error_exit.last_print = None

def record_last_print(value):
"""Keep track of printed output"""
raise_on_error_exit.last_print = value

legacy_main(_exit=raise_on_error_exit, _print=record_last_print)
legacy_main(self.configuration, print=record_last_print, _exit=raise_on_error_exit)
12 changes: 9 additions & 3 deletions tests/test_mig_shared_pwcrypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

from mig.shared.defaults import POLICY_NONE, POLICY_WEAK, POLICY_MEDIUM, \
POLICY_HIGH, POLICY_MODERN, POLICY_CUSTOM, PASSWORD_POLICIES
from mig.shared.pwcrypto import main as pwcrypto_main
from mig.shared.pwcrypto import *

DUMMY_USER = "dummy-user"
Expand Down Expand Up @@ -708,6 +707,13 @@ def test_generate_random_password_fixed_seed(self):
self.assertEqual(expected, result,
"failed generate password with fixed seed")


class MigSharedPwCrypto__legacy_main(MigTestCase):
Comment thread
albu-diku marked this conversation as resolved.
"""Legacy tests for corresponding module self-checks"""

def _provide_configuration(self):
return 'testconfig'

# TODO: migrate remaining inline checks from module here instead
def test_existing_main(self):
def raise_on_error_exit(exit_code):
Expand All @@ -717,13 +723,13 @@ def raise_on_error_exit(exit_code):
else:
identifying_message = 'unknown'
raise AssertionError(
'failure in unittest/testcore: %s' % (identifying_message,))
'legacy test failure: %s' % (identifying_message,))
raise_on_error_exit.last_print = None

def record_last_print(value):
raise_on_error_exit.last_print = value

pwcrypto_main(_exit=raise_on_error_exit, _print=record_last_print)
legacy_main(self.configuration, print=record_last_print, _exit=raise_on_error_exit)


if __name__ == '__main__':
Expand Down
15 changes: 9 additions & 6 deletions tests/test_mig_shared_userio.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
TEST_OWNER_DN = "/C=DK/ST=NA/L=NA/O=Test Org/OU=NA/CN=Test Owner/emailAddress=owner@example.org"


class TestMigSharedUserIO_legacy_main(MigTestCase, UserAssertMixin):
class TestMigSharedUserIO(MigTestCase, UserAssertMixin):
"""Unit tests for userio related helper functions"""

def _provide_configuration(self):
Expand Down Expand Up @@ -520,29 +520,32 @@ def test_touch_path(self):


class TestMigSharedUserIO__legacy_main(MigTestCase):
"""Legacy tests for safeinput module self-checks"""
"""Legacy tests for corresponding module self-checks"""

def _provide_configuration(self):
return 'testconfig'

# TODO: migrate all legacy self-check functionality into the above?
def test_existing_main(self):
"""Wrap old inline self-test as an additional unit test"""

self.logger.forgive_errors()

def raise_on_error_exit(exit_code):
if exit_code != 0:
if raise_on_error_exit.last_print is not None:
identifying_message = raise_on_error_exit.last_print
else:
identifying_message = "unknown"
raise AssertionError(
"failure in unittest/testcore: %s" % (identifying_message,)
)

'legacy test failure: %s' % (identifying_message,))
raise_on_error_exit.last_print = None

def record_last_print(value):
"""Helper to show last print on error"""
raise_on_error_exit.last_print = value

legacy_main(_exit=raise_on_error_exit, _print=record_last_print)
legacy_main(self.configuration, print=record_last_print, _exit=raise_on_error_exit, _argv=[])


if __name__ == "__main__":
Expand Down
11 changes: 7 additions & 4 deletions tests/test_mig_shared_vgrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -1418,26 +1418,29 @@ def test_workflow_job_priority(self):
class TestMigSharedVgrid__legacy_main(MigTestCase):
"""Unit tests for legacy vgrid self-checks"""

def _provide_configuration(self):
return 'testconfig'

def test_existing_main(self):
"""Run the legacy self-tests directly in module"""

self.logger.forgive_errors()

def raise_on_error_exit(exit_code):
if exit_code != 0:
if raise_on_error_exit.last_print is not None:
identifying_message = raise_on_error_exit.last_print
else:
identifying_message = "unknown"
raise AssertionError(
"failure in unittest/testcore: %s" % (identifying_message,)
)

'legacy test failure: %s' % (identifying_message,))
raise_on_error_exit.last_print = None

def record_last_print(value):
"""Keep track of printed output"""
raise_on_error_exit.last_print = value

legacy_main(_exit=raise_on_error_exit, _print=record_last_print)
legacy_main(self.configuration, print=record_last_print, _exit=raise_on_error_exit)


if __name__ == "__main__":
Expand Down
Loading
Loading