Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
467 changes: 467 additions & 0 deletions tools/integration_tests/tokenserver/conftest.py

Large diffs are not rendered by default.

1,173 changes: 621 additions & 552 deletions tools/integration_tests/tokenserver/test_authorization.py

Large diffs are not rendered by default.

305 changes: 110 additions & 195 deletions tools/integration_tests/tokenserver/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,23 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
"""End-to-end integration tests for the tokenserver."""

from base64 import urlsafe_b64decode
import hmac
import json
import jwt
import random
import string
import time
import tokenlib
import unittest
from base64 import urlsafe_b64decode
from hashlib import sha256

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
from fxa.core import Client
from fxa.oauth import Client as OAuthClient
from fxa.errors import ClientError, ServerError
from fxa.tests.utils import TestEmailAccount
from hashlib import sha256

from integration_tests.tokenserver.test_support import TestCase
from integration_tests.tokenserver.conftest import (
FXA_METRICS_HASH_SECRET,
TOKEN_SIGNING_SECRET,
unsafe_parse_token,
)

# This is the client ID used for Firefox Desktop. The FxA team confirmed that
# this is the proper client ID to be using for these integration tests.
Expand All @@ -35,187 +32,105 @@
SCOPE = "https://identity.mozilla.com/apps/oldsync"


class TestE2e(TestCase, unittest.TestCase):
"""End-to-end integration tests using real FxA accounts."""

def setUp(self):
"""Set up test fixtures."""
super(TestE2e, self).setUp()

def tearDown(self):
"""Tear down test fixtures."""
super(TestE2e, self).tearDown()

@classmethod
def setUpClass(cls):
"""Set up class-level test fixtures."""
# Create an ephemeral email account to use to create an FxA account
cls.acct = TestEmailAccount()
cls.client = Client(FXA_ACCOUNT_STAGE_HOST)
cls.oauth_client = OAuthClient(CLIENT_ID, None, server_url=FXA_OAUTH_STAGE_HOST)
cls.fxa_password = cls._generate_password()
# Create an FxA account for these end-to-end tests
cls.session = cls.client.create_account(
cls.acct.email, password=cls.fxa_password
)
# Loop until we receive the verification email from FxA
while not cls.acct.messages:
time.sleep(0.5)
cls.acct.fetch()
# Find the message containing the verification code and verify the
# code
for m in cls.acct.messages:
if "x-verify-code" in m["headers"]:
cls.session.verify_email_code(m["headers"]["x-verify-code"])
# Create an OAuth token to be used for the end-to-end tests
cls.oauth_token = cls.oauth_client.authorize_token(cls.session, SCOPE)

@classmethod
def tearDownClass(cls):
"""Tear down class-level test fixtures."""
cls.acct.clear()
# A teardown of some of the tests can produce a 401 error because
# of a race condition, where the record had already been removed.
# This causes `destroy_account` to return an error if it attempts
# to parse the invalid JSON response.
# It's also possible that the `destroy_account` is rejected due to
# missing authentication. It is not known why the authentication
# is considered missing.
# This traps for those events.
try:
cls.client.destroy_account(cls.acct.email, cls.fxa_password)
except (ServerError, ClientError) as ex:
print(f"warning: Encountered error when cleaning up: {ex}")

@staticmethod
def _generate_password():
r = range(PASSWORD_LENGTH)

return "".join(random.choice(PASSWORD_CHARACTERS) for i in r)

def _get_oauth_token_with_bad_scope(self):
bad_scope = "bad_scope"
return self.oauth_client.authorize_token(self.session, bad_scope)

def _get_bad_token(self):
key = rsa.generate_private_key(
backend=default_backend(), public_exponent=65537, key_size=2048
)
format = serialization.PrivateFormat.TraditionalOpenSSL
algorithm = serialization.NoEncryption()
pem = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=format,
encryption_algorithm=algorithm,
)
private_key = pem.decode("utf-8")
claims = {
"sub": "fake sub",
"iat": 12345,
"exp": 12345,
}

return jwt.encode(claims, private_key, algorithm="RS256")

def _extract_keys_changed_at_from_assertion(self, assertion):
token = assertion.split("~")[-2]
claims = jwt.decode(token, options={"verify_signature": False})

return claims["fxa-keysChangedAt"]

@classmethod
def _change_password(cls):
new_password = cls._generate_password()
cls.session.change_password(cls.fxa_password, new_password)
cls.fxa_password = new_password

