from __future__ import annotations
from typing import TYPE_CHECKING, Any, cast
from openid.consumer.consumer import CANCEL, FAILURE, SUCCESS, Consumer
from openid.consumer.discover import DiscoveryFailure
from openid.extensions import ax, pape, sreg
from openid.fetchers import HTTPFetchingError
from social_core.exceptions import (
AuthCanceled,
AuthException,
AuthFailed,
AuthMissingParameter,
AuthUnknownError,
AuthUnreachableProvider,
)
from social_core.utils import url_add_parameters
from .base import BaseAuth
if TYPE_CHECKING:
from social_core.store import OpenIdStore
# OpenID configuration

# Attribute Exchange (type URI, alias) pairs under the schema.openid.net
# namespace. The "old_" alias prefix is removed by
# OpenIdAuth.values_from_response(), so these feed the same keys as the
# axschema.org attributes below.
OLD_AX_ATTRS = [
    ("http://schema.openid.net/contact/email", "old_email"),
    ("http://schema.openid.net/namePerson", "old_fullname"),
    ("http://schema.openid.net/namePerson/friendly", "old_nickname"),
]

# Attribute Exchange (type URI, alias) pairs under the axschema.org
# namespace.
AX_SCHEMA_ATTRS = [
    # Request both the full name and first/last components since some
    # providers offer one but not the other.
    ("http://axschema.org/contact/email", "email"),
    ("http://axschema.org/namePerson", "fullname"),
    ("http://axschema.org/namePerson/first", "first_name"),
    ("http://axschema.org/namePerson/last", "last_name"),
    ("http://axschema.org/namePerson/friendly", "nickname"),
]

# Simple Registration (field name, alias) pairs.
SREG_ATTR = [
    ("email", "email"),
    ("fullname", "fullname"),
    ("nickname", "nickname"),
]

# Request parameter that carries the user-supplied OpenID identifier.
OPENID_ID_FIELD = "openid_identifier"

# Name passed to the strategy when asking for the OpenID session dict.
SESSION_NAME = "openid"
[docs]
class OpenIdAuth(BaseAuth):
    """Generic OpenID authentication backend"""

    # Backend identifier.
    name = "openid"
    # Fixed provider URL. When left unset, openid_url() reads the
    # identifier from the request data instead.
    URL: str | None = None
    # Default key of the user details used as username; the
    # USERNAME_KEY setting takes precedence in get_user_details().
    USERNAME_KEY = "username"
    # Consumer instance created on first use and cached by consumer().
    _consumer = None
[docs]
def get_user_id(self, details, response):
    """Return the unique user id provided by the service.

    The id is the ``identity_url`` attribute of ``response``; ``details``
    is accepted but not consulted here.
    """
    identity_url = response.identity_url
    return identity_url
[docs]
def get_ax_attributes(self) -> list[tuple[str, str]]:
    """Return the Attribute Exchange ``(type URI, alias)`` pairs to request.

    Pairs configured through the ``AX_SCHEMA_ATTRS`` setting are used on
    their own when present and ``IGNORE_DEFAULT_AX_ATTRS`` is truthy (the
    default). Otherwise the configured pairs are followed by the built-in
    ``AX_SCHEMA_ATTRS`` and ``OLD_AX_ATTRS`` defaults.

    Returns:
        A new list; the configured setting value itself is never returned
        or mutated.
    """
    # The setting may be a tuple or None rather than a list. Normalise it so
    # the concatenation below cannot raise TypeError and the return value
    # always matches the annotation.
    configured = cast(
        "list[tuple[str, str]]",
        list(self.setting("AX_SCHEMA_ATTRS", []) or []),
    )
    if configured and self.setting("IGNORE_DEFAULT_AX_ATTRS", True):
        return configured
    return configured + AX_SCHEMA_ATTRS + OLD_AX_ATTRS
[docs]
def get_sreg_attributes(self):
    """Return the Simple Registration ``(name, alias)`` pairs to request.

    The ``SREG_ATTR`` setting wins when it is truthy; otherwise the
    module-level ``SREG_ATTR`` default is returned.
    """
    configured = self.setting("SREG_ATTR")
    if configured:
        return configured
    return SREG_ATTR
[docs]
def values_from_response(self, response, sreg_names=None, ax_names=None):
    """Return values from the SimpleRegistration response and/or the
    AttributeExchange response, if present.

    ``sreg_names`` and ``ax_names`` are iterables of ``(name, alias)``
    pairs. The alias is used as the key in the returned mapping. For
    AttributeExchange, a leading ``old_`` on the alias is dropped, so an
    ``old_email`` alias fills the ``email`` key.

    A value already collected under a key is kept when the
    AttributeExchange response has nothing for it. Missing values are
    stored as empty strings.

    Returns:
        dict mapping alias to the value found (possibly ``""``). Empty when
        neither name list is given or neither extension response exists.
    """
    values: dict[str, str] = {}

    # Use Simple Registration attributes if provided
    if sreg_names:
        sreg_resp = cast("Any", sreg.SRegResponse).fromSuccessResponse(response)
        if sreg_resp:
            values.update(
                (alias, sreg_resp.get(name) or "") for name, alias in sreg_names
            )

    # Use Attribute Exchange attributes if provided
    if ax_names:
        ax_resp = cast("Any", ax.FetchResponse).fromSuccessResponse(response)
        if ax_resp:
            old_prefix = "old_"
            for src, alias in ax_names:
                # Strip only a leading "old_". str.replace() would also eat
                # the substring inside aliases such as "household_name".
                if alias.startswith(old_prefix):
                    key = alias[len(old_prefix):]
                else:
                    key = alias
                fetched = cast("str | None", ax_resp.getSingle(src, ""))
                values[key] = fetched or values.get(key, "")

    return values
