# Source code for social_core.backends.open_id

from __future__ import annotations

from typing import TYPE_CHECKING, Any, cast

from openid.consumer.consumer import CANCEL, FAILURE, SUCCESS, Consumer
from openid.consumer.discover import DiscoveryFailure
from openid.extensions import ax, pape, sreg
from openid.fetchers import HTTPFetchingError

from social_core.exceptions import (
    AuthCanceled,
    AuthException,
    AuthFailed,
    AuthMissingParameter,
    AuthUnknownError,
    AuthUnreachableProvider,
)
from social_core.utils import url_add_parameters

from .base import BaseAuth

if TYPE_CHECKING:
    from social_core.store import OpenIdStore

# OpenID configuration

# Namespaces under which the Attribute Exchange (AX) type URIs are published.
_OLD_AX_NS = "http://schema.openid.net"
_AX_NS = "http://axschema.org"

# Legacy AX type URIs paired with "old_"-prefixed aliases.
OLD_AX_ATTRS = [
    (f"{_OLD_AX_NS}/{path}", f"old_{alias}")
    for path, alias in (
        ("contact/email", "email"),
        ("namePerson", "fullname"),
        ("namePerson/friendly", "nickname"),
    )
]

# Current AX type URIs paired with the alias used as the result key.
# Request both the full name and first/last components since some
# providers offer one but not the other.
AX_SCHEMA_ATTRS = [
    (f"{_AX_NS}/{path}", alias)
    for path, alias in (
        ("contact/email", "email"),
        ("namePerson", "fullname"),
        ("namePerson/first", "first_name"),
        ("namePerson/last", "last_name"),
        ("namePerson/friendly", "nickname"),
    )
]

# Simple Registration (SReg) fields; each field name doubles as its alias.
SREG_ATTR = [(field, field) for field in ("email", "fullname", "nickname")]

# Request parameter that carries the user-supplied OpenID identifier URL.
OPENID_ID_FIELD = "openid_identifier"
# Name passed to the strategy when obtaining the OpenID session dict.
SESSION_NAME = "openid"


