Skip to content

Commit 36432fc

Browse files
committed
Add PAT (Personal Access Token) auth support via Bearer header
- Add bearer_token field to Configuration with DD_BEARER_TOKEN env var support - Add bearerAuth entry to auth_settings() when bearer_token is set - Modify update_params_for_auth() to send Authorization: Bearer instead of DD-APPLICATION-KEY when bearer_token is configured - Delegated auth takes priority over bearer token when both are configured - Update generator templates (configuration.j2, api_client.j2) to match - Add comprehensive tests in tests/test_pat_auth.py Ref: CRED-2146
1 parent 9f2b67a commit 36432fc

File tree

5 files changed

+237
-0
lines changed

5 files changed

+237
-0
lines changed

.generator/src/generator/templates/api_client.j2

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -893,9 +893,34 @@ class Endpoint:
893893
self.api_client.configuration.delegated_auth_org_uuid is not None
894894
)
895895

896+
# Check if bearer token (PAT) auth is configured
897+
has_bearer_token = self.api_client.configuration.bearer_token is not None
898+
896899
if has_app_key_auth and has_delegated_auth:
897900
# Use delegated token authentication
898901
self.api_client.use_delegated_token_auth(headers)
902+
elif has_app_key_auth and has_bearer_token:
903+
# Use bearer token (PAT) authentication:
904+
# Send API key + Authorization: Bearer instead of app key
905+
all_auth_settings = self.api_client.configuration.auth_settings()
906+
for auth in self.settings["auth"]:
907+
if auth == "appKeyAuth":
908+
# Replace app key with bearer token
909+
bearer_setting = all_auth_settings.get("bearerAuth")
910+
if bearer_setting:
911+
headers[bearer_setting["key"]] = bearer_setting["value"]
912+
continue
913+
auth_setting = all_auth_settings.get(auth)
914+
if auth_setting:
915+
if auth_setting["in"] == "header":
916+
if auth_setting["type"] != "http-signature":
917+
if auth_setting["value"] is None:
918+
raise ApiValueError("Invalid authentication token for {}".format(auth_setting["key"]))
919+
headers[auth_setting["key"]] = auth_setting["value"]
920+
elif auth_setting["in"] == "query":
921+
queries.append((auth_setting["key"], auth_setting["value"]))
922+
else:
923+
raise ApiValueError("Authentication token must be in `query` or `header`")
899924
else:
900925
# Use regular authentication
901926
for auth in self.settings["auth"]:

.generator/src/generator/templates/configuration.j2

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ class Configuration:
179179
retry_policy=None,
180180
delegated_auth_provider=None,
181181
delegated_auth_org_uuid=None,
182+
bearer_token=None,
182183
):
183184
"""Constructor."""
184185
self._base_path = "https://api.datadoghq.com" if host is None else host
@@ -190,6 +191,7 @@ class Configuration:
190191

191192
# Authentication Settings
192193
self.access_token = access_token
194+
self.bearer_token = bearer_token
193195
self.api_key = {}
194196
if api_key:
195197
self.api_key = api_key
@@ -269,6 +271,8 @@ class Configuration:
269271
self.api_key["apiKeyAuth"] = os.environ["DD_API_KEY"]
270272
if "DD_APP_KEY" in os.environ and not self.api_key.get("appKeyAuth"):
271273
self.api_key["appKeyAuth"] = os.environ["DD_APP_KEY"]
274+
if "DD_BEARER_TOKEN" in os.environ and not self.bearer_token:
275+
self.bearer_token = os.environ["DD_BEARER_TOKEN"]
272276

273277
def __deepcopy__(self, memo):
274278
cls = self.__class__
@@ -557,5 +561,12 @@ class Configuration:
557561
}
558562
{%- endif %}
559563
{%- endfor %}
564+
if self.bearer_token is not None:
565+
auth["bearerAuth"] = {
566+
"type": "bearer",
567+
"in": "header",
568+
"key": "Authorization",
569+
"value": "Bearer " + self.bearer_token,
570+
}
560571
return auth
561572
{# keep new line #}

src/datadog_api_client/api_client.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -902,9 +902,34 @@ def update_params_for_auth(self, headers, queries) -> None:
902902
and self.api_client.configuration.delegated_auth_org_uuid is not None
903903
)
904904