[docs]
def get_user_details(self, response):
    """Return user details extracted from an OpenID response.

    The values come from ``values_from_response()`` (SimpleRegistration
    and/or AttributeExchange) layered over empty-string defaults. Name
    handling:

    * no full name but both first and last name: the full name is built
      as ``"<first> <last>"``;
    * a full name is present: it is split on its last space into first
      and last name; without a space the whole value becomes the last
      name.

    The username is the value stored under the configured username key
    (``USERNAME_KEY`` setting, else the class attribute), falling back to
    the title-cased first and last name joined together.
    """
    details = dict.fromkeys(
        ("username", "email", "fullname", "first_name", "last_name"), ""
    )
    # update values using SimpleRegistration or AttributeExchange values
    extracted = self.values_from_response(
        response, self.get_sreg_attributes(), self.get_ax_attributes()
    )
    details.update(extracted)

    fullname = details.get("fullname") or ""
    first_name = details.get("first_name") or ""
    last_name = details.get("last_name") or ""
    email = details.get("email") or ""

    if fullname:
        head, separator, tail = fullname.rpartition(" ")
        if separator:
            first_name, last_name = head, tail
        else:
            # Single-word name: nothing to split, treat it as the last name.
            last_name = fullname
    elif first_name and last_name:
        fullname = f"{first_name} {last_name}"

    username_key = cast("str", self.setting("USERNAME_KEY") or self.USERNAME_KEY)
    username = details.get(username_key) or (first_name.title() + last_name.title())

    details["fullname"] = fullname
    details["first_name"] = first_name
    details["last_name"] = last_name
    details["username"] = username
    details["email"] = email
    return details
[docs]
def get_return_to(self) -> str:
    """Return the absolute return-to URL for the OpenID request.

    When the strategy reports a session id, it is appended to the URL as
    a query parameter named by ``strategy.SESSION_SAVE_KEY``.
    """
    strategy = self.strategy
    session_id = strategy.get_session_id()
    query: dict[str, str] = (
        {strategy.SESSION_SAVE_KEY: session_id} if session_id else {}
    )
    return url_add_parameters(strategy.absolute_uri(self.redirect_uri), query)
[docs]
def auth_url(self):
    """Return the redirect URL produced for the OpenID request."""
    extra_arguments = self.auth_extra_arguments()
    openid_request = self.setup_request(extra_arguments)
    # Construct completion URL, including page we should redirect to
    realm = self.trust_root()
    return_to = self.get_return_to()
    return openid_request.redirectURL(realm, return_to)
[docs]
def auth_html(self):
    """Return the HTML markup produced for the OpenID request.

    The generated form is tagged with ``id="openid_message"``.
    """
    extra_arguments = self.auth_extra_arguments()
    openid_request = self.setup_request(extra_arguments)
    realm = self.trust_root()
    return_to = self.get_return_to()
    return openid_request.htmlMarkup(
        realm, return_to, form_tag_attrs={"id": "openid_message"}
    )
[docs]
def trust_root(self):
    """Return the trust-root option.

    Uses the ``OPENID_TRUST_ROOT`` setting when it is truthy, otherwise
    the absolute URI of ``"/"`` as resolved by the strategy.
    """
    configured_root = self.setting("OPENID_TRUST_ROOT")
    if configured_root:
        return configured_root
    return self.strategy.absolute_uri("/")
[docs]
def continue_pipeline(self, partial):
    """Continue a previously halted pipeline.

    Completes the OpenID response from the current request data and
    resumes authentication at ``partial.next_step`` with the positional
    and keyword arguments stored on ``partial``.

    Raises:
        AuthUnreachableProvider: when completing the response fails with
            an ``HTTPFetchingError``.
    """
    try:
        response = self.consumer().complete(
            dict(self.data.items()), self.get_return_to()
        )
    except HTTPFetchingError as error:
        # Translate fetch failures the same way auth_complete() does, so
        # callers see a backend exception on both code paths.
        raise AuthUnreachableProvider(self) from error
    return self.strategy.authenticate(
        self,
        *partial.args,
        response=response,
        pipeline_index=partial.next_step,
        **partial.kwargs,
    )
