44from functools import wraps
55import json
66from os import environ as env
7+ from typing import Dict
8+
79from six .moves .urllib .request import urlopen
810
911from dotenv import load_dotenv , find_dotenv
10- from flask import Flask , request , jsonify , _request_ctx_stack
12+ from flask import Flask , request , jsonify , _request_ctx_stack , Response
1113from flask_cors import cross_origin
1214from jose import jwt
1315
@@ -25,14 +27,15 @@ class AuthError(Exception):
2527 """
2628 An AuthError is raised whenever the authentication failed.
2729 """
30+ def __init__ (self , error : Dict [str , str ], status_code : int ):
2831 def __init__ (self , error , status_code ):
2932 super ().__init__ ()
3033 self .error = error
3134 self .status_code = status_code
3235
3336
3437@APP .errorhandler (AuthError )
35- def handle_auth_error (ex ) :
38+ def handle_auth_error (ex : AuthError ) -> Response :
3639 """
3740 serializes the given AuthError as json and sets the response status code accordingly.
3841 :param ex: an auth error
@@ -43,14 +46,14 @@ def handle_auth_error(ex):
4346 return response
4447
4548
46- def get_token_auth_header ():
49+ def get_token_auth_header () -> str :
4750 """Obtains the access token from the Authorization Header
4851 """
4952 auth = request .headers .get ("Authorization" , None )
5053 if not auth :
5154 raise AuthError ({"code" : "authorization_header_missing" ,
52- "description" :
53- "Authorization header is expected" }, 401 )
55+ "description" :
56+ "Authorization header is expected" }, 401 )
5457
5558 parts = auth .split ()
5659
@@ -64,15 +67,15 @@ def get_token_auth_header():
6467 "description" : "Token not found" }, 401 )
6568 if len (parts ) > 2 :
6669 raise AuthError ({"code" : "invalid_header" ,
67- "description" :
68- "Authorization header must be"
69- " Bearer token" }, 401 )
70+ "description" :
71+ "Authorization header must be"
72+ " Bearer token" }, 401 )
7073
7174 token = parts [1 ]
7275 return token
7376
7477
75- def requires_scope (required_scope ) :
78+ def requires_scope (required_scope : str ) -> bool :
7679 """Determines if the required scope is present in the access token
7780 Args:
7881 required_scope (str): The scope required to access the resource
@@ -90,10 +93,11 @@ def requires_scope(required_scope):
9093def requires_auth (func ):
9194 """Determines if the access token is valid
9295 """
96+
9397 @wraps (func )
9498 def decorated (* args , ** kwargs ):
9599 token = get_token_auth_header ()
96- jsonurl = urlopen ("https://" + AUTH0_DOMAIN + "/.well-known/jwks.json" )
100+ jsonurl = urlopen ("https://" + AUTH0_DOMAIN + "/.well-known/jwks.json" )
97101 jwks = json .loads (jsonurl .read ())
98102 try :
99103 unverified_header = jwt .get_unverified_header (token )
@@ -104,9 +108,9 @@ def decorated(*args, **kwargs):
104108 "Use an RS256 signed JWT Access Token" }, 401 ) from jwt_error
105109 if unverified_header ["alg" ] == "HS256" :
106110 raise AuthError ({"code" : "invalid_header" ,
107- "description" :
108- "Invalid header. "
109- "Use an RS256 signed JWT Access Token" }, 401 )
111+ "description" :
112+ "Invalid header. "
113+ "Use an RS256 signed JWT Access Token" }, 401 )
110114 rsa_key = {}
111115 for key in jwks ["keys" ]:
112116 if key ["kid" ] == unverified_header ["kid" ]:
@@ -124,7 +128,7 @@ def decorated(*args, **kwargs):
124128 rsa_key ,
125129 algorithms = ALGORITHMS ,
126130 audience = API_IDENTIFIER ,
127- issuer = "https://" + AUTH0_DOMAIN + "/"
131+ issuer = "https://" + AUTH0_DOMAIN + "/"
128132 )
129133 except jwt .ExpiredSignatureError as expired_sign_error :
130134 raise AuthError ({"code" : "token_expired" ,
@@ -143,7 +147,8 @@ def decorated(*args, **kwargs):
143147 _request_ctx_stack .top .current_user = payload
144148 return func (* args , ** kwargs )
145149 raise AuthError ({"code" : "invalid_header" ,
146- "description" : "Unable to find appropriate key" }, 401 )
150+ "description" : "Unable to find appropriate key" }, 401 )
151+
147152 return decorated
148153
149154
0 commit comments