# Adapted from the original Tokenserver:
# https://github.com/mozilla-services/tokenserver/blob/master/tokenserver/util.py#L24
def _fxa_metrics_hash(self, value):
hasher = hmac.new(self.FXA_METRICS_HASH_SECRET.encode("utf-8"), b"", sha256)
hasher.update(value.encode("utf-8"))
return hasher.hexdigest()

def test_unauthorized_oauth_error_status(self):
"""Test unauthorized oauth error status."""
# Totally busted auth -> generic error.
headers = {
"Authorization": "Unsupported-Auth-Scheme IHACKYOU",
"X-KeyID": "1234-qqo",
}
res = self.app.get("/1.0/sync/1.5", headers=headers, status=401)
expected_error_response = {
"errors": [{"description": "Unsupported", "location": "body", "name": ""}],
"status": "error",
}
self.assertEqual(res.json, expected_error_response)
token = self._get_bad_token()
headers = {"Authorization": f"Bearer {token}", "X-KeyID": "1234-qqo"}
# Bad token -> 'invalid-credentials'
res = self.app.get("/1.0/sync/1.5", headers=headers, status=401)
expected_error_response = {
"errors": [{"description": "Unauthorized", "location": "body", "name": ""}],
"status": "invalid-credentials",
}
self.assertEqual(res.json, expected_error_response)
# Untrusted scopes -> 'invalid-credentials'
token = self._get_oauth_token_with_bad_scope()
headers = {"Authorization": f"Bearer {token}", "X-KeyID": "1234-qqo"}
res = self.app.get("/1.0/sync/1.5", headers=headers, status=401)
self.assertEqual(res.json, expected_error_response)

def test_valid_oauth_request(self):
"""Test valid oauth request."""
oauth_token = self.oauth_token
headers = {"Authorization": f"Bearer {oauth_token}", "X-KeyID": "1234-qqo"}
# Send a valid request, allocating a new user
res = self.app.get("/1.0/sync/1.5", headers=headers)
fxa_uid = self.session.uid
# Retrieve the user from the database
user = self._get_user(res.json["uid"])
# First, let's verify that the token we received is valid. To do this,
# we can unpack the hawk header ID into the payload and its signature
# and then construct a tokenlib token to compute the signature
# ourselves. To obtain a matching signature, we use the same secret as
# is used by Tokenserver.
raw = urlsafe_b64decode(res.json["id"])
payload = raw[:-32]
signature = raw[-32:]
payload_str = payload.decode("utf-8")
payload_dict = json.loads(payload_str)
# The `id` payload should include a field indicating the origin of the
# token
self.assertEqual(payload_dict["tokenserver_origin"], "rust")
signing_secret = self.TOKEN_SIGNING_SECRET
tm = tokenlib.TokenManager(secret=signing_secret)
expected_signature = tm._get_signature(payload_str.encode("utf8"))
# Using the #compare_digest method here is not strictly necessary, as
# this is not a security-sensitive situation, but it's good practice
self.assertTrue(hmac.compare_digest(expected_signature, signature))
# Check that the given key is a secret derived from the hawk ID
expected_secret = tokenlib.get_derived_secret(
res.json["id"], secret=signing_secret
)
self.assertEqual(res.json["key"], expected_secret)
# Check to make sure the remainder of the fields are valid
self.assertEqual(res.json["uid"], user["uid"])
self.assertEqual(res.json["api_endpoint"], f"{self.NODE_URL}/1.5/{user['uid']}")
self.assertEqual(res.json["duration"], DEFAULT_TOKEN_DURATION)
self.assertEqual(res.json["hashalg"], "sha256")
self.assertEqual(
res.json["hashed_fxa_uid"], self._fxa_metrics_hash(fxa_uid)[:32]
)
# Verify the node_type matches the syncstorage backend being tested
self.assertEqual(res.json["node_type"], self.expected_node_type)
# The response should have an X-Timestamp header that contains the
# number of seconds since the UNIX epoch
self.assertIn("X-Timestamp", res.headers)
self.assertIsNotNone(int(res.headers["X-Timestamp"]))
token = self.unsafelyParseToken(res.json["id"])
self.assertIn("hashed_device_id", token)
self.assertEqual(token["uid"], res.json["uid"])
self.assertEqual(token["fxa_uid"], fxa_uid)
self.assertEqual(token["fxa_kid"], "0000000001234-qqo")
self.assertNotEqual(token["hashed_fxa_uid"], token["fxa_uid"])
self.assertEqual(token["hashed_fxa_uid"], res.json["hashed_fxa_uid"])
self.assertIn("hashed_device_id", token)
def _fxa_metrics_hash(value: str) -> str:
"""Compute the FxA metrics hash for a given value."""
hasher = hmac.new(FXA_METRICS_HASH_SECRET.encode("utf-8"), b"", sha256)
hasher.update(value.encode("utf-8"))
return hasher.hexdigest()