905+
# Check if bearer token (PAT) auth is configured
906+
has_bearer_token = self.api_client.configuration.bearer_token is not None
907+
905908
if has_app_key_auth and has_delegated_auth:
906909
# Use delegated token authentication
907910
self.api_client.use_delegated_token_auth(headers)
911+
elif has_app_key_auth and has_bearer_token:
912+
# Use bearer token (PAT) authentication:
913+
# Send API key + Authorization: Bearer instead of app key
914+
all_auth_settings = self.api_client.configuration.auth_settings()
915+
for auth in self.settings["auth"]:
916+
if auth == "appKeyAuth":
917+
# Replace app key with bearer token
918+
bearer_setting = all_auth_settings.get("bearerAuth")
919+
if bearer_setting:
920+
headers[bearer_setting["key"]] = bearer_setting["value"]
921+
continue
922+
auth_setting = all_auth_settings.get(auth)
923+
if auth_setting:
924+
if auth_setting["in"] == "header":
925+
if auth_setting["type"] != "http-signature":
926+
if auth_setting["value"] is None:
927+
raise ApiValueError("Invalid authentication token for {}".format(auth_setting["key"]))
928+
headers[auth_setting["key"]] = auth_setting["value"]
929+
elif auth_setting["in"] == "query":
930+
queries.append((auth_setting["key"], auth_setting["value"]))
931+
else:
932+
raise ApiValueError("Authentication token must be in `query` or `header`")
908933
else:
909934
# Use regular authentication
910935
for auth in self.settings["auth"]:

src/datadog_api_client/configuration.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ def __init__(
180180
retry_policy=None,
181181
delegated_auth_provider=None,
182182
delegated_auth_org_uuid=None,
183+
bearer_token=None,
183184
):
184185
"""Constructor."""
185186
self._base_path = "https://api.datadoghq.com" if host is None else host
@@ -191,6 +192,7 @@ def __init__(
191192

192193
# Authentication Settings
193194
self.access_token = access_token
195+
self.bearer_token = bearer_token
194196
self.api_key = {}
195197
if api_key:
196198
self.api_key = api_key
@@ -478,6 +480,8 @@ def __init__(
478480
self.api_key["apiKeyAuth"] = os.environ["DD_API_KEY"]
479481
if "DD_APP_KEY" in os.environ and not self.api_key.get("appKeyAuth"):
480482
self.api_key["appKeyAuth"] = os.environ["DD_APP_KEY"]
483+
if "DD_BEARER_TOKEN" in os.environ and not self.bearer_token:
484+
self.bearer_token = os.environ["DD_BEARER_TOKEN"]
481485

482486
def __deepcopy__(self, memo):
483487
cls = self.__class__
@@ -777,4 +781,11 @@ def auth_settings(self):
777781
"appKeyAuth",
778782
),
779783
}
784+
if self.bearer_token is not None:
785+
auth["bearerAuth"] = {
786+
"type": "bearer",
787+
"in": "header",
788+
"key": "Authorization",
789+
"value": "Bearer " + self.bearer_token,
790+
}
780791
return auth

