Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add first set of functions for protected web API scenario #16

Draft
wants to merge 1 commit into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions identity/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from jose import jwt

from identity.exceptions import AuthenticationError
from identity.helpers import RequestsHelper

class Auth:

def __init__(self):
pass

def get_token(self, request, keyword="bearer"):
# Retrieve token from authorization header.
# Microsoft Entra gives bearer tokens. Keyword by default is bearet but
# can a dveloper want to have a custome keyword e.g in Django by default its Token

auth_header = request.headers.get('Authorization')

if not auth_header:
raise AuthenticationError(
{
"code": "missing_authentication_header",
"details": "Expected authorization header but couldn't find it."
},
401
)

auth_header_sections = auth_header.split()
auth_header_length = len(auth_header_sections)

if auth_header_length <= 1 or auth_header_length > 2:
raise AuthenticationError(
{
"code": "invalid_header",
"details": f"Authorization header is malformed."
},
401
)

if auth_header_sections[0].lower() != keyword.lower():
raise AuthenticationError(
{
"code": "invalid_header",
"details": f"Authorization header must start with {keyword}"
},
401
)

return auth_header_sections[1]


def is_valid_aud(self, aud, app_id_uris):
# app_id_uris is an array. This is beause of the possibility of an app registration having multiple
# appid URIS https://learn.microsoft.com/entra/identity-platform/reference-app-manifest#identifieruris-attribute
return aud in app_id_uris

def is_valid_issuer(self, iss, tenant_id, multitenant=False):
# Validating the issuer may require an exact match or a pattern match
# For multitenant apps, you'll need a pattern match as the tenant id varies
# https://learn.microsoft.com/en-us/entra/identity-platform/access-tokens
# multi tenant apps dont need to have tenant_id. Do we need to make
# tenant_id optional?

if multitenant:
return iss.startswith("https://login.microsoftonline.com/") and iss.endswith("/v2")
return iss == f"https://login.microsoftonline.com/{tenant_id}/v2"

def get_rsa_key(self, authority, token):

key_url = f"{authority}/discovery/v2.0/keys"
jwks = RequestsHelper.get_discovery_key_session().get(key_url).json()
unverified_header = jwt.get_unverified_header(token)

try:
rsa_key = {}

for key in jwks["keys"]:
if key["kid"] == unverified_header["kid"]:
rsa_key = {
"kty": key["kty"],
"kid": key["kid"],
"use": key["use"],
"n": key["n"],
"e": key["e"]
}
except Exception as exc:
# Is there possibility of an exceotion being raised here?
# adding a try except incase something breaks
raise AuthenticationError(
{
"code": "invalid_header",
"detailsn":"Unable to generate rsa key"
},
401
) from exc

return rsa_key

def validate_token_signing(self, authority, token):

rsa_key = self.get_rsa_key(authority, token)

if rsa_key:

# we are not checking aud and issuer as those need to be explicitly checked

try:
jwt.decode(
token,
rsa_key,
algorithms=["RS256"]
)

return {"code": "valid_signature","details": "Token singature is valid"},
except jwt.ExpiredSignatureError as jwt_expired_exc:
raise AuthenticationError(
{"code": "token_expired","details": "Token is expired"},
401
) from jwt_expired_exc
except jwt.JWTClaimsError as jwt_claims_exc:
# Only claim here now is algorithms?
raise AuthenticationError(
{"code": "invalid_claims","details":"incorrect claims. Wrong algorithm used"},
401
) from jwt_claims_exc
except Exception as exc:
raise AuthenticationError(
{"code": "invalid_header","details":"Error parsing token."},
401
) from exc

raise AuthenticationError(
{"code": "invalid_header","details": "Invalid RSA key"},
401
)
4 changes: 4 additions & 0 deletions identity/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
class AuthenticationError(Exception):
def __init__(self, error, status_code):
self.error = error
self.status_code = status_code
22 changes: 22 additions & 0 deletions identity/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from requests import Session, adapters
from urllib3.util.retry import Retry

class RequestsHelper:

_discovery_key_session = None

def __init__(self):
raise RuntimeError('Call get_discovery_key_session instead')

@staticmethod
def get_discovery_key_session():
if RequestsHelper._discovery_key_session is None:
session = Session()
retries = Retry(total=5,
backoff_factor=0.1,
status_forcelist=[ 500, 502, 503, 504 ])
session.mount('https://', adapters.HTTPAdapter(max_retries=retries))
session.mount('http://', adapters.HTTPAdapter(max_retries=retries))
RequestsHelper._discovery_key_session = session

return RequestsHelper._discovery_key_session
5 changes: 5 additions & 0 deletions identity/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def complete_log_in(self, auth_response=None):
That dict is actually the claims from an already-validated ID token.
"""
auth_flow = self._session.get(self._AUTH_FLOW, {})

if not auth_flow:
logger.warning(
"We found no prior log_in() info from current session. "
Expand All @@ -157,7 +158,9 @@ def complete_log_in(self, auth_response=None):
"or a load balancer with sticky session (a.k.a. affinity cookie)."
)
return {} # Return a no-op for this non-actionable error

cache = self._load_cache()

if auth_response: # Auth Code flow
try:
result = self._build_msal_app(
Expand All @@ -172,12 +175,14 @@ def complete_log_in(self, auth_response=None):
auth_flow,
exit_condition=lambda flow: True,
)

if "error" in result:
return result
# TODO: Reject a re-log-in with a different account?
self._save_user_into_session(result["id_token_claims"])
self._save_cache(cache)
self._session.pop(self._AUTH_FLOW, None)

return self._load_user_from_session()

def get_user(self):
Expand Down
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ long_description_content_type = text/markdown
python_requires = >=3.7
install_requires =
msal>=1.16,<2
# requests>=2.0.0,<3
requests>=2.0.0,<3
python-jose >=3.3.0,<4
# importlib; python_version == "2.6"
# See also https://setuptools.readthedocs.io/en/latest/userguide/quickstart.html#dependency-management

Expand Down