|
9 | 9 | import hashlib |
10 | 10 | import time |
11 | 11 | from collections.abc import Callable |
12 | | -from typing import Protocol |
| 12 | +from typing import Any, Protocol |
13 | 13 | from unittest.mock import patch |
14 | 14 | from urllib.parse import parse_qs, urlparse |
15 | 15 |
|
@@ -696,3 +696,182 @@ def test_landing_page_no_user_info_without_pkce(self) -> None: |
696 | 696 | result = client.simulate_get("/vgi", headers={"Accept": "text/html"}) |
697 | 697 | assert result.status_code == 200 |
698 | 698 | assert b"vgi-user-info" not in result.content |
| 699 | + |
| 700 | + |
| 701 | +# --------------------------------------------------------------------------- |
| 702 | +# OAuth token proxy ({prefix}/_oauth/token) |
| 703 | +# --------------------------------------------------------------------------- |
| 704 | + |
| 705 | + |
| 706 | +def _mock_httpx_for_proxy( |
| 707 | + mock_client_cls: object, |
| 708 | + upstream_status: int = 200, |
| 709 | + upstream_body: bytes | None = None, |
| 710 | + capture: dict[str, Any] | None = None, |
| 711 | +) -> None: |
| 712 | + """Wire up the httpx.Client mock for OIDC discovery and the proxy's upstream call. |
| 713 | +
|
| 714 | + Stubs both ``.get()`` (discovery) and ``.post()`` (token forward). Captures |
| 715 | + the POST data into *capture* if provided so tests can assert what was |
| 716 | + forwarded to the IdP. |
| 717 | + """ |
| 718 | + if upstream_body is None: |
| 719 | + upstream_body = b'{"access_token":"new-access","token_type":"Bearer","expires_in":3600}' |
| 720 | + |
| 721 | + class _GetResp: |
| 722 | + status_code = 200 |
| 723 | + |
| 724 | + def raise_for_status(self) -> None: |
| 725 | + pass |
| 726 | + |
| 727 | + def json(self) -> dict[str, object]: |
| 728 | + return dict(_MOCK_OIDC_CONFIG) |
| 729 | + |
| 730 | + class _PostResp: |
| 731 | + def __init__(self) -> None: |
| 732 | + self.status_code = upstream_status |
| 733 | + self.content = upstream_body |
| 734 | + self.headers = {"content-type": "application/json"} |
| 735 | + |
| 736 | + class _Client: |
| 737 | + def __enter__(self) -> _Client: |
| 738 | + return self |
| 739 | + |
| 740 | + def __exit__(self, *a: object) -> None: |
| 741 | + return None |
| 742 | + |
| 743 | + def get(self, *_a: object, **_k: object) -> _GetResp: |
| 744 | + return _GetResp() |
| 745 | + |
| 746 | + def post(self, url: str, *, data: dict[str, str], timeout: float) -> _PostResp: |
| 747 | + if capture is not None: |
| 748 | + capture["url"] = url |
| 749 | + capture["data"] = dict(data) |
| 750 | + return _PostResp() |
| 751 | + |
| 752 | + mock_client_cls.return_value = _Client() # type: ignore[attr-defined] |
| 753 | + |
| 754 | + |
| 755 | +class TestOAuthTokenProxy: |
| 756 | + """Tests for the {prefix}/_oauth/token PKCE token-exchange proxy.""" |
| 757 | + |
| 758 | + def setup_method(self) -> None: |
| 759 | + """Generate RSA keys and create local authenticator.""" |
| 760 | + self.priv, self.pub = _make_rsa_key() |
| 761 | + self.auth = _make_local_auth(self.pub) |
| 762 | + |
| 763 | + @patch("vgi_rpc.http._oauth_pkce.httpx.Client") |
| 764 | + def test_well_known_advertises_token_endpoint(self, mock_client_cls) -> None: # type: ignore[no-untyped-def] |
| 765 | + """When client_secret is configured, /.well-known/... advertises the proxy URL.""" |
| 766 | + _mock_httpx_for_proxy(mock_client_cls) |
| 767 | + client = _make_test_app(authenticate=self.auth, oauth_metadata=_METADATA_PKCE) |
| 768 | + result = client.simulate_get("/.well-known/oauth-protected-resource/vgi") |
| 769 | + assert result.status_code == 200 |
| 770 | + body = result.json |
| 771 | + assert body["token_endpoint"] == "http://localhost:8000/vgi/_oauth/token" |
| 772 | + |
| 773 | + @patch("vgi_rpc.http._oauth_pkce.httpx.Client") |
| 774 | + def test_options_preflight(self, mock_client_cls) -> None: # type: ignore[no-untyped-def] |
| 775 | + """OPTIONS preflight returns 204 with CORS headers when Origin is allowed.""" |
| 776 | + _mock_httpx_for_proxy(mock_client_cls) |
| 777 | + client = _make_test_app(authenticate=self.auth, oauth_metadata=_METADATA_PKCE) |
| 778 | + result = client.simulate_options( |
| 779 | + "/vgi/_oauth/token", |
| 780 | + headers={"Origin": "https://cupola.query-farm.services"}, |
| 781 | + ) |
| 782 | + assert result.status_code == 204 |
| 783 | + assert result.headers.get("access-control-allow-origin") == "https://cupola.query-farm.services" |
| 784 | + assert "POST" in result.headers.get("access-control-allow-methods", "") |
| 785 | + |
| 786 | + @patch("vgi_rpc.http._oauth_pkce.httpx.Client") |
| 787 | + def test_authorization_code_forwards_with_injected_secret(self, mock_client_cls) -> None: # type: ignore[no-untyped-def] |
| 788 | + """SPA POSTs without client_secret; proxy injects it before forwarding.""" |
| 789 | + captured: dict[str, Any] = {} |
| 790 | + _mock_httpx_for_proxy(mock_client_cls, capture=captured) |
| 791 | + client = _make_test_app(authenticate=self.auth, oauth_metadata=_METADATA_PKCE) |
| 792 | + result = client.simulate_post( |
| 793 | + "/vgi/_oauth/token", |
| 794 | + headers={"Content-Type": "application/x-www-form-urlencoded"}, |
| 795 | + body="grant_type=authorization_code&code=abc&code_verifier=v&redirect_uri=https://x/cb&client_id=my-client-id", |
| 796 | + ) |
| 797 | + assert result.status_code == 200 |
| 798 | + assert captured["url"] == "https://auth.example.com/token" |
| 799 | + assert captured["data"]["grant_type"] == "authorization_code" |
| 800 | + assert captured["data"]["client_id"] == "my-client-id" |
| 801 | + assert captured["data"]["client_secret"] == "my-client-secret" |
| 802 | + assert captured["data"]["code"] == "abc" |
| 803 | + assert captured["data"]["code_verifier"] == "v" |
| 804 | + assert captured["data"]["redirect_uri"] == "https://x/cb" |
| 805 | + |
| 806 | + @patch("vgi_rpc.http._oauth_pkce.httpx.Client") |
| 807 | + def test_refresh_token_forwards_with_injected_secret(self, mock_client_cls) -> None: # type: ignore[no-untyped-def] |
| 808 | + """refresh_token grant is forwarded with the proxy-injected client_secret.""" |
| 809 | + captured: dict[str, Any] = {} |
| 810 | + _mock_httpx_for_proxy(mock_client_cls, capture=captured) |
| 811 | + client = _make_test_app(authenticate=self.auth, oauth_metadata=_METADATA_PKCE) |
| 812 | + result = client.simulate_post( |
| 813 | + "/vgi/_oauth/token", |
| 814 | + headers={"Content-Type": "application/x-www-form-urlencoded"}, |
| 815 | + body="grant_type=refresh_token&refresh_token=rtok&client_id=my-client-id&scope=openid", |
| 816 | + ) |
| 817 | + assert result.status_code == 200 |
| 818 | + assert captured["data"]["grant_type"] == "refresh_token" |
| 819 | + assert captured["data"]["refresh_token"] == "rtok" |
| 820 | + assert captured["data"]["scope"] == "openid" |
| 821 | + assert captured["data"]["client_secret"] == "my-client-secret" |
| 822 | + |
| 823 | + @patch("vgi_rpc.http._oauth_pkce.httpx.Client") |
| 824 | + def test_mismatched_client_id_rejected(self, mock_client_cls) -> None: # type: ignore[no-untyped-def] |
| 825 | + """Submitting a client_id that differs from the configured one returns 400.""" |
| 826 | + _mock_httpx_for_proxy(mock_client_cls) |
| 827 | + client = _make_test_app(authenticate=self.auth, oauth_metadata=_METADATA_PKCE) |
| 828 | + result = client.simulate_post( |
| 829 | + "/vgi/_oauth/token", |
| 830 | + headers={"Content-Type": "application/x-www-form-urlencoded"}, |
| 831 | + body="grant_type=authorization_code&client_id=evil&code=x&code_verifier=v&redirect_uri=https://x", |
| 832 | + ) |
| 833 | + assert result.status_code == 400 |
| 834 | + assert result.json["error"] == "invalid_client" |
| 835 | + |
| 836 | + @patch("vgi_rpc.http._oauth_pkce.httpx.Client") |
| 837 | + def test_unsupported_grant_type_rejected(self, mock_client_cls) -> None: # type: ignore[no-untyped-def] |
| 838 | + """Grant types other than authorization_code/refresh_token are rejected.""" |
| 839 | + _mock_httpx_for_proxy(mock_client_cls) |
| 840 | + client = _make_test_app(authenticate=self.auth, oauth_metadata=_METADATA_PKCE) |
| 841 | + result = client.simulate_post( |
| 842 | + "/vgi/_oauth/token", |
| 843 | + headers={"Content-Type": "application/x-www-form-urlencoded"}, |
| 844 | + body="grant_type=client_credentials", |
| 845 | + ) |
| 846 | + assert result.status_code == 400 |
| 847 | + assert result.json["error"] == "unsupported_grant_type" |
| 848 | + |
| 849 | + @patch("vgi_rpc.http._oauth_pkce.httpx.Client") |
| 850 | + def test_idp_error_passthrough(self, mock_client_cls) -> None: # type: ignore[no-untyped-def] |
| 851 | + """IdP error response is forwarded verbatim with its status code.""" |
| 852 | + _mock_httpx_for_proxy( |
| 853 | + mock_client_cls, |
| 854 | + upstream_status=400, |
| 855 | + upstream_body=b'{"error":"invalid_grant","error_description":"bad code"}', |
| 856 | + ) |
| 857 | + client = _make_test_app(authenticate=self.auth, oauth_metadata=_METADATA_PKCE) |
| 858 | + result = client.simulate_post( |
| 859 | + "/vgi/_oauth/token", |
| 860 | + headers={"Content-Type": "application/x-www-form-urlencoded"}, |
| 861 | + body="grant_type=authorization_code&code=bad&code_verifier=v&redirect_uri=https://x", |
| 862 | + ) |
| 863 | + assert result.status_code == 400 |
| 864 | + assert result.json["error"] == "invalid_grant" |
| 865 | + assert result.json["error_description"] == "bad code" |
| 866 | + |
| 867 | + @patch("vgi_rpc.http._oauth_pkce.httpx.Client") |
| 868 | + def test_wrong_content_type_rejected(self, mock_client_cls) -> None: # type: ignore[no-untyped-def] |
| 869 | + """Non-form Content-Type is rejected with 415.""" |
| 870 | + _mock_httpx_for_proxy(mock_client_cls) |
| 871 | + client = _make_test_app(authenticate=self.auth, oauth_metadata=_METADATA_PKCE) |
| 872 | + result = client.simulate_post( |
| 873 | + "/vgi/_oauth/token", |
| 874 | + headers={"Content-Type": "application/json"}, |
| 875 | + body='{"grant_type":"authorization_code"}', |
| 876 | + ) |
| 877 | + assert result.status_code == 415 |
0 commit comments