tests/test_pat_auth.py

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
"""Tests for Personal Access Token (PAT) authentication support."""
2+
3+
import pytest
4+
from datetime import datetime, timedelta
5+
from unittest.mock import patch
6+
7+
from datadog_api_client.api_client import ApiClient, Endpoint as _Endpoint
8+
from datadog_api_client.configuration import Configuration
9+
from datadog_api_client.delegated_auth import (
10+
DelegatedTokenCredentials,
11+
DelegatedTokenConfig,
12+
DelegatedTokenProvider,
13+
)
14+
15+
16+
class TestBearerTokenConfiguration:
17+
"""Test bearer_token field on Configuration."""
18+
19+
def test_bearer_token_stored(self):
20+
config = Configuration(bearer_token="ddapp_test123")
21+
assert config.bearer_token == "ddapp_test123"
22+
23+
def test_bearer_token_default_none(self):
24+
config = Configuration()
25+
assert config.bearer_token is None
26+
27+
@patch.dict("os.environ", {"DD_BEARER_TOKEN": "ddapp_from_env"})
28+
def test_bearer_token_env_var(self):
29+
config = Configuration()
30+
assert config.bearer_token == "ddapp_from_env"
31+
32+
@patch.dict("os.environ", {"DD_BEARER_TOKEN": "ddapp_from_env"})
33+
def test_bearer_token_env_var_no_override(self):
34+
config = Configuration(bearer_token="ddapp_explicit")
35+
assert config.bearer_token == "ddapp_explicit"
36+
37+
38+
class TestAuthSettingsWithBearerToken:
39+
"""Test auth_settings() with bearer_token configured."""
40+
41+
def test_auth_settings_includes_bearer(self):
42+
config = Configuration(bearer_token="ddapp_test123")
43+
auth = config.auth_settings()
44+
assert "bearerAuth" in auth
45+
assert auth["bearerAuth"]["type"] == "bearer"
46+
assert auth["bearerAuth"]["in"] == "header"
47+
assert auth["bearerAuth"]["key"] == "Authorization"
48+
assert auth["bearerAuth"]["value"] == "Bearer ddapp_test123"
49+
50+
def test_auth_settings_without_bearer(self):
51+
config = Configuration()
52+
auth = config.auth_settings()
53+
assert "bearerAuth" not in auth
54+
55+
def test_auth_settings_bearer_coexists_with_api_keys(self):
56+
config = Configuration(
57+
api_key={"apiKeyAuth": "test-api-key", "appKeyAuth": "test-app-key"},
58+
bearer_token="ddapp_test123",
59+
)
60+
auth = config.auth_settings()
61+
assert "bearerAuth" in auth
62+
assert "apiKeyAuth" in auth
63+
assert "appKeyAuth" in auth
64+
65+
66+
class TestUpdateParamsForAuthWithBearerToken:
67+
"""Test that update_params_for_auth uses bearer token correctly."""
68+
69+
def _make_endpoint(self, config, auth_schemes):
70+
"""Helper to create an Endpoint with given auth schemes."""
71+
api_client = ApiClient(config)
72+
return _Endpoint(
73+
settings={
74+
"response_type": None,
75+
"auth": auth_schemes,
76+
"endpoint_path": "/api/v2/test",
77+
"operation_id": "test_op",
78+
"http_method": "GET",
79+
"version": "v2",
80+
},
81+
params_map={},
82+
headers_map={"accept": ["application/json"]},
83+
api_client=api_client,
84+
)
85+
86+
def test_bearer_replaces_app_key(self):
87+
config = Configuration(
88+
api_key={"apiKeyAuth": "test-api-key", "appKeyAuth": "test-app-key"},
89+
bearer_token="ddapp_test_pat",
90+
)
91+
endpoint = self._make_endpoint(config, ["apiKeyAuth", "appKeyAuth"])
92+
headers = {}
93+
queries = []
94+
endpoint.update_params_for_auth(headers, queries)
95+
96+
assert headers["DD-API-KEY"] == "test-api-key"
97+
assert headers["Authorization"] == "Bearer ddapp_test_pat"
98+
assert "DD-APPLICATION-KEY" not in headers
99+
100+
def test_bearer_without_app_key_auth_scheme(self):
101+
"""If endpoint doesn't use appKeyAuth, bearer token is not used."""
102+
config = Configuration(
103+
api_key={"apiKeyAuth": "test-api-key"},
104+
bearer_token="ddapp_test_pat",
105+
)
106+
endpoint = self._make_endpoint(config, ["apiKeyAuth"])
107+
headers = {}
108+
queries = []
109+
endpoint.update_params_for_auth(headers, queries)
110+
111+
assert headers["DD-API-KEY"] == "test-api-key"
112+
assert "Authorization" not in headers
113+
114+
def test_regular_auth_without_bearer(self):
115+
config = Configuration(
116+
api_key={"apiKeyAuth": "test-api-key", "appKeyAuth": "test-app-key"},
117+
)
118+
endpoint = self._make_endpoint(config, ["apiKeyAuth", "appKeyAuth"])
119+
headers = {}
120+
queries = []
121+
endpoint.update_params_for_auth(headers, queries)
122+
123+
assert headers["DD-API-KEY"] == "test-api-key"
124+
assert headers["DD-APPLICATION-KEY"] == "test-app-key"
125+
assert "Authorization" not in headers
126+
127+
def test_delegated_auth_takes_priority_over_bearer(self):
128+
"""When both delegated auth and bearer token are configured, delegated wins."""
129+
130+
class MockProvider(DelegatedTokenProvider):
131+
def authenticate(self, config, api_config):
132+
return DelegatedTokenCredentials(
133+
org_uuid="test-org",
134+
delegated_token="delegated-token-123",
135+
delegated_proof="proof",
136+
expiration=datetime.now() + timedelta(minutes=10),
137+
)
138+
139+
config = Configuration(
140+
api_key={"apiKeyAuth": "test-api-key", "appKeyAuth": "test-app-key"},
141+
bearer_token="ddapp_test_pat",
142+
delegated_auth_provider=MockProvider(),
143+
delegated_auth_org_uuid="test-org",
144+
)
145+
endpoint = self._make_endpoint(config, ["apiKeyAuth", "appKeyAuth"])
146+
headers = {}
147+
queries = []
148+
endpoint.update_params_for_auth(headers, queries)
149+
150+
# Delegated auth should be used, not bearer token
151+
assert headers["Authorization"] == "Bearer delegated-token-123"
152+
assert "DD-APPLICATION-KEY" not in headers
153+
154+
def test_bearer_with_no_api_key_set(self):
155+
"""Bearer token with apiKeyAuth in scheme but no key configured."""
156+
config = Configuration(bearer_token="ddapp_test_pat")
157+
endpoint = self._make_endpoint(config, ["apiKeyAuth", "appKeyAuth"])
158+
headers = {}
159+
queries = []
160+
endpoint.update_params_for_auth(headers, queries)
161+
162+
# apiKeyAuth is not configured, so it won't be in headers
163+
assert "DD-API-KEY" not in headers
164+
# Bearer token should still be set (replacing appKeyAuth)
165+
assert headers["Authorization"] == "Bearer ddapp_test_pat"

0 commit comments

Comments
 (0)