From 3adec4f587504450e2c652360755703313bf2e7a Mon Sep 17 00:00:00 2001 From: danish9039 Date: Tue, 3 Mar 2026 14:04:04 +0530 Subject: [PATCH 01/16] dex: use sticky service with two replicas Signed-off-by: danish9039 --- common/dex/base/deployment.yaml | 2 +- common/dex/base/service.yaml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/common/dex/base/deployment.yaml b/common/dex/base/deployment.yaml index 237b97efb3..ee3b24c23b 100644 --- a/common/dex/base/deployment.yaml +++ b/common/dex/base/deployment.yaml @@ -5,7 +5,7 @@ metadata: app: dex name: dex spec: - replicas: 1 + replicas: 2 selector: matchLabels: app: dex diff --git a/common/dex/base/service.yaml b/common/dex/base/service.yaml index 7c865c8f0f..27fb15dc32 100644 --- a/common/dex/base/service.yaml +++ b/common/dex/base/service.yaml @@ -4,6 +4,7 @@ metadata: name: dex spec: type: ClusterIP + sessionAffinity: ClientIP ports: - name: dex port: 5556 From 625039bea4e00cef3e280bb9898853077eb4ff54 Mon Sep 17 00:00:00 2001 From: danish9039 Date: Wed, 4 Mar 2026 00:47:01 +0530 Subject: [PATCH 02/16] dex: bound sticky service timeout Signed-off-by: danish9039 --- common/dex/base/service.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/common/dex/base/service.yaml b/common/dex/base/service.yaml index 27fb15dc32..8e8de84a21 100644 --- a/common/dex/base/service.yaml +++ b/common/dex/base/service.yaml @@ -5,6 +5,13 @@ metadata: spec: type: ClusterIP sessionAffinity: ClientIP + sessionAffinityConfig: + clientIP: + # ClientIP affinity uses the source IP seen by this Service. + # In practice that is typically an in-cluster ingress or proxy pod IP, + # not the original end-user IP. Keep the timeout bounded so a single + # authentication flow stays sticky without creating long-lived skew. + timeoutSeconds: 600 ports: - name: dex port: 5556 From 16479b99428b786dbf79af747846132642014313 Mon Sep 17 00:00:00 2001 From: danish9039 Date: Wed, 4 Mar 2026 01:01:47 +0530 Subject: [PATCH 03/16] test: rerun ci Signed-off-by: danish9039 From b80cacef1c443ad73a7d5802c9411d078bb21278 Mon Sep 17 00:00:00 2001 From: danish9039 Date: Wed, 4 Mar 2026 02:12:04 +0530 Subject: [PATCH 04/16] test: rerun ci Signed-off-by: danish9039 From d1b9474e935ef96d6786722b6c2d4c077e50674b Mon Sep 17 00:00:00 2001 From: Julius von Kohout <45896133+juliusvonkohout@users.noreply.github.com> Date: Wed, 4 Mar 2026 08:06:42 +0100 Subject: [PATCH 05/16] Apply suggestion from @juliusvonkohout Signed-off-by: Julius von Kohout <45896133+juliusvonkohout@users.noreply.github.com> --- common/dex/base/service.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/dex/base/service.yaml b/common/dex/base/service.yaml index 8e8de84a21..3636620a28 100644 --- a/common/dex/base/service.yaml +++ b/common/dex/base/service.yaml @@ -11,7 +11,7 @@ spec: # In practice that is typically an in-cluster ingress or proxy pod IP, # not the original end-user IP. Keep the timeout bounded so a single # authentication flow stays sticky without creating long-lived skew. - timeoutSeconds: 600 + timeoutSeconds: 60 ports: - name: dex port: 5556 From 3313dce081df20115d7a1cd5fc57d621a763edbf Mon Sep 17 00:00:00 2001 From: danish9039 Date: Sat, 7 Mar 2026 16:43:13 +0000 Subject: [PATCH 06/16] test: extend dex authentication validation Signed-off-by: danish9039 --- tests/dex_login_test.py | 519 ++++++++++++++++++++++++++++++---------- 1 file changed, 386 insertions(+), 133 deletions(-) diff --git a/tests/dex_login_test.py b/tests/dex_login_test.py index bb8fad8eb9..f66cb66181 100755 --- a/tests/dex_login_test.py +++ b/tests/dex_login_test.py @@ -1,12 +1,42 @@ #!/usr/bin/env python3 +import concurrent.futures import re +import subprocess +import sys import time -from urllib.parse import urlsplit, urlencode +from dataclasses import dataclass +from urllib.parse import urlencode, urlsplit + import requests import urllib3 +ENDPOINT_URL = "http://localhost:8080" +DEX_USERNAME = "user@example.com" +DEX_PASSWORD = "12341234" +DEX_AUTH_TYPE = "local" +# Matches replicas: 2 in common/dex/base/deployment.yaml. +# Three sessions is sufficient to verify concurrent authentication across both replicas. +PARALLEL_SESSIONS = 3 +# Dex authcode GC window: authcodes must be deleted after token exchange completes. +GC_WAIT_SECONDS = 90 +REQUEST_TIMEOUT_SECONDS = 15 +KUBECTL_TIMEOUT_SECONDS = 120 +KUBECTL_REQUEST_TIMEOUT = "30s" + +AUTHENTICATION_SUCCESS_LOG_MARKER = "login successful" +DEX_POD_SELECTOR = "app=dex" +DEX_AUTHCODE_RESOURCE = "authcodes.dex.coreos.com" + + +@dataclass +class ParallelLoginResult: + index: int + ok: bool + error: str = "" + + class DexSessionManager: """ This is a version of the KFPClientManager() which only generates the Dex session cookies. @@ -19,7 +49,7 @@ def __init__( dex_username: str, dex_password: str, dex_auth_type: str = "local", - skip_tls_verify: bool = False, + skip_tls_verify: bool = True, ): """ Initialize the DexSessionManager @@ -35,165 +65,388 @@ def __init__( self._dex_username = dex_username self._dex_password = dex_password self._dex_auth_type = dex_auth_type - self._client = None - # disable SSL verification, if requested if self._skip_tls_verify: urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - # ensure `dex_default_auth_type` is valid + # ensure `dex_auth_type` is valid if self._dex_auth_type not in ["ldap", "local"]: raise ValueError( f"Invalid `dex_auth_type` '{self._dex_auth_type}', must be one of: ['ldap', 'local']" ) + def _request_get(self, session: requests.Session, url: str) -> requests.Response: + return session.get( + url, + allow_redirects=True, + verify=not self._skip_tls_verify, + timeout=REQUEST_TIMEOUT_SECONDS, + ) + + def _request_post( + self, session: requests.Session, url: str, data: dict[str, str] + ) -> requests.Response: + return session.post( + url, + data=data, + allow_redirects=True, + verify=not self._skip_tls_verify, + timeout=REQUEST_TIMEOUT_SECONDS, + ) + + @staticmethod + def _has_oauth2_session_cookie(session: requests.Session) -> bool: + return any(cookie.name.startswith("oauth2_proxy") for cookie in session.cookies) + + def _resolve_dex_login_url(self, session: requests.Session, url_object) -> str: + """ + Given a URL object, navigate to the Dex login page and return its URL. + Handles the optional /auth selector step before the /auth//login page. + """ + # if we are at `../auth` path, we need to select an authentication type + if re.search(r"/auth$", url_object.path): + url_object = url_object._replace( + path=re.sub(r"/auth$", f"/auth/{self._dex_auth_type}", url_object.path) + ) + + # if we are already at `../auth/xxxx/login`, use it directly + if re.search(r"/auth/.*/login$", url_object.path): + return url_object.geturl() + + # otherwise follow the redirect to the login page + response = self._request_get(session, url_object.geturl()) + if response.status_code != 200: + raise RuntimeError( + f"HTTP status code '{response.status_code}' for GET against: {url_object.geturl()}" + ) + return response.url + def get_session_cookies(self) -> str: """ - Get the session cookies by authenticating against Dex + Get the session cookies by authenticating against Dex. :return: a string of session cookies in the form "key1=value1; key2=value2" """ - max_retries = 3 - base_retry_delay = 2 - - for attempt in range(max_retries): - # Create a fresh session for each attempt to avoid stale state - session = requests.Session() - - if attempt > 0: - delay = base_retry_delay * (2 ** (attempt - 1)) - time.sleep(delay) - - try: - # GET the endpoint_url, which should redirect to Dex - response = session.get( - self._endpoint_url, - allow_redirects=True, - verify=not self._skip_tls_verify + session = requests.Session() + + try: + # GET the endpoint URL, which should redirect to Dex + response = self._request_get(session, self._endpoint_url) + if response.status_code == 200: + pass + elif response.status_code in [401, 403]: + # We may be at the oauth2-proxy sign-in page. + # The standard path to start the sign-in flow is /oauth2/start?rd= + url_object = urlsplit(response.url) + url_object = url_object._replace( + path="/oauth2/start", + query=urlencode({"rd": url_object.path}), ) - if response.status_code == 200: - pass - elif response.status_code in [401, 403]: - # if we get 401/403, we might be at the oauth2-proxy sign-in page - # the default path to start the sign-in flow is `/oauth2/start?rd=` - url_object = urlsplit(response.url) - url_object = url_object._replace( - path="/oauth2/start", - query=urlencode({"rd": url_object.path}) - ) - response = session.get( - url_object.geturl(), - allow_redirects=True, - verify=not self._skip_tls_verify - ) - if response.status_code not in [200, 302]: - raise RuntimeError( - f"HTTP status code '{response.status_code}' for GET against oauth2/start" - ) - else: + response = self._request_get(session, url_object.geturl()) + if response.status_code not in [200, 302]: raise RuntimeError( - f"HTTP status code '{response.status_code}' for GET against: {self._endpoint_url}" + f"HTTP status code '{response.status_code}' for GET against oauth2/start" ) + else: + raise RuntimeError( + f"HTTP status code '{response.status_code}' for GET against: {self._endpoint_url}" + ) - # if we were NOT redirected, then the endpoint is unsecured - if len(response.history) == 0: - # No cookies are needed - return "" + # if we were NOT redirected, the endpoint is unsecured — no cookies needed + if len(response.history) == 0: + return "" - # if we are at `../auth` path, we need to select an auth type - url_object = urlsplit(response.url) - if re.search(r"/auth$", url_object.path): - url_object = url_object._replace( - path=re.sub(r"/auth$", f"/auth/{self._dex_auth_type}", url_object.path) + dex_login_url = self._resolve_dex_login_url(session, urlsplit(response.url)) + + # submit the login credentials + response = self._request_post( + session, + dex_login_url, + data={"login": self._dex_username, "password": self._dex_password}, + ) + + if response.status_code == 403: + # 403 after login POST can mean the oauth2-proxy session expired mid-flow. + # If the redirect chain passed through /oauth2/callback and we already have + # a valid oauth2 session cookie, we are actually authenticated — return early. + history_urls = [h.url for h in response.history] + if ( + any("/oauth2/callback" in u for u in history_urls) + and self._has_oauth2_session_cookie(session) + ): + return "; ".join( + [f"{cookie.name}={cookie.value}" for cookie in session.cookies] ) - # if we are at `../auth/xxxx/login` path, then we are at the login page - if re.search(r"/auth/.*/login$", url_object.path): - dex_login_url = url_object.geturl() - else: - # otherwise, we need to follow a redirect to the login page - response = session.get( - url_object.geturl(), - allow_redirects=True, - verify=not self._skip_tls_verify + # Otherwise restart the oauth2 flow and retry the login once + oauth_url = ( + f"{urlsplit(self._endpoint_url).scheme}://" + f"{urlsplit(self._endpoint_url).netloc}/oauth2/start" + ) + response = self._request_get(session, oauth_url) + if response.status_code not in [200, 302]: + raise RuntimeError( + "HTTP status code " + f"'{response.status_code}' for GET against oauth2/start during 403 recovery" ) - if response.status_code != 200: - raise RuntimeError( - f"HTTP status code '{response.status_code}' for GET against: {url_object.geturl()}" - ) - dex_login_url = response.url - - # attempt Dex login - response = session.post( + + dex_login_url = self._resolve_dex_login_url(session, urlsplit(response.url)) + response = self._request_post( + session, dex_login_url, data={"login": self._dex_username, "password": self._dex_password}, - allow_redirects=True, - verify=not self._skip_tls_verify, ) - # Handle 403 specifically - might need to restart oauth flow - if response.status_code == 403: - # Try one more approach - go through the oauth2 flow again - oauth_url = f"{urlsplit(self._endpoint_url).scheme}://{urlsplit(self._endpoint_url).netloc}/oauth2/start" - response = session.get( - oauth_url, - allow_redirects=True, - verify=not self._skip_tls_verify, - ) - # Continue with normal flow after restart - if response.status_code == 200 and session.cookies: - return "; ".join([f"{c.name}={c.value}" for c in session.cookies]) + if response.status_code != 200: + raise RuntimeError( + f"HTTP status code '{response.status_code}' for POST against: {dex_login_url}" + ) + + # no redirect after login POST means credentials were invalid + if len(response.history) == 0: + raise RuntimeError( + "Authentication credentials are probably invalid - " + f"no redirect after POST to: {dex_login_url}" + ) + # if we are at `../approval` path, we need to approve the login + url_object = urlsplit(response.url) + if re.search(r"/approval$", url_object.path): + dex_approval_url = url_object.geturl() + response = self._request_post( + session, dex_approval_url, data={"approval": "approve"} + ) if response.status_code != 200: raise RuntimeError( - f"HTTP status code '{response.status_code}' for POST against: {dex_login_url}" + f"HTTP status code '{response.status_code}' for POST against: {url_object.geturl()}" ) - # if we were NOT redirected, then the login credentials were probably invalid - if len(response.history) == 0: - raise RuntimeError( - f"Login credentials are probably invalid - " - f"No redirect after POST to: {dex_login_url}" - ) + return "; ".join([f"{cookie.name}={cookie.value}" for cookie in session.cookies]) - # if we are at `../approval` path, we need to approve the login - url_object = urlsplit(response.url) - if re.search(r"/approval$", url_object.path): - dex_approval_url = url_object.geturl() - # Approve the login - response = session.post( - dex_approval_url, - data={"approval": "approve"}, - allow_redirects=True, - verify=not self._skip_tls_verify, - ) - if response.status_code != 200: - raise RuntimeError( - f"HTTP status code '{response.status_code}' for POST against: {url_object.geturl()}" - ) - - return "; ".join([f"{c.name}={c.value}" for c in session.cookies]) - - except Exception as e: - if attempt == max_retries - 1: # Last attempt - print(f"All {max_retries} attempts failed. Last error: {str(e)}") - raise - next_delay = base_retry_delay * (2 ** attempt) - print(f"Attempt {attempt + 1} failed: {str(e)}") - - -KUBEFLOW_ENDPOINT = "http://localhost:8080" -KUBEFLOW_USERNAME = "user@example.com" -KUBEFLOW_PASSWORD = "12341234" - -# initialize a DexSessionManager -dex_session_manager = DexSessionManager( - endpoint_url=KUBEFLOW_ENDPOINT, - skip_tls_verify=True, - dex_username=KUBEFLOW_USERNAME, - dex_password=KUBEFLOW_PASSWORD, - dex_auth_type="local", -) - -# try to get the session cookies -# NOTE: this will raise an exception if something goes wrong -session_cookies = dex_session_manager.get_session_cookies() \ No newline at end of file + except requests.RequestException as exc: + raise RuntimeError(f"Dex authentication request failed: {exc}") from exc + + +def run_cmd(cmd: list[str], timeout: int = KUBECTL_TIMEOUT_SECONDS) -> subprocess.CompletedProcess: + try: + return subprocess.run( + cmd, + check=False, + text=True, + capture_output=True, + timeout=timeout, + ) + except subprocess.TimeoutExpired as exc: + raise RuntimeError(f"Command timed out after {timeout}s: {' '.join(cmd)}") from exc + + +def run_cmd_or_fail(cmd: list[str], timeout: int = KUBECTL_TIMEOUT_SECONDS) -> subprocess.CompletedProcess: + result = run_cmd(cmd, timeout=timeout) + if result.returncode != 0: + raise RuntimeError( + "Command failed " + f"(rc={result.returncode}): {' '.join(cmd)}\n" + f"stdout:\n{result.stdout.strip()}\n" + f"stderr:\n{result.stderr.strip()}" + ) + return result + + +def get_dex_pods(min_replicas: int = 2) -> list[str]: + """ + Return the names of running Dex pods in the auth namespace. + Raises if fewer than min_replicas pods are found — the parallel authentication + test requires at least two replicas to verify cross-replica load distribution. + """ + cmd = [ + "kubectl", + "--request-timeout", KUBECTL_REQUEST_TIMEOUT, + "-n", "auth", + "get", "pods", + "-l", DEX_POD_SELECTOR, + "-o", "jsonpath={.items[*].metadata.name}", + ] + result = run_cmd_or_fail(cmd) + pods = [pod for pod in result.stdout.strip().split() if pod] + if len(pods) < min_replicas: + raise RuntimeError( + f"Expected at least {min_replicas} Dex pods (selector: {DEX_POD_SELECTOR}) " + f"in namespace auth, found: {pods}. " + "The Dex deployment at common/dex/base/deployment.yaml is configured with " + "replicas: 2 — ensure all pods have reached the Ready state before running this test." + ) + return pods + + +def count_authentication_hits_for_pod(pod: str, since_seconds: int) -> int: + """Count how many successful authentication events appear in a pod's logs.""" + cmd = [ + "kubectl", + "--request-timeout", KUBECTL_REQUEST_TIMEOUT, + "-n", "auth", + "logs", pod, + f"--since={since_seconds}s", + ] + result = run_cmd_or_fail(cmd) + return len(re.findall(re.escape(AUTHENTICATION_SUCCESS_LOG_MARKER), result.stdout)) + + +def count_authcodes_objects() -> int: + """ + Count the number of Dex authcode CRD objects currently in the cluster. + Dex creates one authcode object per login; the GC process deletes them after + the token exchange completes. Returns 0 if no instances exist. + """ + cmd = [ + "kubectl", + "--request-timeout", KUBECTL_REQUEST_TIMEOUT, + "get", DEX_AUTHCODE_RESOURCE, + "-A", "--no-headers", + ] + result = run_cmd(cmd) + # "no resources found" is a normal state — return 0 rather than raising + if result.returncode != 0: + combined = (result.stdout + "\n" + result.stderr).lower() + if "no resources found" in combined: + return 0 + raise RuntimeError( + f"Failed to query {DEX_AUTHCODE_RESOURCE}: {result.stderr.strip()}" + ) + return len([line for line in result.stdout.splitlines() if line.strip()]) + + +def run_single_login() -> str: + manager = DexSessionManager( + endpoint_url=ENDPOINT_URL, + skip_tls_verify=True, + dex_username=DEX_USERNAME, + dex_password=DEX_PASSWORD, + dex_auth_type=DEX_AUTH_TYPE, + ) + return manager.get_session_cookies() + + +def run_parallel_login_session(index: int) -> ParallelLoginResult: + try: + run_single_login() + return ParallelLoginResult(index=index, ok=True) + except Exception as exc: + return ParallelLoginResult(index=index, ok=False, error=str(exc)) + + +def run_parallel_validation() -> None: + """ + Validates that: + 1. PARALLEL_SESSIONS concurrent authentication sessions all succeed against a + multi-replica Dex deployment. + 2. Login traffic is distributed across at least two Dex replicas (load balancer + is working). With no sessionAffinity on the Dex Service, the Kubernetes load + balancer distributes connections freely, so a single burst is sufficient to + observe both replicas receiving traffic. + 3. Dex authcode CRD objects created during the burst are garbage collected after + the GC_WAIT_SECONDS window. With storage.type=kubernetes, authcodes are + Kubernetes CRD objects that Dex actively deletes after each token exchange. + + Requires at least 2 Dex replicas (replicas: 2 in common/dex/base/deployment.yaml). + The since_seconds log window is sized to cover the burst plus GC wait plus a buffer + so that baseline and post-burst reads always observe the same window. + """ + pods = get_dex_pods(min_replicas=2) + print(f"Dex pods: {pods}") + + # Size the log window to cover the burst duration plus GC wait plus a buffer. + since_seconds = max(GC_WAIT_SECONDS + 120, 300) + + # Snapshot state before the burst + baseline_hits = { + pod: count_authentication_hits_for_pod(pod, since_seconds) + for pod in pods + } + authcodes_before = count_authcodes_objects() + + print(f"Running parallel authentication burst with sessions={PARALLEL_SESSIONS}") + + # Run all parallel authentication sessions and collect results + failures = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=PARALLEL_SESSIONS) as executor: + futures = [ + executor.submit(run_parallel_login_session, index) + for index in range(PARALLEL_SESSIONS) + ] + for future in concurrent.futures.as_completed(futures, timeout=REQUEST_TIMEOUT_SECONDS * 3): + result = future.result() + if not result.ok: + failures.append(result) + + if failures: + error_summary = "; ".join( + [f"session={f.index} error={f.error}" for f in failures] + ) + raise RuntimeError( + f"Parallel authentication session failures: {error_summary}" + ) + + # Verify that at least two distinct replicas handled authentication requests. + # This confirms the load balancer is distributing traffic across pods. + # Requires sessionAffinity to be absent from the Dex Service — affinity would pin + # all sessions from the same source IP to a single pod, defeating this check. + post_hits = { + pod: count_authentication_hits_for_pod(pod, since_seconds) + for pod in pods + } + hit_delta = { + pod: max(post_hits[pod] - baseline_hits[pod], 0) + for pod in pods + } + print(f"Authentication hit delta by pod: {hit_delta}") + + hit_pods = [pod for pod, delta in hit_delta.items() if delta > 0] + if len(hit_pods) < 2: + raise RuntimeError( + "Expected authentication traffic across at least two Dex replicas " + f"but observed: {hit_delta}. " + "Verify that the Dex Service has no sessionAffinity configured." + ) + + # Verify GC: authcodes created during the burst must be cleaned up after the wait window. + # Dex creates one authcode CRD object per login and deletes it after the token exchange. + # If GC is broken, authcodes accumulate indefinitely. + authcodes_after_burst = count_authcodes_objects() + print(f"Authcodes count: before={authcodes_before} after_burst={authcodes_after_burst}") + + time.sleep(GC_WAIT_SECONDS) + authcodes_after_wait = count_authcodes_objects() + print(f"Authcodes count after GC wait ({GC_WAIT_SECONDS}s): {authcodes_after_wait}") + + if authcodes_after_burst > authcodes_before: + # The burst created new authcodes — GC must reduce the count + if authcodes_after_wait >= authcodes_after_burst: + raise RuntimeError( + "Authcodes did not decrease after GC wait window — " + "Dex GC may not be functioning correctly. " + f"before={authcodes_before} burst={authcodes_after_burst} " + f"after_wait={authcodes_after_wait}" + ) + elif authcodes_after_wait > authcodes_after_burst: + # No burst growth but count increased during wait — unexpected leak + raise RuntimeError( + "Authcodes increased during GC wait window despite no observed burst growth — " + "possible authcode leak from another process. " + f"before={authcodes_before} burst={authcodes_after_burst} " + f"after_wait={authcodes_after_wait}" + ) + + +def main() -> None: + run_single_login() + print("Dex single authentication validation passed") + + run_parallel_validation() + print("Dex parallel authentication and GC validation passed") + + +if __name__ == "__main__": + try: + main() + except Exception as exc: + print(f"Dex authentication test failed: {exc}", file=sys.stderr) + raise SystemExit(1) \ No newline at end of file From 4daa3c50539d18fd1d15d497604da733164a6659 Mon Sep 17 00:00:00 2001 From: danish9039 Date: Sat, 7 Mar 2026 16:49:30 +0000 Subject: [PATCH 07/16] dex: remove service affinity Signed-off-by: danish9039 --- common/dex/base/service.yaml | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/common/dex/base/service.yaml b/common/dex/base/service.yaml index 3636620a28..e37b3d4bcf 100644 --- a/common/dex/base/service.yaml +++ b/common/dex/base/service.yaml @@ -4,14 +4,8 @@ metadata: name: dex spec: type: ClusterIP - sessionAffinity: ClientIP - sessionAffinityConfig: - clientIP: - # ClientIP affinity uses the source IP seen by this Service. - # In practice that is typically an in-cluster ingress or proxy pod IP, - # not the original end-user IP. Keep the timeout bounded so a single - # authentication flow stays sticky without creating long-lived skew. - timeoutSeconds: 60 + # No sessionAffinity needed: storage.type=kubernetes stores all auth state + # as CRDs shared across pods, so any replica can complete any login flow. ports: - name: dex port: 5556 From 15a49093383048e279d5490d29d3eae5ee56c4cb Mon Sep 17 00:00:00 2001 From: danish9039 Date: Sat, 7 Mar 2026 17:32:46 +0000 Subject: [PATCH 08/16] test: rename dex auth helpers Signed-off-by: danish9039 --- tests/dex_login_test.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/dex_login_test.py b/tests/dex_login_test.py index f66cb66181..aa1e454c55 100755 --- a/tests/dex_login_test.py +++ b/tests/dex_login_test.py @@ -31,7 +31,7 @@ @dataclass -class ParallelLoginResult: +class ParallelAuthenticationResult: index: int ok: bool error: str = "" @@ -314,7 +314,7 @@ def count_authcodes_objects() -> int: return len([line for line in result.stdout.splitlines() if line.strip()]) -def run_single_login() -> str: +def run_single_authentication() -> str: manager = DexSessionManager( endpoint_url=ENDPOINT_URL, skip_tls_verify=True, @@ -325,12 +325,12 @@ def run_single_login() -> str: return manager.get_session_cookies() -def run_parallel_login_session(index: int) -> ParallelLoginResult: +def run_parallel_authentication_session(index: int) -> ParallelAuthenticationResult: try: - run_single_login() - return ParallelLoginResult(index=index, ok=True) + run_single_authentication() + return ParallelAuthenticationResult(index=index, ok=True) except Exception as exc: - return ParallelLoginResult(index=index, ok=False, error=str(exc)) + return ParallelAuthenticationResult(index=index, ok=False, error=str(exc)) def run_parallel_validation() -> None: @@ -338,7 +338,7 @@ def run_parallel_validation() -> None: Validates that: 1. PARALLEL_SESSIONS concurrent authentication sessions all succeed against a multi-replica Dex deployment. - 2. Login traffic is distributed across at least two Dex replicas (load balancer + 2. Authentication traffic is distributed across at least two Dex replicas (load balancer is working). With no sessionAffinity on the Dex Service, the Kubernetes load balancer distributes connections freely, so a single burst is sufficient to observe both replicas receiving traffic. @@ -369,7 +369,7 @@ def run_parallel_validation() -> None: failures = [] with concurrent.futures.ThreadPoolExecutor(max_workers=PARALLEL_SESSIONS) as executor: futures = [ - executor.submit(run_parallel_login_session, index) + executor.submit(run_parallel_authentication_session, index) for index in range(PARALLEL_SESSIONS) ] for future in concurrent.futures.as_completed(futures, timeout=REQUEST_TIMEOUT_SECONDS * 3): @@ -437,7 +437,7 @@ def run_parallel_validation() -> None: def main() -> None: - run_single_login() + run_single_authentication() print("Dex single authentication validation passed") run_parallel_validation() From 52fc6b48593c532037b41e068d459f1e6d19e6f2 Mon Sep 17 00:00:00 2001 From: danish9039 Date: Sat, 7 Mar 2026 17:58:55 +0000 Subject: [PATCH 09/16] test: increase dex auth burst Signed-off-by: danish9039 --- tests/dex_login_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/dex_login_test.py b/tests/dex_login_test.py index aa1e454c55..48449b4c35 100755 --- a/tests/dex_login_test.py +++ b/tests/dex_login_test.py @@ -17,8 +17,8 @@ DEX_PASSWORD = "12341234" DEX_AUTH_TYPE = "local" # Matches replicas: 2 in common/dex/base/deployment.yaml. -# Three sessions is sufficient to verify concurrent authentication across both replicas. -PARALLEL_SESSIONS = 3 +# Use a larger burst so the replica-distribution assertion is statistically stable in CI. +PARALLEL_SESSIONS = 8 # Dex authcode GC window: authcodes must be deleted after token exchange completes. GC_WAIT_SECONDS = 90 REQUEST_TIMEOUT_SECONDS = 15 @@ -449,4 +449,4 @@ def main() -> None: main() except Exception as exc: print(f"Dex authentication test failed: {exc}", file=sys.stderr) - raise SystemExit(1) \ No newline at end of file + raise SystemExit(1) From 45d1a8d8c0c96975cc7eec686e15abf2d65be586 Mon Sep 17 00:00:00 2001 From: hippie-danish <133037056+danish9039@users.noreply.github.com> Date: Sun, 5 Apr 2026 00:05:52 +0530 Subject: [PATCH 10/16] tests: add dex_authorization_test.py for RBAC authorization probes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a new authorization test that sits one layer above dex_login_test.py. While dex_login_test.py validates the authentication flow (Dex session cookie issuance and multi-replica distribution), this test validates that the Kubeflow RBAC layer correctly enforces access control after a valid session is established. The test reuses DexSessionManager from dex_login_test.py to obtain a session cookie, then probes the Kubeflow Pipelines v2 API with four scenarios: Probe 1: authenticated user accessing their own namespace → HTTP 200 Probe 2: unauthenticated request (no cookie) → HTTP 302/401/403 Probe 3: authenticated user accessing an unauthorized namespace → HTTP 403 Probe 4: ServiceAccount token from unauthorized namespace → HTTP 401/403 This mirrors the pattern established in kserve_test.sh Test 3 (unauthorized ServiceAccount token rejection) and volumes_web_application_test.sh (namespace-scoped RBAC enforcement). Keeping this separate from dex_login_test.py preserves clean failure signals in CI: a broken AuthorizationPolicy will fail this test without polluting the Dex replica/GC validation signal. Signed-off-by: hippie-danish <133037056+danish9039@users.noreply.github.com> From 4785e8cd2e7e228abb321d1e2897b160d59f8197 Mon Sep 17 00:00:00 2001 From: danish9039 Date: Sun, 5 Apr 2026 02:28:20 +0530 Subject: [PATCH 11/16] test: align dex authorization coverage Signed-off-by: danish9039 --- common/dex/base/service.yaml | 4 +- tests/dex_authorization_test.py | 158 ++++++++++++++++++++++++++++++++ tests/dex_login_test.py | 2 +- 3 files changed, 161 insertions(+), 3 deletions(-) create mode 100644 tests/dex_authorization_test.py diff --git a/common/dex/base/service.yaml b/common/dex/base/service.yaml index e37b3d4bcf..cf40b8e0f2 100644 --- a/common/dex/base/service.yaml +++ b/common/dex/base/service.yaml @@ -4,8 +4,8 @@ metadata: name: dex spec: type: ClusterIP - # No sessionAffinity needed: storage.type=kubernetes stores all auth state - # as CRDs shared across pods, so any replica can complete any login flow. + # No sessionAffinity needed: storage.type=kubernetes stores Dex authentication + # state as CRDs shared across pods, so any replica can complete the login flow. ports: - name: dex port: 5556 diff --git a/tests/dex_authorization_test.py b/tests/dex_authorization_test.py new file mode 100644 index 0000000000..66bcf80aa8 --- /dev/null +++ b/tests/dex_authorization_test.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python3 +""" +Authorization checks for the Dex + oauth2-proxy stack. + +This test reuses the Dex login flow from dex_login_test.py, then verifies that +the volumes web app honors namespace-scoped authorization decisions. +""" + +import sys + +import requests +import urllib3 + +from dex_login_test import DexSessionManager, run_cmd_or_fail + +ENDPOINT_URL = "http://localhost:8080" +DEX_USERNAME = "user@example.com" +DEX_PASSWORD = "12341234" +DEX_AUTH_TYPE = "local" + +AUTHORIZED_NAMESPACE = "kubeflow-user-example-com" +UNAUTHORIZED_NAMESPACE = "default" + +VOLUMES_UI_PATH = "/volumes/" +VOLUMES_API_TEMPLATE = "/volumes/api/namespaces/{namespace}/pvcs" + +REQUEST_TIMEOUT_SECONDS = 15 + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + +def _request( + url: str, + headers: dict[str, str], + label: str, + expected_codes: list[int], + allow_redirects: bool = True, +) -> requests.Response: + response = requests.get( + url, + headers=headers, + allow_redirects=allow_redirects, + verify=False, + timeout=REQUEST_TIMEOUT_SECONDS, + ) + print(f"{label}: HTTP {response.status_code}") + if response.status_code not in expected_codes: + raise RuntimeError( + f"{label}: expected one of {expected_codes}, got {response.status_code}. " + f"Response body (first 300 chars): {response.text[:300]}" + ) + return response + + +def _session_cookie_header(session_cookies: str, xsrf_token: str | None = None) -> dict[str, str]: + cookie_header = session_cookies + headers = {"Cookie": cookie_header} + if xsrf_token: + headers["Cookie"] = f"{cookie_header}; XSRF-TOKEN={xsrf_token}" + headers["X-XSRF-TOKEN"] = xsrf_token + return headers + + +def _get_xsrf_token(session_cookies: str) -> str: + response = _request( + url=f"{ENDPOINT_URL}{VOLUMES_UI_PATH}", + headers=_session_cookie_header(session_cookies), + label="Get volumes UI XSRF token", + expected_codes=[200], + ) + xsrf_token = response.cookies.get("XSRF-TOKEN") + if not xsrf_token: + raise RuntimeError("Volumes UI did not return an XSRF-TOKEN cookie") + return xsrf_token + + +def _volumes_api_url(namespace: str) -> str: + return f"{ENDPOINT_URL}{VOLUMES_API_TEMPLATE.format(namespace=namespace)}" + + +def run_authorized_access_validation(session_cookies: str, xsrf_token: str) -> None: + _request( + url=_volumes_api_url(AUTHORIZED_NAMESPACE), + headers=_session_cookie_header(session_cookies, xsrf_token), + label="Probe 1 (authorized cookie, own namespace)", + expected_codes=[200], + ) + print("PASS: Authorized Dex session can access the volumes API in its own namespace") + + +def run_unauthenticated_access_validation() -> None: + response = _request( + url=_volumes_api_url(AUTHORIZED_NAMESPACE), + headers={}, + label="Probe 2 (no cookie)", + expected_codes=[302, 401, 403], + allow_redirects=False, + ) + if response.status_code == 200: + raise RuntimeError("Unauthenticated request unexpectedly reached a protected endpoint") + print("PASS: Unauthenticated request is rejected before namespace access is granted") + + +def run_cross_namespace_authorization_validation( + session_cookies: str, xsrf_token: str +) -> None: + _request( + url=_volumes_api_url(UNAUTHORIZED_NAMESPACE), + headers=_session_cookie_header(session_cookies, xsrf_token), + label=f"Probe 3 (authorized cookie, unauthorized namespace={UNAUTHORIZED_NAMESPACE})", + expected_codes=[403], + ) + print("PASS: Authenticated user is denied access to a different namespace") + + +def run_unauthorized_serviceaccount_validation() -> None: + unauthorized_token = run_cmd_or_fail( + ["kubectl", "-n", UNAUTHORIZED_NAMESPACE, "create", "token", "default"] + ).stdout.strip() + _request( + url=_volumes_api_url(AUTHORIZED_NAMESPACE), + headers={"Authorization": f"Bearer {unauthorized_token}"}, + label=( + f"Probe 4 (unauthorized ServiceAccount token from " + f"namespace={UNAUTHORIZED_NAMESPACE})" + ), + expected_codes=[401, 403], + allow_redirects=False, + ) + print("PASS: ServiceAccount token from an unauthorized namespace is rejected") + + +def main() -> None: + print("Obtaining Dex session cookie for authorization probes...") + manager = DexSessionManager( + endpoint_url=ENDPOINT_URL, + skip_tls_verify=True, + dex_username=DEX_USERNAME, + dex_password=DEX_PASSWORD, + dex_auth_type=DEX_AUTH_TYPE, + ) + session_cookies = manager.get_session_cookies() + xsrf_token = _get_xsrf_token(session_cookies) + + run_authorized_access_validation(session_cookies, xsrf_token) + run_unauthenticated_access_validation() + run_cross_namespace_authorization_validation(session_cookies, xsrf_token) + run_unauthorized_serviceaccount_validation() + + print("\nAll authorization probes passed") + + +if __name__ == "__main__": + try: + main() + except Exception as exc: + print(f"Dex authorization test failed: {exc}", file=sys.stderr) + raise SystemExit(1) diff --git a/tests/dex_login_test.py b/tests/dex_login_test.py index 48449b4c35..78b92128fa 100755 --- a/tests/dex_login_test.py +++ b/tests/dex_login_test.py @@ -49,7 +49,7 @@ def __init__( dex_username: str, dex_password: str, dex_auth_type: str = "local", - skip_tls_verify: bool = True, + skip_tls_verify: bool = False, ): """ Initialize the DexSessionManager From cd8d038c08f26ac7e003634b65809e73077c45a5 Mon Sep 17 00:00:00 2001 From: danish9039 Date: Sun, 5 Apr 2026 03:07:44 +0530 Subject: [PATCH 12/16] test: drop dex authorization sidecar test Signed-off-by: danish9039 --- tests/dex_authorization_test.py | 158 -------------------------------- 1 file changed, 158 deletions(-) delete mode 100644 tests/dex_authorization_test.py diff --git a/tests/dex_authorization_test.py b/tests/dex_authorization_test.py deleted file mode 100644 index 66bcf80aa8..0000000000 --- a/tests/dex_authorization_test.py +++ /dev/null @@ -1,158 +0,0 @@ -#!/usr/bin/env python3 -""" -Authorization checks for the Dex + oauth2-proxy stack. - -This test reuses the Dex login flow from dex_login_test.py, then verifies that -the volumes web app honors namespace-scoped authorization decisions. -""" - -import sys - -import requests -import urllib3 - -from dex_login_test import DexSessionManager, run_cmd_or_fail - -ENDPOINT_URL = "http://localhost:8080" -DEX_USERNAME = "user@example.com" -DEX_PASSWORD = "12341234" -DEX_AUTH_TYPE = "local" - -AUTHORIZED_NAMESPACE = "kubeflow-user-example-com" -UNAUTHORIZED_NAMESPACE = "default" - -VOLUMES_UI_PATH = "/volumes/" -VOLUMES_API_TEMPLATE = "/volumes/api/namespaces/{namespace}/pvcs" - -REQUEST_TIMEOUT_SECONDS = 15 - -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - - -def _request( - url: str, - headers: dict[str, str], - label: str, - expected_codes: list[int], - allow_redirects: bool = True, -) -> requests.Response: - response = requests.get( - url, - headers=headers, - allow_redirects=allow_redirects, - verify=False, - timeout=REQUEST_TIMEOUT_SECONDS, - ) - print(f"{label}: HTTP {response.status_code}") - if response.status_code not in expected_codes: - raise RuntimeError( - f"{label}: expected one of {expected_codes}, got {response.status_code}. " - f"Response body (first 300 chars): {response.text[:300]}" - ) - return response - - -def _session_cookie_header(session_cookies: str, xsrf_token: str | None = None) -> dict[str, str]: - cookie_header = session_cookies - headers = {"Cookie": cookie_header} - if xsrf_token: - headers["Cookie"] = f"{cookie_header}; XSRF-TOKEN={xsrf_token}" - headers["X-XSRF-TOKEN"] = xsrf_token - return headers - - -def _get_xsrf_token(session_cookies: str) -> str: - response = _request( - url=f"{ENDPOINT_URL}{VOLUMES_UI_PATH}", - headers=_session_cookie_header(session_cookies), - label="Get volumes UI XSRF token", - expected_codes=[200], - ) - xsrf_token = response.cookies.get("XSRF-TOKEN") - if not xsrf_token: - raise RuntimeError("Volumes UI did not return an XSRF-TOKEN cookie") - return xsrf_token - - -def _volumes_api_url(namespace: str) -> str: - return f"{ENDPOINT_URL}{VOLUMES_API_TEMPLATE.format(namespace=namespace)}" - - -def run_authorized_access_validation(session_cookies: str, xsrf_token: str) -> None: - _request( - url=_volumes_api_url(AUTHORIZED_NAMESPACE), - headers=_session_cookie_header(session_cookies, xsrf_token), - label="Probe 1 (authorized cookie, own namespace)", - expected_codes=[200], - ) - print("PASS: Authorized Dex session can access the volumes API in its own namespace") - - -def run_unauthenticated_access_validation() -> None: - response = _request( - url=_volumes_api_url(AUTHORIZED_NAMESPACE), - headers={}, - label="Probe 2 (no cookie)", - expected_codes=[302, 401, 403], - allow_redirects=False, - ) - if response.status_code == 200: - raise RuntimeError("Unauthenticated request unexpectedly reached a protected endpoint") - print("PASS: Unauthenticated request is rejected before namespace access is granted") - - -def run_cross_namespace_authorization_validation( - session_cookies: str, xsrf_token: str -) -> None: - _request( - url=_volumes_api_url(UNAUTHORIZED_NAMESPACE), - headers=_session_cookie_header(session_cookies, xsrf_token), - label=f"Probe 3 (authorized cookie, unauthorized namespace={UNAUTHORIZED_NAMESPACE})", - expected_codes=[403], - ) - print("PASS: Authenticated user is denied access to a different namespace") - - -def run_unauthorized_serviceaccount_validation() -> None: - unauthorized_token = run_cmd_or_fail( - ["kubectl", "-n", UNAUTHORIZED_NAMESPACE, "create", "token", "default"] - ).stdout.strip() - _request( - url=_volumes_api_url(AUTHORIZED_NAMESPACE), - headers={"Authorization": f"Bearer {unauthorized_token}"}, - label=( - f"Probe 4 (unauthorized ServiceAccount token from " - f"namespace={UNAUTHORIZED_NAMESPACE})" - ), - expected_codes=[401, 403], - allow_redirects=False, - ) - print("PASS: ServiceAccount token from an unauthorized namespace is rejected") - - -def main() -> None: - print("Obtaining Dex session cookie for authorization probes...") - manager = DexSessionManager( - endpoint_url=ENDPOINT_URL, - skip_tls_verify=True, - dex_username=DEX_USERNAME, - dex_password=DEX_PASSWORD, - dex_auth_type=DEX_AUTH_TYPE, - ) - session_cookies = manager.get_session_cookies() - xsrf_token = _get_xsrf_token(session_cookies) - - run_authorized_access_validation(session_cookies, xsrf_token) - run_unauthenticated_access_validation() - run_cross_namespace_authorization_validation(session_cookies, xsrf_token) - run_unauthorized_serviceaccount_validation() - - print("\nAll authorization probes passed") - - -if __name__ == "__main__": - try: - main() - except Exception as exc: - print(f"Dex authorization test failed: {exc}", file=sys.stderr) - raise SystemExit(1) From 22da49fe9d3be87b00e17a402d8cb51e2f8455bd Mon Sep 17 00:00:00 2001 From: danish9039 Date: Fri, 10 Apr 2026 19:44:37 +0530 Subject: [PATCH 13/16] test: harden dex login validation Signed-off-by: danish9039 --- tests/dex_login_test.py | 289 ++++++++++++++++++++++++---------------- 1 file changed, 175 insertions(+), 114 deletions(-) diff --git a/tests/dex_login_test.py b/tests/dex_login_test.py index 78b92128fa..510ced15dd 100755 --- a/tests/dex_login_test.py +++ b/tests/dex_login_test.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import concurrent.futures +import json import re import subprocess import sys @@ -22,6 +23,7 @@ # Dex authcode GC window: authcodes must be deleted after token exchange completes. GC_WAIT_SECONDS = 90 REQUEST_TIMEOUT_SECONDS = 15 +PARALLEL_TEST_TIMEOUT_SECONDS = PARALLEL_SESSIONS * REQUEST_TIMEOUT_SECONDS KUBECTL_TIMEOUT_SECONDS = 120 KUBECTL_REQUEST_TIMEOUT = "30s" @@ -32,9 +34,9 @@ @dataclass class ParallelAuthenticationResult: - index: int - ok: bool - error: str = "" + session_index: int + succeeded: bool + error_message: str = "" class DexSessionManager: @@ -75,20 +77,20 @@ def __init__( f"Invalid `dex_auth_type` '{self._dex_auth_type}', must be one of: ['ldap', 'local']" ) - def _request_get(self, session: requests.Session, url: str) -> requests.Response: + def _request_get(self, session: requests.Session, request_url: str) -> requests.Response: return session.get( - url, + request_url, allow_redirects=True, verify=not self._skip_tls_verify, timeout=REQUEST_TIMEOUT_SECONDS, ) def _request_post( - self, session: requests.Session, url: str, data: dict[str, str] + self, session: requests.Session, request_url: str, form_data: dict[str, str] ) -> requests.Response: return session.post( - url, - data=data, + request_url, + data=form_data, allow_redirects=True, verify=not self._skip_tls_verify, timeout=REQUEST_TIMEOUT_SECONDS, @@ -98,26 +100,31 @@ def _request_post( def _has_oauth2_session_cookie(session: requests.Session) -> bool: return any(cookie.name.startswith("oauth2_proxy") for cookie in session.cookies) - def _resolve_dex_login_url(self, session: requests.Session, url_object) -> str: + def _resolve_dex_login_url(self, session: requests.Session, split_url_object) -> str: """ Given a URL object, navigate to the Dex login page and return its URL. Handles the optional /auth selector step before the /auth//login page. """ # if we are at `../auth` path, we need to select an authentication type - if re.search(r"/auth$", url_object.path): - url_object = url_object._replace( - path=re.sub(r"/auth$", f"/auth/{self._dex_auth_type}", url_object.path) + if re.search(r"/auth$", split_url_object.path): + split_url_object = split_url_object._replace( + path=re.sub( + r"/auth$", + f"/auth/{self._dex_auth_type}", + split_url_object.path, + ) ) # if we are already at `../auth/xxxx/login`, use it directly - if re.search(r"/auth/.*/login$", url_object.path): - return url_object.geturl() + if re.search(r"/auth/.*/login$", split_url_object.path): + return split_url_object.geturl() # otherwise follow the redirect to the login page - response = self._request_get(session, url_object.geturl()) + response = self._request_get(session, split_url_object.geturl()) if response.status_code != 200: raise RuntimeError( - f"HTTP status code '{response.status_code}' for GET against: {url_object.geturl()}" + "HTTP status code " + f"'{response.status_code}' for GET against: {split_url_object.geturl()}" ) return response.url @@ -131,22 +138,20 @@ def get_session_cookies(self) -> str: try: # GET the endpoint URL, which should redirect to Dex response = self._request_get(session, self._endpoint_url) - if response.status_code == 200: - pass - elif response.status_code in [401, 403]: + if response.status_code in [401, 403]: # We may be at the oauth2-proxy sign-in page. # The standard path to start the sign-in flow is /oauth2/start?rd= - url_object = urlsplit(response.url) - url_object = url_object._replace( + split_url_object = urlsplit(response.url) + split_url_object = split_url_object._replace( path="/oauth2/start", - query=urlencode({"rd": url_object.path}), + query=urlencode({"rd": split_url_object.path}), ) - response = self._request_get(session, url_object.geturl()) + response = self._request_get(session, split_url_object.geturl()) if response.status_code not in [200, 302]: raise RuntimeError( f"HTTP status code '{response.status_code}' for GET against oauth2/start" ) - else: + elif response.status_code != 200: raise RuntimeError( f"HTTP status code '{response.status_code}' for GET against: {self._endpoint_url}" ) @@ -161,7 +166,7 @@ def get_session_cookies(self) -> str: response = self._request_post( session, dex_login_url, - data={"login": self._dex_username, "password": self._dex_password}, + form_data={"login": self._dex_username, "password": self._dex_password}, ) if response.status_code == 403: @@ -193,7 +198,7 @@ def get_session_cookies(self) -> str: response = self._request_post( session, dex_login_url, - data={"login": self._dex_username, "password": self._dex_password}, + form_data={"login": self._dex_username, "password": self._dex_password}, ) if response.status_code != 200: @@ -209,46 +214,50 @@ def get_session_cookies(self) -> str: ) # if we are at `../approval` path, we need to approve the login - url_object = urlsplit(response.url) - if re.search(r"/approval$", url_object.path): - dex_approval_url = url_object.geturl() + split_url_object = urlsplit(response.url) + if re.search(r"/approval$", split_url_object.path): + dex_approval_url = split_url_object.geturl() response = self._request_post( - session, dex_approval_url, data={"approval": "approve"} + session, dex_approval_url, form_data={"approval": "approve"} ) if response.status_code != 200: raise RuntimeError( - f"HTTP status code '{response.status_code}' for POST against: {url_object.geturl()}" + "HTTP status code " + f"'{response.status_code}' for POST against: {split_url_object.geturl()}" ) return "; ".join([f"{cookie.name}={cookie.value}" for cookie in session.cookies]) - except requests.RequestException as exc: - raise RuntimeError(f"Dex authentication request failed: {exc}") from exc + except requests.RequestException as request_exception: + raise RuntimeError(f"Dex authentication request failed: {request_exception}") from request_exception -def run_cmd(cmd: list[str], timeout: int = KUBECTL_TIMEOUT_SECONDS) -> subprocess.CompletedProcess: +def run_command(command_arguments: list[str], timeout_seconds: int = KUBECTL_TIMEOUT_SECONDS) -> subprocess.CompletedProcess: try: return subprocess.run( - cmd, + command_arguments, check=False, text=True, capture_output=True, - timeout=timeout, + timeout=timeout_seconds, ) - except subprocess.TimeoutExpired as exc: - raise RuntimeError(f"Command timed out after {timeout}s: {' '.join(cmd)}") from exc + except subprocess.TimeoutExpired as timeout_exception: + raise RuntimeError( + "Command timed out after " + f"{timeout_seconds}s: {' '.join(command_arguments)}" + ) from timeout_exception -def run_cmd_or_fail(cmd: list[str], timeout: int = KUBECTL_TIMEOUT_SECONDS) -> subprocess.CompletedProcess: - result = run_cmd(cmd, timeout=timeout) - if result.returncode != 0: +def run_command_or_fail(command_arguments: list[str], timeout_seconds: int = KUBECTL_TIMEOUT_SECONDS) -> subprocess.CompletedProcess: + command_result = run_command(command_arguments, timeout_seconds=timeout_seconds) + if command_result.returncode != 0: raise RuntimeError( "Command failed " - f"(rc={result.returncode}): {' '.join(cmd)}\n" - f"stdout:\n{result.stdout.strip()}\n" - f"stderr:\n{result.stderr.strip()}" + f"(rc={command_result.returncode}): {' '.join(command_arguments)}\n" + f"stdout:\n{command_result.stdout.strip()}\n" + f"stderr:\n{command_result.stderr.strip()}" ) - return result + return command_result def get_dex_pods(min_replicas: int = 2) -> list[str]: @@ -257,37 +266,57 @@ def get_dex_pods(min_replicas: int = 2) -> list[str]: Raises if fewer than min_replicas pods are found — the parallel authentication test requires at least two replicas to verify cross-replica load distribution. """ - cmd = [ + command_arguments = [ "kubectl", "--request-timeout", KUBECTL_REQUEST_TIMEOUT, "-n", "auth", "get", "pods", "-l", DEX_POD_SELECTOR, - "-o", "jsonpath={.items[*].metadata.name}", + "--field-selector=status.phase=Running", + "-o", "json", ] - result = run_cmd_or_fail(cmd) - pods = [pod for pod in result.stdout.strip().split() if pod] - if len(pods) < min_replicas: + command_result = run_command_or_fail(command_arguments) + try: + pod_list = json.loads(command_result.stdout) + except json.JSONDecodeError as json_decode_error: + raise RuntimeError( + "Failed to parse Dex pod list JSON: " + f"{json_decode_error}" + ) from json_decode_error + + ready_pod_names = [] + for pod_item in pod_list.get("items", []): + readiness_conditions = pod_item.get("status", {}).get("conditions", []) + is_ready = any( + condition.get("type") == "Ready" and condition.get("status") == "True" + for condition in readiness_conditions + ) + if is_ready: + ready_pod_names.append(pod_item["metadata"]["name"]) + + if len(ready_pod_names) < min_replicas: raise RuntimeError( f"Expected at least {min_replicas} Dex pods (selector: {DEX_POD_SELECTOR}) " - f"in namespace auth, found: {pods}. " + f"in namespace auth, found: {ready_pod_names}. " "The Dex deployment at common/dex/base/deployment.yaml is configured with " "replicas: 2 — ensure all pods have reached the Ready state before running this test." ) - return pods + return ready_pod_names -def count_authentication_hits_for_pod(pod: str, since_seconds: int) -> int: +def count_authentication_hits_for_pod(pod_name: str, relative_log_window_seconds: int) -> int: """Count how many successful authentication events appear in a pod's logs.""" - cmd = [ + command_arguments = [ "kubectl", "--request-timeout", KUBECTL_REQUEST_TIMEOUT, "-n", "auth", - "logs", pod, - f"--since={since_seconds}s", + "logs", pod_name, + f"--since={relative_log_window_seconds}s", ] - result = run_cmd_or_fail(cmd) - return len(re.findall(re.escape(AUTHENTICATION_SUCCESS_LOG_MARKER), result.stdout)) + command_result = run_command_or_fail(command_arguments) + return len( + re.findall(re.escape(AUTHENTICATION_SUCCESS_LOG_MARKER), command_result.stdout) + ) def count_authcodes_objects() -> int: @@ -296,22 +325,29 @@ def count_authcodes_objects() -> int: Dex creates one authcode object per login; the GC process deletes them after the token exchange completes. Returns 0 if no instances exist. """ - cmd = [ + command_arguments = [ "kubectl", "--request-timeout", KUBECTL_REQUEST_TIMEOUT, "get", DEX_AUTHCODE_RESOURCE, - "-A", "--no-headers", + "-A", "-o", "json", ] - result = run_cmd(cmd) + command_result = run_command(command_arguments) # "no resources found" is a normal state — return 0 rather than raising - if result.returncode != 0: - combined = (result.stdout + "\n" + result.stderr).lower() - if "no resources found" in combined: - return 0 + combined_output = (command_result.stdout + "\n" + command_result.stderr).lower() + if "no resources found" in combined_output: + return 0 + if command_result.returncode != 0: raise RuntimeError( - f"Failed to query {DEX_AUTHCODE_RESOURCE}: {result.stderr.strip()}" + f"Failed to query {DEX_AUTHCODE_RESOURCE}: {command_result.stderr.strip()}" ) - return len([line for line in result.stdout.splitlines() if line.strip()]) + try: + authcode_list = json.loads(command_result.stdout) + except json.JSONDecodeError as json_decode_error: + raise RuntimeError( + "Failed to parse Dex authcode JSON: " + f"{json_decode_error}" + ) from json_decode_error + return len(authcode_list.get("items", [])) def run_single_authentication() -> str: @@ -325,12 +361,19 @@ def run_single_authentication() -> str: return manager.get_session_cookies() -def run_parallel_authentication_session(index: int) -> ParallelAuthenticationResult: +def run_parallel_authentication_session(session_index: int) -> ParallelAuthenticationResult: try: run_single_authentication() - return ParallelAuthenticationResult(index=index, ok=True) - except Exception as exc: - return ParallelAuthenticationResult(index=index, ok=False, error=str(exc)) + return ParallelAuthenticationResult( + session_index=session_index, + succeeded=True, + ) + except Exception as authentication_exception: + return ParallelAuthenticationResult( + session_index=session_index, + succeeded=False, + error_message=str(authentication_exception), + ) def run_parallel_validation() -> None: @@ -347,39 +390,51 @@ def run_parallel_validation() -> None: Kubernetes CRD objects that Dex actively deletes after each token exchange. Requires at least 2 Dex replicas (replicas: 2 in common/dex/base/deployment.yaml). - The since_seconds log window is sized to cover the burst plus GC wait plus a buffer - so that baseline and post-burst reads always observe the same window. + The relative log window is sized to cover the burst plus GC wait plus a buffer. + Repeated reads still observe a sliding relative window, because `kubectl logs + --since=s` is evaluated relative to the time of each call. """ - pods = get_dex_pods(min_replicas=2) - print(f"Dex pods: {pods}") + ready_pod_names = get_dex_pods(min_replicas=2) + print(f"Dex pods: {ready_pod_names}") - # Size the log window to cover the burst duration plus GC wait plus a buffer. - since_seconds = max(GC_WAIT_SECONDS + 120, 300) + # Size the relative log window to cover the burst duration plus GC wait plus a + # buffer. This reduces the chance of missing burst activity, but it does not + # create a fixed comparison interval across multiple reads. + relative_log_window_seconds = max(GC_WAIT_SECONDS + 120, 300) # Snapshot state before the burst - baseline_hits = { - pod: count_authentication_hits_for_pod(pod, since_seconds) - for pod in pods + baseline_authentication_hits = { + pod_name: count_authentication_hits_for_pod( + pod_name, relative_log_window_seconds + ) + for pod_name in ready_pod_names } authcodes_before = count_authcodes_objects() print(f"Running parallel authentication burst with sessions={PARALLEL_SESSIONS}") # Run all parallel authentication sessions and collect results - failures = [] + authentication_failures = [] with concurrent.futures.ThreadPoolExecutor(max_workers=PARALLEL_SESSIONS) as executor: futures = [ - executor.submit(run_parallel_authentication_session, index) - for index in range(PARALLEL_SESSIONS) + executor.submit(run_parallel_authentication_session, session_index) + for session_index in range(PARALLEL_SESSIONS) ] - for future in concurrent.futures.as_completed(futures, timeout=REQUEST_TIMEOUT_SECONDS * 3): - result = future.result() - if not result.ok: - failures.append(result) - - if failures: + for future in concurrent.futures.as_completed( + futures, timeout=PARALLEL_TEST_TIMEOUT_SECONDS + ): + authentication_result = future.result() + if not authentication_result.succeeded: + authentication_failures.append(authentication_result) + + if authentication_failures: error_summary = "; ".join( - [f"session={f.index} error={f.error}" for f in failures] + [ + "session=" + f"{authentication_failure.session_index} " + f"error={authentication_failure.error_message}" + for authentication_failure in authentication_failures + ] ) raise RuntimeError( f"Parallel authentication session failures: {error_summary}" @@ -389,21 +444,31 @@ def run_parallel_validation() -> None: # This confirms the load balancer is distributing traffic across pods. # Requires sessionAffinity to be absent from the Dex Service — affinity would pin # all sessions from the same source IP to a single pod, defeating this check. - post_hits = { - pod: count_authentication_hits_for_pod(pod, since_seconds) - for pod in pods + post_burst_authentication_hits = { + pod_name: count_authentication_hits_for_pod( + pod_name, relative_log_window_seconds + ) + for pod_name in ready_pod_names } - hit_delta = { - pod: max(post_hits[pod] - baseline_hits[pod], 0) - for pod in pods + authentication_hit_delta_by_pod = { + pod_name: max( + post_burst_authentication_hits[pod_name] + - baseline_authentication_hits[pod_name], + 0, + ) + for pod_name in ready_pod_names } - print(f"Authentication hit delta by pod: {hit_delta}") + print(f"Authentication hit delta by pod: {authentication_hit_delta_by_pod}") - hit_pods = [pod for pod, delta in hit_delta.items() if delta > 0] - if len(hit_pods) < 2: + pods_with_authentication_hits = [ + pod_name + for pod_name, hit_delta in authentication_hit_delta_by_pod.items() + if hit_delta > 0 + ] + if len(pods_with_authentication_hits) < 2: raise RuntimeError( "Expected authentication traffic across at least two Dex replicas " - f"but observed: {hit_delta}. " + f"but observed: {authentication_hit_delta_by_pod}. " "Verify that the Dex Service has no sessionAffinity configured." ) @@ -413,11 +478,12 @@ def run_parallel_validation() -> None: authcodes_after_burst = count_authcodes_objects() print(f"Authcodes count: before={authcodes_before} after_burst={authcodes_after_burst}") - time.sleep(GC_WAIT_SECONDS) - authcodes_after_wait = count_authcodes_objects() - print(f"Authcodes count after GC wait ({GC_WAIT_SECONDS}s): {authcodes_after_wait}") - if authcodes_after_burst > authcodes_before: + time.sleep(GC_WAIT_SECONDS) + authcodes_after_wait = count_authcodes_objects() + print( + f"Authcodes count after GC wait ({GC_WAIT_SECONDS}s): {authcodes_after_wait}" + ) # The burst created new authcodes — GC must reduce the count if authcodes_after_wait >= authcodes_after_burst: raise RuntimeError( @@ -426,14 +492,6 @@ def run_parallel_validation() -> None: f"before={authcodes_before} burst={authcodes_after_burst} " f"after_wait={authcodes_after_wait}" ) - elif authcodes_after_wait > authcodes_after_burst: - # No burst growth but count increased during wait — unexpected leak - raise RuntimeError( - "Authcodes increased during GC wait window despite no observed burst growth — " - "possible authcode leak from another process. " - f"before={authcodes_before} burst={authcodes_after_burst} " - f"after_wait={authcodes_after_wait}" - ) def main() -> None: @@ -447,6 +505,9 @@ def main() -> None: if __name__ == "__main__": try: main() - except Exception as exc: - print(f"Dex authentication test failed: {exc}", file=sys.stderr) + except Exception as authentication_exception: + print( + f"Dex authentication test failed: {authentication_exception}", + file=sys.stderr, + ) raise SystemExit(1) From 0270666db1a886c7798260c218a8f494b5ee3f56 Mon Sep 17 00:00:00 2001 From: danish9039 Date: Fri, 10 Apr 2026 23:44:25 +0530 Subject: [PATCH 14/16] test: clarify dex authentication naming Signed-off-by: danish9039 --- tests/dex_login_test.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/dex_login_test.py b/tests/dex_login_test.py index 510ced15dd..f4f4cd4a92 100755 --- a/tests/dex_login_test.py +++ b/tests/dex_login_test.py @@ -16,7 +16,7 @@ ENDPOINT_URL = "http://localhost:8080" DEX_USERNAME = "user@example.com" DEX_PASSWORD = "12341234" -DEX_AUTH_TYPE = "local" +DEX_AUTHENTICATION_TYPE = "local" # Matches replicas: 2 in common/dex/base/deployment.yaml. # Use a larger burst so the replica-distribution assertion is statistically stable in CI. PARALLEL_SESSIONS = 8 @@ -50,7 +50,7 @@ def __init__( endpoint_url: str, dex_username: str, dex_password: str, - dex_auth_type: str = "local", + dex_authentication_type: str = "local", skip_tls_verify: bool = False, ): """ @@ -60,21 +60,21 @@ def __init__( :param skip_tls_verify: if True, skip TLS verification :param dex_username: the Dex username :param dex_password: the Dex password - :param dex_auth_type: the auth type to use if Dex has multiple enabled, one of: ['ldap', 'local'] + :param dex_authentication_type: the authentication type to use if Dex has multiple enabled, one of: ['ldap', 'local'] """ self._endpoint_url = endpoint_url self._skip_tls_verify = skip_tls_verify self._dex_username = dex_username self._dex_password = dex_password - self._dex_auth_type = dex_auth_type + self._dex_authentication_type = dex_authentication_type if self._skip_tls_verify: urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - # ensure `dex_auth_type` is valid - if self._dex_auth_type not in ["ldap", "local"]: + # ensure `dex_authentication_type` is valid + if self._dex_authentication_type not in ["ldap", "local"]: raise ValueError( - f"Invalid `dex_auth_type` '{self._dex_auth_type}', must be one of: ['ldap', 'local']" + f"Invalid `dex_authentication_type` '{self._dex_authentication_type}', must be one of: ['ldap', 'local']" ) def _request_get(self, session: requests.Session, request_url: str) -> requests.Response: @@ -110,7 +110,7 @@ def _resolve_dex_login_url(self, session: requests.Session, split_url_object) -> split_url_object = split_url_object._replace( path=re.sub( r"/auth$", - f"/auth/{self._dex_auth_type}", + f"/auth/{self._dex_authentication_type}", split_url_object.path, ) ) @@ -356,7 +356,7 @@ def run_single_authentication() -> str: skip_tls_verify=True, dex_username=DEX_USERNAME, dex_password=DEX_PASSWORD, - dex_auth_type=DEX_AUTH_TYPE, + dex_authentication_type=DEX_AUTHENTICATION_TYPE, ) return manager.get_session_cookies() From c29a65db83d8ba5bbf8472b96b292fa4c24085f2 Mon Sep 17 00:00:00 2001 From: danish9039 Date: Sat, 2 May 2026 17:27:10 +0530 Subject: [PATCH 15/16] test: handle dex batch timeout Signed-off-by: danish9039 --- tests/dex_login_test.py | 37 ++++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/tests/dex_login_test.py b/tests/dex_login_test.py index f4f4cd4a92..e08a807712 100755 --- a/tests/dex_login_test.py +++ b/tests/dex_login_test.py @@ -23,7 +23,15 @@ # Dex authcode GC window: authcodes must be deleted after token exchange completes. GC_WAIT_SECONDS = 90 REQUEST_TIMEOUT_SECONDS = 15 -PARALLEL_TEST_TIMEOUT_SECONDS = PARALLEL_SESSIONS * REQUEST_TIMEOUT_SECONDS +# One authentication session can perform several sequential HTTP requests: +# endpoint GET, oauth2-proxy start, Dex login GET and POST, optional approval, +# and optional recovery after a 403 response. +MAXIMUM_SEQUENTIAL_HTTP_REQUESTS_PER_SESSION = 8 +PARALLEL_TEST_TIMEOUT_BUFFER_SECONDS = 30 +PARALLEL_TEST_TIMEOUT_SECONDS = ( + REQUEST_TIMEOUT_SECONDS * MAXIMUM_SEQUENTIAL_HTTP_REQUESTS_PER_SESSION + + PARALLEL_TEST_TIMEOUT_BUFFER_SECONDS +) KUBECTL_TIMEOUT_SECONDS = 120 KUBECTL_REQUEST_TIMEOUT = "30s" @@ -420,12 +428,27 @@ def run_parallel_validation() -> None: executor.submit(run_parallel_authentication_session, session_index) for session_index in range(PARALLEL_SESSIONS) ] - for future in concurrent.futures.as_completed( - futures, timeout=PARALLEL_TEST_TIMEOUT_SECONDS - ): - authentication_result = future.result() - if not authentication_result.succeeded: - authentication_failures.append(authentication_result) + completed_futures = set() + try: + for future in concurrent.futures.as_completed( + futures, timeout=PARALLEL_TEST_TIMEOUT_SECONDS + ): + completed_futures.add(future) + authentication_result = future.result() + if not authentication_result.succeeded: + authentication_failures.append(authentication_result) + except concurrent.futures.TimeoutError as timeout_error: + pending_futures = [ + future for future in futures if future not in completed_futures + ] + for future in pending_futures: + future.cancel() + raise RuntimeError( + "Parallel authentication sessions exceeded the batch timeout of " + f"{PARALLEL_TEST_TIMEOUT_SECONDS} seconds: " + f"completed={len(completed_futures)} " + f"pending={len(pending_futures)}" + ) from timeout_error if authentication_failures: error_summary = "; ".join( From 38d9d36ba891cf24a0100070d40fdfea00633cb1 Mon Sep 17 00:00:00 2001 From: danish9039 Date: Mon, 4 May 2026 16:07:05 +0530 Subject: [PATCH 16/16] test: scope dex authcodes to auth namespace Signed-off-by: danish9039 --- tests/dex_login_test.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/tests/dex_login_test.py b/tests/dex_login_test.py index e08a807712..ad29a1959d 100755 --- a/tests/dex_login_test.py +++ b/tests/dex_login_test.py @@ -20,8 +20,8 @@ # Matches replicas: 2 in common/dex/base/deployment.yaml. # Use a larger burst so the replica-distribution assertion is statistically stable in CI. PARALLEL_SESSIONS = 8 -# Dex authcode GC window: authcodes must be deleted after token exchange completes. -GC_WAIT_SECONDS = 90 +# Dex authcode garbage-collection window: authcodes must be deleted after token exchange completes. +GARBAGE_COLLECTION_WAIT_SECONDS = 90 REQUEST_TIMEOUT_SECONDS = 15 # One authentication session can perform several sequential HTTP requests: # endpoint GET, oauth2-proxy start, Dex login GET and POST, optional approval, @@ -329,15 +329,16 @@ def count_authentication_hits_for_pod(pod_name: str, relative_log_window_seconds def count_authcodes_objects() -> int: """ - Count the number of Dex authcode CRD objects currently in the cluster. + Count the number of Dex authcode CRD objects currently in the auth namespace. Dex creates one authcode object per login; the GC process deletes them after the token exchange completes. Returns 0 if no instances exist. """ command_arguments = [ "kubectl", "--request-timeout", KUBECTL_REQUEST_TIMEOUT, + "-n", "auth", "get", DEX_AUTHCODE_RESOURCE, - "-A", "-o", "json", + "-o", "json", ] command_result = run_command(command_arguments) # "no resources found" is a normal state — return 0 rather than raising @@ -394,11 +395,11 @@ def run_parallel_validation() -> None: balancer distributes connections freely, so a single burst is sufficient to observe both replicas receiving traffic. 3. Dex authcode CRD objects created during the burst are garbage collected after - the GC_WAIT_SECONDS window. With storage.type=kubernetes, authcodes are + the GARBAGE_COLLECTION_WAIT_SECONDS window. With storage.type=kubernetes, authcodes are Kubernetes CRD objects that Dex actively deletes after each token exchange. Requires at least 2 Dex replicas (replicas: 2 in common/dex/base/deployment.yaml). - The relative log window is sized to cover the burst plus GC wait plus a buffer. + The relative log window is sized to cover the burst plus the garbage-collection wait plus a buffer. Repeated reads still observe a sliding relative window, because `kubectl logs --since=s` is evaluated relative to the time of each call. """ @@ -408,7 +409,7 @@ def run_parallel_validation() -> None: # Size the relative log window to cover the burst duration plus GC wait plus a # buffer. This reduces the chance of missing burst activity, but it does not # create a fixed comparison interval across multiple reads. - relative_log_window_seconds = max(GC_WAIT_SECONDS + 120, 300) + relative_log_window_seconds = max(GARBAGE_COLLECTION_WAIT_SECONDS + 120, 300) # Snapshot state before the burst baseline_authentication_hits = { @@ -502,10 +503,10 @@ def run_parallel_validation() -> None: print(f"Authcodes count: before={authcodes_before} after_burst={authcodes_after_burst}") if authcodes_after_burst > authcodes_before: - time.sleep(GC_WAIT_SECONDS) + time.sleep(GARBAGE_COLLECTION_WAIT_SECONDS) authcodes_after_wait = count_authcodes_objects() print( - f"Authcodes count after GC wait ({GC_WAIT_SECONDS}s): {authcodes_after_wait}" + f"Authcodes count after garbage-collection wait ({GARBAGE_COLLECTION_WAIT_SECONDS}s): {authcodes_after_wait}" ) # The burst created new authcodes — GC must reduce the count if authcodes_after_wait >= authcodes_after_burst: