# Source code for social_core.pipeline.user

from __future__ import annotations

from typing import TYPE_CHECKING, cast
from uuid import uuid4

from social_core.exceptions import (
    StrategyMissingBackendError,
)
from social_core.utils import module_member, slugify

if TYPE_CHECKING:
    from social_core.backends.base import BaseAuth
    from social_core.storage import UserProtocol
    from social_core.strategy import BaseStrategy

USER_FIELDS = ["username", "email"]


def get_username(
    strategy: BaseStrategy,
    details,
    backend: BaseAuth,
    user: UserProtocol | None = None,
    *args,
    **kwargs,
):
    """Choose the username for the user being authenticated.

    For an existing ``user`` the username is read back from storage. For a
    new user a candidate is taken from ``details`` (the email when
    ``USERNAME_IS_FULL_EMAIL`` is set and an email is present, otherwise the
    provider username, otherwise a random UUID hex string), truncated to the
    storage's maximum username length, cleaned and optionally slugified.
    While the result is empty or already taken, a random suffix of up to
    ``UUID_LENGTH`` hex characters is appended to a shortened base and the
    candidate is rebuilt.

    Returns:
        ``{"username": <final username>}``, or ``None`` when ``"username"``
        is not listed in the backend's ``USER_FIELDS`` setting.

    Raises:
        StrategyMissingBackendError: if ``strategy.storage`` is ``None``.
    """
    if strategy.storage is None:
        raise StrategyMissingBackendError
    if "username" not in cast("set[str]", backend.setting("USER_FIELDS", USER_FIELDS)):
        return None
    storage = strategy.storage

    if user:
        # Existing account: keep whatever username storage already has.
        return {"username": storage.user.get_username(user)}

    email_as_username = backend.setting("USERNAME_IS_FULL_EMAIL", False)
    uuid_length = cast("int", backend.setting("UUID_LENGTH", 16))
    max_length = storage.user.username_max_length()
    do_slugify = backend.setting("SLUGIFY_USERNAMES", False)
    do_clean = backend.setting("CLEAN_USERNAMES", True)

    def identity_func(val):
        return val

    # Cleaning: a dotted-path override wins over the storage default.
    if do_clean:
        override_clean = backend.setting("CLEAN_USERNAME_FUNCTION")
        if override_clean:
            clean_func = module_member(override_clean)
        else:
            clean_func = storage.user.clean_username
    else:
        clean_func = identity_func

    # Slugifying: same override scheme, falling back to the bundled slugify.
    if do_slugify:
        override_slug = backend.setting("SLUGIFY_FUNCTION")
        slug_func = module_member(override_slug) if override_slug else slugify
    else:
        slug_func = identity_func

    if email_as_username and details.get("email"):
        username = details["email"]
    elif details.get("username"):
        username = details["username"]
    else:
        username = uuid4().hex

    if max_length is None:
        short_username = username
    else:
        # Clamp at zero: if UUID_LENGTH exceeds max_length, a negative slice
        # bound would keep most of the base name, the random suffix could be
        # truncated away entirely, and the uniqueness loop below would keep
        # producing the same taken username forever.
        short_username = username[: max(max_length - uuid_length, 0)]
    final_username = slug_func(clean_func(username[:max_length]))

    # Generate a unique username for the current user, using the shortened
    # username as base and adding a random hash at the end. The result is cut
    # again so it never exceeds the field's max_length.
    # An empty final_username (e.g. everything was cleaned away) also enters
    # the loop, so it gets replaced by a generated one.
    while not final_username or storage.user.user_exists(username=final_username):
        username = short_username + uuid4().hex[:uuid_length]
        final_username = slug_func(clean_func(username[:max_length]))

    return {"username": final_username}


def create_user(
    strategy: BaseStrategy,
    details,
    backend: BaseAuth,
    user: UserProtocol | None = None,
    *args,
    **kwargs,
):
    """Create a user through the strategy when none has been resolved yet.

    The creation fields are the names listed in the backend's ``USER_FIELDS``
    setting; each value comes from ``kwargs`` when present there, otherwise
    from ``details``.

    Returns:
        ``{"is_new": False}`` when ``user`` is already set, ``None`` when no
        fields are configured, otherwise ``{"is_new": True, "user": ...}``
        holding whatever ``strategy.create_user`` returned.
    """
    if user:
        return {"is_new": False}

    field_names = cast("list[str]", backend.setting("USER_FIELDS", USER_FIELDS))
    fields = {}
    for field_name in field_names:
        # A keyword argument passed to this step beats the provider details.
        fields[field_name] = kwargs.get(field_name, details.get(field_name))

    if not fields:
        return None

    # Optionally normalise the email to lowercase before the user is created.
    if backend.setting("FORCE_EMAIL_LOWERCASE", False) and fields.get("email"):
        fields["email"] = fields["email"].lower()

    new_user = strategy.create_user(**fields)
    return {"is_new": True, "user": new_user}


def user_details(
    strategy: BaseStrategy,
    details,
    backend: BaseAuth | None,
    user: UserProtocol | None = None,
    *args,
    **kwargs,
) -> None:
    """Update user details using data from provider.

    Each ``details`` entry is copied onto ``user`` unless its value is
    ``None``, the user has no such attribute, the attribute is protected,
    the value is unchanged, or the attribute is listed as immutable and
    already holds a truthy value. Detail names are first translated through
    the ``USER_FIELD_MAPPING`` setting. If at least one attribute was set,
    ``strategy.storage.user.changed(user)`` is called once at the end.

    Does nothing when ``user`` is falsy.

    Raises:
        StrategyMissingBackendError: if ``strategy.storage`` is ``None``.
    """
    if strategy.storage is None:
        raise StrategyMissingBackendError

    if not user:
        return

    # The default protected fields (the tuple below) can be dropped by setting
    # SOCIAL_AUTH_NO_DEFAULT_PROTECTED_USER_FIELDS to True; only the literal
    # True disables them.
    protected: tuple[str, ...]
    if strategy.setting("NO_DEFAULT_PROTECTED_USER_FIELDS", backend=backend) is True:
        protected = ()
    else:
        protected = (
            "username",
            "id",
            "pk",
            "email",
            "password",
            "is_active",
            "is_staff",
            "is_superuser",
        )

    # Fields from SOCIAL_AUTH_PROTECTED_USER_FIELDS are protected in addition.
    protected = protected + tuple(
        cast(
            "list[str]", strategy.setting("PROTECTED_USER_FIELDS", [], backend=backend)
        )
    )

    field_mapping = strategy.setting("USER_FIELD_MAPPING", {}, backend=backend)

    # Loop-invariant, so looked up once here instead of once per changed field.
    immutable_fields = tuple(
        cast(
            "list[str]",
            strategy.setting("IMMUTABLE_USER_FIELDS", [], backend=backend),
        )
    )

    changed = False  # flag to track changes

    # Update user model attributes with the new data sent by the current
    # provider.
    for name, value in details.items():
        # Convert to existing user field if mapping exists
        field = field_mapping.get(name, name)
        if value is None or not hasattr(user, field) or field in protected:
            continue

        current_value = getattr(user, field, None)
        if current_value == value:
            continue

        # Immutable fields may be filled in once but never overwritten.
        if field in immutable_fields and current_value:
            continue

        changed = True
        setattr(user, field, value)

    if changed:
        strategy.storage.user.changed(user)