[docs]
def auth_complete(self, *args, **kwargs):
    """Complete the auth process.

    Completes the OpenID response from the request data, validates it
    with ``process_error()``, restores the saved session when the request
    carries its id, and hands over to ``strategy.authenticate()``.

    Raises:
        AuthUnreachableProvider: when completing the response fails with
            an ``HTTPFetchingError``.
    """
    try:
        consumer = self.consumer()
        response = consumer.complete(dict(self.data.items()), self.get_return_to())
    except HTTPFetchingError as error:
        raise AuthUnreachableProvider(self) from error

    self.process_error(response)

    saved_session_id = self.data.get(self.strategy.SESSION_SAVE_KEY)
    if saved_session_id:
        self.strategy.restore_session(saved_session_id, kwargs)

    return self.strategy.authenticate(self, *args, response=response, **kwargs)
[docs]
def process_error(self, data) -> None:
    """Raise the matching auth exception unless ``data`` is a success.

    Raises:
        AuthException: ``data`` is falsy.
        AuthFailed: status is ``FAILURE`` (carries ``data.message``).
        AuthCanceled: status is ``CANCEL``.
        AuthUnknownError: any other status that is not ``SUCCESS``.
    """
    if not data:
        raise AuthException(self, "OpenID relying party endpoint")

    status = data.status
    if status == FAILURE:
        raise AuthFailed(self, data.message)
    if status == CANCEL:
        raise AuthCanceled(self)
    if status != SUCCESS:
        raise AuthUnknownError(self, status)
[docs]
def setup_request(self, params=None):
    """Build the OpenID request and attach the configured extensions.

    A user-details extension is always added: Attribute Exchange when the
    endpoint advertises support for it, Simple Registration otherwise. A
    PAPE request is added only when at least one of the
    ``OPENID_PAPE_*`` settings yields a usable value.
    """
    request = self.openid_request(params)

    # Request some user details. Use attribute exchange if provider
    # advertises support.
    if request.endpoint.supportsType(ax.AXMessage.ns_uri):
        details_extension = ax.FetchRequest()
        # Mark all attributes as required, Google ignores optional ones
        for type_uri, alias in self.get_ax_attributes():
            details_extension.add(ax.AttrInfo(type_uri, alias=alias, required=True))
    else:
        sreg_fields = list(dict(self.get_sreg_attributes()))
        details_extension = sreg.SRegRequest(optional=sreg_fields)
    request.addExtension(details_extension)

    # Add the PAPE extension if configured
    policies = self.setting("OPENID_PAPE_PREFERRED_AUTH_POLICIES")
    level_types = self.setting("OPENID_PAPE_PREFERRED_AUTH_LEVEL_TYPES")
    max_age = self.setting("OPENID_PAPE_MAX_AUTH_AGE")
    if max_age is not None:
        try:
            max_age = int(max_age)
        except (ValueError, TypeError):
            # A value that cannot be read as an integer counts as unset.
            max_age = None

    wants_pape = max_age is not None or policies or level_types
    if wants_pape:
        request.addExtension(
            pape.Request(
                max_auth_age=max_age,
                preferred_auth_policies=policies,
                preferred_auth_level_types=level_types,
            )
        )
    return request
[docs]
def get_consumer_store(self) -> OpenIdStore | None:
    """Return the OpenID store supplied by the strategy."""
    store = self.strategy.openid_store()
    return store
[docs]
def consumer(self):
    """Return the OpenID Consumer for this backend, creating it on first use.

    The instance is built with ``create_consumer()`` from the store given
    by ``get_consumer_store()`` and cached on ``self._consumer``.
    """
    cached = self._consumer
    if cached is None:
        cached = self.create_consumer(self.get_consumer_store())
        self._consumer = cached
    return cached
[docs]
def create_consumer(self, store=None):
    """Build a Consumer over the strategy's OpenID session dict and ``store``."""
    session = self.strategy.openid_session_dict(SESSION_NAME)
    return Consumer(session, store)
[docs]
def uses_redirect(self):
    """Return true if the OpenID request will be handled with a redirect;
    otherwise HTML content will be returned.
    """
    openid_request = self.openid_request()
    return openid_request.shouldSendRedirect()
[docs]
def openid_request(self, params: dict[str, str] | None = None):
    """Return the OpenID request started against the provider URL.

    ``params`` are added to the URL returned by ``openid_url()`` before
    the consumer begins the request.

    Raises:
        AuthException: when discovery fails (``DiscoveryFailure``).
    """
    try:
        consumer = self.consumer()
        target_url = url_add_parameters(self.openid_url(), params)
        return consumer.begin(target_url)
    except DiscoveryFailure as err:
        raise AuthException(self, f"OpenID discovery error: {err}") from err
[docs]
def openid_url(self):
    """Return the service provider URL.

    This base class is generic: when no fixed ``URL`` is set on the
    backend, the provider URL is taken from the ``openid_identifier``
    request parameter.

    Raises:
        AuthMissingParameter: the backend has no fixed ``URL`` and the
            request parameter is absent or blank.
    """
    if self.URL:
        return self.URL
    # A blank submission (e.g. an empty form field) is treated like a
    # missing parameter instead of being returned as the provider URL.
    identifier = self.data.get(OPENID_ID_FIELD)
    if identifier:
        return identifier
    raise AuthMissingParameter(self, OPENID_ID_FIELD)