class OpenIdAuth(BaseAuth):
    """Generic OpenID authentication backend.

    Drives the OpenID relying-party flow through an ``openid`` ``Consumer``:
    builds the authentication request (with AX/SReg and optional PAPE
    extensions), completes it on return, and maps the returned attributes
    to user details.
    """

    name = "openid"
    # Fixed provider URL. When falsy, ``openid_url`` reads the URL from the
    # ``openid_identifier`` request parameter instead.
    URL: str | None = None
    # Key looked up in the collected values to obtain the username, unless
    # the ``USERNAME_KEY`` setting overrides it.
    USERNAME_KEY = "username"
    # Consumer instance created on first call to ``consumer()``.
    _consumer = None

    def get_user_id(self, details, response):
        """Return user unique id provided by service"""
        return response.identity_url

    def get_ax_attributes(self) -> list[tuple[str, str]]:
        """Return the ``(type_uri, alias)`` pairs requested via Attribute Exchange.

        Pairs configured in the ``AX_SCHEMA_ATTRS`` setting come first. The
        built-in ``AX_SCHEMA_ATTRS`` and ``OLD_AX_ATTRS`` defaults are
        appended unless custom pairs exist and ``IGNORE_DEFAULT_AX_ATTRS``
        (default ``True``) is truthy.
        """
        configured = self.setting("AX_SCHEMA_ATTRS", [])
        # Settings are frequently declared as tuples (or explicitly None);
        # normalise to a fresh list so merging with the defaults below cannot
        # raise TypeError and callers never receive the settings object itself.
        attrs = cast("list[tuple[str, str]]", list(configured or []))
        if attrs and self.setting("IGNORE_DEFAULT_AX_ATTRS", True):
            return attrs
        return attrs + AX_SCHEMA_ATTRS + OLD_AX_ATTRS

    def get_sreg_attributes(self):
        """Return the ``(name, alias)`` pairs for Simple Registration.

        Uses the ``SREG_ATTR`` setting when it is truthy, otherwise the
        module-level ``SREG_ATTR`` default.
        """
        return self.setting("SREG_ATTR") or SREG_ATTR

    def values_from_response(self, response, sreg_names=None, ax_names=None):
        """Return values from SimpleRegistration response or
        AttributeExchange response if present.

        @sreg_names and @ax_names must be a list of name
        and aliases for such name. The alias will be used as
        mapping key.

        For AX aliases, a leading ``old_`` prefix is dropped so a legacy
        attribute fills the same key as its current counterpart. An empty AX
        value never overwrites a value already collected for that key.
        """
        values: dict[str, str] = {}

        # Use Simple Registration attributes if provided
        if sreg_names:
            resp = cast("Any", sreg.SRegResponse).fromSuccessResponse(response)
            if resp:
                values.update(
                    (alias, resp.get(name) or "") for name, alias in sreg_names
                )

        # Use Attribute Exchange attributes if provided
        if ax_names:
            resp = cast("Any", ax.FetchResponse).fromSuccessResponse(response)
            if resp:
                legacy_prefix = "old_"
                for src, alias in ax_names:
                    # Strip only a *leading* "old_". A plain str.replace would
                    # also mangle aliases that merely contain the substring
                    # (e.g. "gold_level" -> "glevel").
                    if alias.startswith(legacy_prefix):
                        name = alias[len(legacy_prefix):]
                    else:
                        name = alias
                    values[name] = cast(
                        "str | None", resp.getSingle(src, "")
                    ) or values.get(name, "")
        return values

    def get_user_details(self, response):
        """Return user details from an OpenID request.

        The result always contains ``username``, ``email``, ``fullname``,
        ``first_name`` and ``last_name`` (possibly empty strings), plus any
        other aliases collected from the response.
        """
        values = {
            "username": "",
            "email": "",
            "fullname": "",
            "first_name": "",
            "last_name": "",
        }
        # update values using SimpleRegistration or AttributeExchange
        # values
        values.update(
            self.values_from_response(
                response, self.get_sreg_attributes(), self.get_ax_attributes()
            )
        )

        fullname = values.get("fullname") or ""
        first_name = values.get("first_name") or ""
        last_name = values.get("last_name") or ""
        email = values.get("email") or ""

        if not fullname and first_name and last_name:
            # Only the name components were supplied: compose the full name.
            fullname = f"{first_name} {last_name}"
        elif fullname:
            # A full name was supplied: derive the components from it by
            # splitting on the last space.
            try:
                first_name, last_name = fullname.rsplit(" ", 1)
            except ValueError:
                # No space in the full name: use it as the last name.
                last_name = fullname

        username_key = cast("str", self.setting("USERNAME_KEY") or self.USERNAME_KEY)
        values.update(
            {
                "fullname": fullname,
                "first_name": first_name,
                "last_name": last_name,
                # Fall back to a username built from the title-cased names.
                "username": values.get(username_key)
                or (first_name.title() + last_name.title()),
                "email": email,
            }
        )
        return values

    def extra_data(
        self,
        user,
        uid: str,
        response: dict[str, Any],
        details: dict[str, Any],
        pipeline_kwargs: dict[str, Any],
    ) -> dict[str, Any]:
        """Return defined extra data names to store in extra_data field.
        Settings will be inspected to get more values names that should be
        stored on extra_data field. Setting name is created from current
        backend name (all uppercase) plus _SREG_EXTRA_DATA and
        _AX_EXTRA_DATA because values can be returned by SimpleRegistration
        or AttributeExchange schemas.

        Both list must be a value name and an alias mapping similar to
        SREG_ATTR, OLD_AX_ATTRS or AX_SCHEMA_ATTRS
        """
        sreg_names = self.setting("SREG_EXTRA_DATA")
        ax_names = self.setting("AX_EXTRA_DATA")
        values = self.values_from_response(response, sreg_names, ax_names)
        # The parent implementation is given an empty response, so it can
        # only contribute values derived from its other arguments; those
        # take precedence over the SReg/AX values on key collisions.
        from_details = super().extra_data(user, uid, {}, details, pipeline_kwargs)
        values.update(from_details)
        return values

    def get_return_to(self) -> str:
        """Return the absolute return URL handed to the OpenID provider.

        When the strategy reports a session id, it is appended as a query
        parameter named by ``strategy.SESSION_SAVE_KEY``.
        """
        params: dict[str, str] = {}
        if session_id := self.strategy.get_session_id():
            params[self.strategy.SESSION_SAVE_KEY] = session_id
        return url_add_parameters(self.strategy.absolute_uri(self.redirect_uri), params)

    def auth_url(self):
        """Return auth URL returned by service"""
        openid_request = self.setup_request(self.auth_extra_arguments())
        # Construct completion URL, including page we should redirect to
        return openid_request.redirectURL(self.trust_root(), self.get_return_to())

    def auth_html(self):
        """Return auth HTML returned by service"""
        openid_request = self.setup_request(self.auth_extra_arguments())
        form_tag = {"id": "openid_message"}
        return openid_request.htmlMarkup(
            self.trust_root(), self.get_return_to(), form_tag_attrs=form_tag
        )

    def trust_root(self):
        """Return trust-root option.

        Uses the ``OPENID_TRUST_ROOT`` setting when truthy, otherwise the
        absolute URI of ``/``.
        """
        return self.setting("OPENID_TRUST_ROOT") or self.strategy.absolute_uri("/")

    def continue_pipeline(self, partial):
        """Continue previous halted pipeline"""
        response = self.consumer().complete(
            dict(self.data.items()), self.get_return_to()
        )
        return self.strategy.authenticate(
            self,
            *partial.args,
            response=response,
            pipeline_index=partial.next_step,
            **partial.kwargs,
        )

    def auth_complete(self, *args, **kwargs):
        """Complete auth process.

        Raises ``AuthUnreachableProvider`` when completing the request fails
        with ``HTTPFetchingError``, and whatever ``process_error`` raises for
        a non-successful response.
        """
        try:
            response = self.consumer().complete(
                dict(self.data.items()), self.get_return_to()
            )
        except HTTPFetchingError as error:
            raise AuthUnreachableProvider(self) from error
        self.process_error(response)
        # Restore the session whose id was round-tripped via the return URL
        # (see ``get_return_to``), if one is present in the request data.
        if session_id := self.data.get(self.strategy.SESSION_SAVE_KEY):
            self.strategy.restore_session(session_id, kwargs)
        return self.strategy.authenticate(self, *args, response=response, **kwargs)

    def process_error(self, data) -> None:
        """Raise the exception matching a non-successful OpenID response.

        Raises:
            AuthException: ``data`` is falsy.
            AuthFailed: status is ``FAILURE``.
            AuthCanceled: status is ``CANCEL``.
            AuthUnknownError: status is anything other than ``SUCCESS``.
        """
        if not data:
            raise AuthException(self, "OpenID relying party endpoint")
        if data.status == FAILURE:
            raise AuthFailed(self, data.message)
        if data.status == CANCEL:
            raise AuthCanceled(self)
        if data.status != SUCCESS:
            raise AuthUnknownError(self, data.status)

    def setup_request(self, params=None):
        """Build the OpenID request and attach the configured extensions.

        Attaches an AX fetch request when the endpoint advertises AX support,
        otherwise an SReg request; a PAPE request is added when any of the
        ``OPENID_PAPE_*`` settings is set.
        """
        request = self.openid_request(params)
        # Request some user details. Use attribute exchange if provider
        # advertises support.
        if request.endpoint.supportsType(ax.AXMessage.ns_uri):
            fetch_request = ax.FetchRequest()
            # Mark all attributes as required, Google ignores optional ones
            for attr, alias in self.get_ax_attributes():
                fetch_request.add(ax.AttrInfo(attr, alias=alias, required=True))
        else:
            # Going through a dict de-duplicates field names, keeping order.
            fetch_request = sreg.SRegRequest(
                optional=list(dict(self.get_sreg_attributes()))
            )
        request.addExtension(fetch_request)

        # Add the PAPE extension if configured
        preferred_policies = self.setting("OPENID_PAPE_PREFERRED_AUTH_POLICIES")
        preferred_level_types = self.setting("OPENID_PAPE_PREFERRED_AUTH_LEVEL_TYPES")
        max_age = self.setting("OPENID_PAPE_MAX_AUTH_AGE")
        if max_age is not None:
            try:
                max_age = int(max_age)
            except (ValueError, TypeError):
                # A max age that is not an integer is treated as not set.
                max_age = None

        if max_age is not None or preferred_policies or preferred_level_types:
            pape_request = pape.Request(
                max_auth_age=max_age,
                preferred_auth_policies=preferred_policies,
                preferred_auth_level_types=preferred_level_types,
            )
            request.addExtension(pape_request)
        return request

    def get_consumer_store(self) -> OpenIdStore | None:
        """Return the OpenID store provided by the strategy."""
        return self.strategy.openid_store()

    def consumer(self):
        """Return the OpenID Consumer for this backend, creating it on first use."""
        if self._consumer is None:
            self._consumer = self.create_consumer(self.get_consumer_store())
        return self._consumer

    def create_consumer(self, store=None):
        """Create a Consumer bound to the strategy's OpenID session dict and ``store``."""
        return Consumer(self.strategy.openid_session_dict(SESSION_NAME), store)

    def uses_redirect(self):
        """Return the request's ``shouldSendRedirect()`` verdict: true when
        the OpenID request should be sent as a redirect.
        """
        return self.openid_request().shouldSendRedirect()

    def openid_request(self, params: dict[str, str] | None = None):
        """Return openid request.

        ``params`` are added to the provider URL before the request is begun.
        Raises ``AuthException`` when discovery fails.
        """
        try:
            return self.consumer().begin(url_add_parameters(self.openid_url(), params))
        except DiscoveryFailure as err:
            raise AuthException(self, f"OpenID discovery error: {err}") from err

    def openid_url(self):
        """Return service provider URL.
        This base class is generic accepting a POST parameter that specifies
        provider URL.

        Raises ``AuthMissingParameter`` when neither ``URL`` nor the
        ``openid_identifier`` request parameter is available.
        """
        if self.URL:
            return self.URL
        if OPENID_ID_FIELD in self.data:
            return self.data[OPENID_ID_FIELD]
        raise AuthMissingParameter(self, OPENID_ID_FIELD)