|
| 1 | +"""Module for setting up test that require TLS gateway and/or certificates""" |
| 2 | + |
| 3 | +from weakget import weakget |
| 4 | +import pytest |
| 5 | + |
| 6 | +from testsuite.certificates import Certificate, CertificateManager |
| 7 | +from testsuite.certificates.cfssl.cli import CFSSLProviderCLI |
| 8 | +from testsuite.certificates.stores import InMemoryCertificateStore |
| 9 | +from testsuite.gateways import gateway |
| 10 | +from testsuite.gateways.apicast.tls import TLSApicast |
| 11 | +from testsuite.openshift.objects import SecretKinds |
| 12 | +from testsuite.rhsso import OIDCClientAuthHook |
| 13 | +from testsuite.utils import blame, warn_and_skip |
| 14 | +from keycloak import KeycloakOpenID |
| 15 | + |
| 16 | + |
| 17 | + |
| 18 | + |
| 19 | +from testsuite.rhsso import RHSSOServiceConfiguration, RHSSO |
| 20 | + |
| 21 | + |
| 22 | +###################### setting staging apicast start |
| 23 | + |
| 24 | +@pytest.fixture(scope="session") |
| 25 | +def manager(testconfig): |
| 26 | + """Certificate Manager""" |
| 27 | + provider = CFSSLProviderCLI(binary=testconfig["cfssl"]["binary"]) |
| 28 | + store = InMemoryCertificateStore() |
| 29 | + return CertificateManager(provider, provider, store) |
| 30 | + |
| 31 | +@pytest.fixture(scope="session") |
| 32 | +def superdomain(testconfig): |
| 33 | + """3scale superdomain""" |
| 34 | + return testconfig["threescale"]["superdomain"] |
| 35 | + |
| 36 | +# todo dat tam moji CA |
| 37 | +@pytest.fixture(scope="session") |
| 38 | +def server_authority(request, superdomain, manager, testconfig): |
| 39 | + """CA Authority to be used in the gateway""" |
| 40 | + wildcard_domain = "*." + superdomain |
| 41 | + authority = manager.get_or_create_ca("server-ca", hosts=[wildcard_domain]) |
| 42 | + if not testconfig["skip_cleanup"]: |
| 43 | + request.addfinalizer(authority.delete_files) |
| 44 | + return authority |
| 45 | + |
| 46 | +# pylint: disable=too-many-arguments |
| 47 | +@pytest.fixture(scope="module") |
| 48 | +def staging_gateway(request, server_authority, superdomain, manager, testconfig): |
| 49 | + """Deploy tls apicast gateway. We need APIcast listening on https port""" |
| 50 | + kwargs = { |
| 51 | + "name": blame(request, "tls-gw"), |
| 52 | + "manager": manager, |
| 53 | + "server_authority": server_authority, |
| 54 | + "superdomain": superdomain, |
| 55 | + } |
| 56 | + gw = gateway(kind=TLSApicast, staging=True, **kwargs) |
| 57 | + |
| 58 | + if not testconfig["skip_cleanup"]: |
| 59 | + request.addfinalizer(gw.destroy) |
| 60 | + gw.create() |
| 61 | + |
| 62 | + return gw |
| 63 | + |
| 64 | +###################### setting staging apicast end |
| 65 | + |
| 66 | + |
| 67 | +@pytest.fixture(scope="module", autouse=True) |
| 68 | +def rhsso_setup(lifecycle_hooks, rhsso_service_info): |
| 69 | + """Have application/service with RHSSO auth configured""" |
| 70 | + |
| 71 | + lifecycle_hooks.append(OIDCClientAuthHook(rhsso_service_info)) # todo tady se nastavuje RHSSO pro product na apicastu |
| 72 | + |
| 73 | + |
| 74 | +# todo fuj fuj predelat na fixture? neni to DRY (vykopirovano z tests/conftest.py) |
| 75 | +def _resolve_rhsso(testconfig, tools, rhsso_kind): |
| 76 | + cnf = testconfig["rhsso"] |
| 77 | + if "password" not in cnf: |
| 78 | + return None |
| 79 | + if "username" not in cnf: |
| 80 | + return None |
| 81 | + if "url" in cnf: |
| 82 | + return RHSSO(server_url=cnf["url"], username=cnf["username"], password=cnf["password"]) |
| 83 | + key = "no-ssl-rhbk" |
| 84 | + if rhsso_kind == "rhsso": |
| 85 | + key = "no-ssl-sso" |
| 86 | + return RHSSO(server_url=tools[key], username=cnf["username"], password=cnf["password"]) |
| 87 | + |
| 88 | + |
| 89 | + |
| 90 | +# todo predelat konfiguraci clienta |
| 91 | +@pytest.fixture(scope="session") |
| 92 | +def rhsso_service_info(request, testconfig, tools, rhsso_kind): |
| 93 | + """ |
| 94 | + Set up client for zync |
| 95 | + :return: dict with all important details |
| 96 | + """ |
| 97 | + rhsso = _resolve_rhsso(testconfig, tools, rhsso_kind) |
| 98 | + if not rhsso: |
| 99 | + warn_and_skip("SSO admin password neither discovered not set in config", "fail") |
| 100 | + realm = rhsso.create_realm(blame(request, "realm"), accessTokenLifespan=24 * 60 * 60) |
| 101 | + # todo misto noveho realmu jsem priradil manualne vytvoreny |
| 102 | + # realm = KeycloakOpenID( |
| 103 | + # server_url="https://ssl-rhbk-mstastny-foo.apps.ocp-cluster.osp.api-qe.eng.rdu2.redhat.com", |
| 104 | + # client_id='mtls_client', |
| 105 | + # realm_name='basic', |
| 106 | + # cert=("/home/mstastny/Repos/rh/gitlab/3scale-qe/testsuite-tools/tools/base/rhbk/secrets/client.crt", "/home/mstastny/Repos/rh/gitlab/3scale-qe/testsuite-tools/tools/base/rhbk/secrets/client.key"), |
| 107 | + # verify="/home/mstastny/Repos/rh/gitlab/3scale-qe/testsuite-tools/tools/base/rhbk/secrets/rootCA.pem", |
| 108 | + # ) |
| 109 | + |
| 110 | + if not testconfig["skip_cleanup"]: |
| 111 | + request.addfinalizer(realm.delete) |
| 112 | + |
| 113 | + client = realm.create_client( |
| 114 | + name=blame(request, "mtls_client"), |
| 115 | + serviceAccountsEnabled=True, |
| 116 | + directAccessGrantsEnabled=True, |
| 117 | + publicClient=False, |
| 118 | + protocol="openid-connect", |
| 119 | + standardFlowEnabled=False, |
| 120 | + #verify=False, # todo zkontrolovat |
| 121 | + ) |
| 122 | + |
| 123 | + # todo priradit existijiho clienta |
| 124 | + |
| 125 | + |
| 126 | + cnf = testconfig["rhsso"] |
| 127 | + username = cnf["test_user"]["username"] |
| 128 | + password = cnf["test_user"]["password"] |
| 129 | + user = realm.create_user(username, password) |
| 130 | + |
| 131 | + client.assign_role("manage-clients") |
| 132 | + |
| 133 | + return RHSSOServiceConfiguration(rhsso, realm, client, user, username, password) |
| 134 | + |
| 135 | + |
| 136 | +# @pytest.fixture(scope="module", autouse=True) |
| 137 | +# def require_openshift(testconfig): |
| 138 | +# """These tests require openshift available""" |
| 139 | +# if not weakget(testconfig)["openshift"]["servers"]["default"] % False: |
| 140 | +# warn_and_skip("All from tls skipped due to missing openshift") |
| 141 | +# |
| 142 | +# |
| 143 | +# @pytest.fixture(scope="session") |
| 144 | +# def manager(testconfig): |
| 145 | +# """Certificate Manager""" |
| 146 | +# provider = CFSSLProviderCLI(binary=testconfig["cfssl"]["binary"]) |
| 147 | +# store = InMemoryCertificateStore() |
| 148 | +# return CertificateManager(provider, provider, store) |
| 149 | +# |
| 150 | +# |
| 151 | +# @pytest.fixture(scope="session") |
| 152 | +# def superdomain(testconfig): |
| 153 | +# """3scale superdomain""" |
| 154 | +# return testconfig["threescale"]["superdomain"] |
| 155 | +# |
| 156 | +# |
| 157 | +# @pytest.fixture(scope="session") |
| 158 | +# def server_authority(request, superdomain, manager, testconfig): |
| 159 | +# """CA Authority to be used in the gateway""" |
| 160 | +# wildcard_domain = "*." + superdomain |
| 161 | +# authority = manager.get_or_create_ca("server-ca", hosts=[wildcard_domain]) |
| 162 | +# if not testconfig["skip_cleanup"]: |
| 163 | +# request.addfinalizer(authority.delete_files) |
| 164 | +# return authority |
| 165 | +# |
| 166 | +# |
| 167 | +# # pylint: disable=too-many-arguments |
| 168 | +# @pytest.fixture(scope="module") |
| 169 | +# def staging_gateway(request, server_authority, superdomain, manager, gateway_options, gateway_environment, testconfig): |
| 170 | +# """Deploy tls apicast gateway. We need APIcast listening on https port""" |
| 171 | +# kwargs = { |
| 172 | +# "name": blame(request, "tls-gw"), |
| 173 | +# "manager": manager, |
| 174 | +# "server_authority": server_authority, |
| 175 | +# "superdomain": superdomain, |
| 176 | +# } |
| 177 | +# kwargs.update(gateway_options) |
| 178 | +# gw = gateway(kind=TLSApicast, staging=True, **kwargs) |
| 179 | +# |
| 180 | +# if not testconfig["skip_cleanup"]: |
| 181 | +# request.addfinalizer(gw.destroy) |
| 182 | +# gw.create() |
| 183 | +# |
| 184 | +# if len(gateway_environment) > 0: |
| 185 | +# gw.environ.set_many(gateway_environment) |
| 186 | +# |
| 187 | +# return gw |
| 188 | +# |
| 189 | +# |
| 190 | +# @pytest.fixture(scope="session") |
| 191 | +# def create_cert(request, superdomain, manager, testconfig): |
| 192 | +# """Creates certificate that is valid for entire 3scale subdomain""" |
| 193 | +# host = "*." + superdomain |
| 194 | +# |
| 195 | +# def _create(name: str, certificate_authority: Certificate) -> Certificate: |
| 196 | +# cert = manager.get_or_create(name, common_name=host, hosts=[host], certificate_authority=certificate_authority) |
| 197 | +# |
| 198 | +# if not testconfig["skip_cleanup"]: |
| 199 | +# request.addfinalizer(cert.delete_files) |
| 200 | +# return cert |
| 201 | +# |
| 202 | +# return _create |
| 203 | +# |
| 204 | +# |
| 205 | +# @pytest.fixture(scope="session") |
| 206 | +# def chainify(request, testconfig): |
| 207 | +# """Creates chain from certificate and his certificate authorities""" |
| 208 | +# |
| 209 | +# def _chain(certificate: Certificate, *authorities: Certificate) -> Certificate: |
| 210 | +# entire_chain = [certificate] |
| 211 | +# entire_chain.extend(authorities) |
| 212 | +# chain = Certificate(certificate="".join(cert.certificate for cert in entire_chain), key=certificate.key) |
| 213 | +# if not testconfig["skip_cleanup"]: |
| 214 | +# request.addfinalizer(chain.delete_files) |
| 215 | +# return chain |
| 216 | +# |
| 217 | +# return _chain |
| 218 | +# |
| 219 | +# |
| 220 | +# @pytest.fixture(scope="session") |
| 221 | +# def valid_authority(request, manager, testconfig) -> Certificate: |
| 222 | +# """To be used in tests validating server certificates""" |
| 223 | +# certificate_authority = manager.get_or_create_ca("valid_ca", hosts=["*.com"]) |
| 224 | +# if not testconfig["skip_cleanup"]: |
| 225 | +# request.addfinalizer(certificate_authority.delete_files) |
| 226 | +# return certificate_authority |
| 227 | +# |
| 228 | +# |
| 229 | +# @pytest.fixture(scope="session") |
| 230 | +# def certificate(valid_authority, create_cert) -> Certificate: |
| 231 | +# """Valid certificate for entire 3scale superdomain""" |
| 232 | +# return create_cert("valid", valid_authority) |
| 233 | +# |
| 234 | +# |
| 235 | +# @pytest.fixture(scope="module") |
| 236 | +# def invalid_authority(staging_gateway): |
| 237 | +# """Authority for testing negative case, same as the CA used in gateway""" |
| 238 | +# return staging_gateway.server_authority |
| 239 | +# |
| 240 | +# |
| 241 | +# @pytest.fixture(scope="module") |
| 242 | +# def invalid_certificate(invalid_authority, create_cert) -> Certificate: |
| 243 | +# """Valid certificate for entire 3scale superdomain""" |
| 244 | +# return create_cert("invalid", invalid_authority) |
| 245 | +# |
| 246 | +# |
| 247 | +# @pytest.fixture(scope="module") |
| 248 | +# def mount_certificate_secret(request, staging_gateway, testconfig): |
| 249 | +# """Mount volume from TLS secret on staging gateway.""" |
| 250 | +# |
| 251 | +# def _mount(mount_path, certificate): |
| 252 | +# secret_name = blame(request, "tls") |
| 253 | +# |
| 254 | +# def turn_down(): |
| 255 | +# staging_gateway.openshift.delete("secret", secret_name) |
| 256 | +# |
| 257 | +# if not testconfig["skip_cleanup"]: |
| 258 | +# request.addfinalizer(turn_down) |
| 259 | +# |
| 260 | +# staging_gateway.openshift.secrets.create(secret_name, SecretKinds.TLS, certificate=certificate) |
| 261 | +# staging_gateway.deployment.add_volume(secret_name, mount_path, secret_name) |
| 262 | +# |
| 263 | +# return _mount |
| 264 | +# |
| 265 | +# |
| 266 | +# # pylint: disable=too-many-arguments |
| 267 | +# @pytest.fixture(scope="module") |
| 268 | +# def service(request, backends_mapping, custom_service, service_proxy_settings, lifecycle_hooks, policy_settings): |
| 269 | +# """Preconfigured service, which is created for each policy settings, which are often parametrized in this module""" |
| 270 | +# service = custom_service( |
| 271 | +# {"name": blame(request, "svc")}, service_proxy_settings, backends_mapping, hooks=lifecycle_hooks |
| 272 | +# ) |
| 273 | +# if policy_settings: |
| 274 | +# service.proxy.list().policies.append(policy_settings) |
| 275 | +# return service |
| 276 | +# |
| 277 | +# |
| 278 | +# @pytest.fixture(scope="module") |
| 279 | +# def gateway_options(): |
| 280 | +# """Additional options to pass to staging gateway constructor""" |
| 281 | +# return {} |
| 282 | +# |
| 283 | +# |
| 284 | +# @pytest.fixture(scope="module") |
| 285 | +# def gateway_environment(): |
| 286 | +# """Allows setting environment for tls tests""" |
| 287 | +# return {} |
0 commit comments