Source code for social_core.pipeline.social_auth

from __future__ import annotations

from typing import TYPE_CHECKING

from social_core.exceptions import AuthAlreadyAssociated, AuthException, AuthForbidden

if TYPE_CHECKING:
    from social_core.backends.base import BaseAuth
    from social_core.storage import UserProtocol


[docs] def social_details(backend: BaseAuth, details, response, *args, **kwargs): return {"details": dict(backend.get_user_details(response), **details)}
[docs] def social_uid(backend: BaseAuth, details, response, *args, **kwargs): return {"uid": str(backend.get_user_id(details, response))}
[docs] def auth_allowed(backend: BaseAuth, details, response, *args, **kwargs) -> None: if not backend.auth_allowed(response, details): raise AuthForbidden(backend)
[docs] def social_user( backend: BaseAuth, uid, user: UserProtocol | None = None, *args, **kwargs ): provider = backend.name social = backend.strategy.storage.user.get_social_auth(provider, uid) if social: if user and social.user != user: raise AuthAlreadyAssociated(backend) if not user: user = social.user return { "social": social, "user": user, "is_new": user is None, "new_association": social is None, }
[docs] def associate_user( backend: BaseAuth, uid, user: UserProtocol | None = None, social=None, *args, **kwargs, ): if user and not social: try: social = backend.strategy.storage.user.create_social_auth( user, uid, backend.name ) # pylint: disable-next=broad-exception-caught except Exception as err: if not backend.strategy.storage.is_integrity_error(err): raise # Protect for possible race condition, those bastard with FTL # clicking capabilities, check issue #131: # https://github.com/omab/django-social-auth/issues/131 result = social_user(backend, uid, user, *args, **kwargs) # Check if matching social auth really exists. In case it does # not, the integrity error probably had different cause than # existing entry and should not be hidden. if not result["social"]: raise return result return {"social": social, "user": social.user, "new_association": True} return None
[docs] def associate_by_email( backend: BaseAuth, details, user: UserProtocol | None = None, *args, **kwargs ): """ Associate current auth with a user with the same email address in the DB. This pipeline entry is not 100% secure unless you know that the providers enabled enforce email verification on their side, otherwise a user can attempt to take over another user account by using the same (not validated) email address on some provider. This pipeline entry is disabled by default. """ if user: return None email = details.get("email") if email: # Try to associate accounts registered with the same email address, # only if it's a single object. AuthException is raised if multiple # objects are returned. users = list(backend.strategy.storage.user.get_users_by_email(email)) if len(users) == 0: return None if len(users) > 1: raise AuthException( backend, "The given email address is associated with another account" ) return {"user": users[0], "is_new": False} return None
[docs] def load_extra_data( backend: BaseAuth, details, response, uid, user: UserProtocol | None = None, *args, **kwargs, ) -> None: social = kwargs.get("social") or backend.strategy.storage.user.get_social_auth( backend.name, uid ) if social: extra_data = backend.extra_data(user, uid, response, details, kwargs) social.set_extra_data(extra_data)