def _get_bad_token() -> str:
"""Generate a JWT signed with an untrusted key."""
key = rsa.generate_private_key(
backend=default_backend(), public_exponent=65537, key_size=2048
)
pem = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
claims = {"sub": "fake sub", "iat": 12345, "exp": 12345}
return jwt.encode(claims, pem.decode("utf-8"), algorithm="RS256")


def test_unauthorized_oauth_error_status(ts_ctx, fxa_auth):
"""Test unauthorized oauth error status."""
app = ts_ctx["app"]
oauth_client = fxa_auth["oauth_client"]
session = fxa_auth["session"]

# Totally busted auth -> generic error.
headers = {
"Authorization": "Unsupported-Auth-Scheme IHACKYOU",
"X-KeyID": "1234-qqo",
}
res = app.get("/1.0/sync/1.5", headers=headers, status=401)
expected_error_response = {
"errors": [{"description": "Unsupported", "location": "body", "name": ""}],
"status": "error",
}
assert res.json == expected_error_response

bad_token = _get_bad_token()
headers = {"Authorization": f"Bearer {bad_token}", "X-KeyID": "1234-qqo"}
# Bad token -> 'invalid-credentials'
res = app.get("/1.0/sync/1.5", headers=headers, status=401)
expected_error_response = {
"errors": [{"description": "Unauthorized", "location": "body", "name": ""}],
"status": "invalid-credentials",
}
assert res.json == expected_error_response

# Untrusted scopes -> 'invalid-credentials'
bad_scope_token = oauth_client.authorize_token(session, "bad_scope")
headers = {"Authorization": f"Bearer {bad_scope_token}", "X-KeyID": "1234-qqo"}
res = app.get("/1.0/sync/1.5", headers=headers, status=401)
assert res.json == expected_error_response


def test_valid_oauth_request(ts_ctx, fxa_auth):
"""Test valid oauth request."""
app = ts_ctx["app"]
expected_node_type = ts_ctx["expected_node_type"]
session = fxa_auth["session"]
oauth_token = fxa_auth["oauth_token"]

headers = {"Authorization": f"Bearer {oauth_token}", "X-KeyID": "1234-qqo"}
# Send a valid request, allocating a new user
res = app.get("/1.0/sync/1.5", headers=headers)
fxa_uid = session.uid

# Verify the token signature using tokenlib
raw = urlsafe_b64decode(res.json["id"])
payload = raw[:-32]
signature = raw[-32:]
payload_str = payload.decode("utf-8")
payload_dict = json.loads(payload_str)
# The `id` payload should include a field indicating the origin of the token
assert payload_dict["tokenserver_origin"] == "rust"
signing_secret = TOKEN_SIGNING_SECRET
tm = tokenlib.TokenManager(secret=signing_secret)
expected_signature = tm._get_signature(payload_str.encode("utf8"))
# Using compare_digest here is good practice even in a test context
assert hmac.compare_digest(expected_signature, signature)
# Check that the given key is a secret derived from the hawk ID
expected_secret = tokenlib.get_derived_secret(res.json["id"], secret=signing_secret)
assert res.json["key"] == expected_secret
# Check to make sure the remainder of the fields are valid
assert res.json["api_endpoint"].startswith("https://example.com/1.5/")
assert res.json["duration"] == 3600
assert res.json["hashalg"] == "sha256"
assert res.json["hashed_fxa_uid"] == _fxa_metrics_hash(fxa_uid)[:32]
# Verify the node_type matches the syncstorage backend being tested
assert res.json["node_type"] == expected_node_type
# The response should have an X-Timestamp header
assert "X-Timestamp" in res.headers
assert int(res.headers["X-Timestamp"]) is not None
token = unsafe_parse_token(res.json["id"])
assert "hashed_device_id" in token
assert token["uid"] == res.json["uid"]
assert token["fxa_uid"] == fxa_uid
assert token["fxa_kid"] == "0000000001234-qqo"
assert token["hashed_fxa_uid"] != token["fxa_uid"]
assert token["hashed_fxa_uid"] == res.json["hashed_fxa_uid"]
assert "hashed_device_id" in token
Loading
Loading