Source code for apps.users.signals

from contextlib import suppress
from typing import Dict

from django.db.models.signals import post_save, pre_delete, pre_save
from django.dispatch import receiver
from elasticsearch import NotFoundError

from apps.users.documents import UserDocument
from apps.users.models import User
from services.users import UsersService


@receiver(post_save, sender=User)
def create_user(
    sender, instance: User, created: bool, **kwargs: object
) -> None:
    """
    Send the activation e-mail right after a new user has been saved.

    :param sender: model class that emitted the signal (``User``)
    :param instance: the user that was just saved
    :param created: ``True`` when this save created the record
    :param kwargs: remaining signal arguments; not used by this receiver
    :return: None
    """
    # NOTE(review): Django also emits post_save with raw=True while loading
    # fixtures - confirm whether activation e-mails are wanted in that case.
    if created:
        UsersService(instance).send_activation_email(is_created=created)
@receiver(post_save, sender=User)
def index_user(sender, instance, **kwargs):
    """
    Push the saved user into ``UserDocument`` after every save.

    Runs for both newly created and updated users, since the receiver
    does not look at the ``created`` flag.

    :param sender: model class that emitted the signal (``User``)
    :param instance: the user that was just saved
    :param kwargs: remaining signal arguments; not used by this receiver
    :return: None
    """
    document = UserDocument()
    document.update(instance)
@receiver(pre_delete, sender=User)
def delete_post(sender, instance, **kwargs):
    """
    Delete the ``UserDocument`` matching a user that is about to be deleted.

    Despite its name, this receiver is registered for the ``User`` model.
    A missing document is not an error: ``NotFoundError`` is suppressed and
    the delete call itself is issued with ``ignore=404``.

    :param sender: model class that emitted the signal (``User``)
    :param instance: the user that is about to be deleted
    :param kwargs: remaining signal arguments; not used by this receiver
    :return: None
    """
    with suppress(NotFoundError):
        # Look the document up by the user's primary key, then remove it.
        document = UserDocument.get(id=instance.pk)
        document.delete(ignore=404)
@receiver(pre_save, sender=User)
def make_user_staff(sender, instance, **kwargs):
    """
    Call ``UsersService.make_staff_user`` when a user is promoted to staff.

    The stored version of the user is compared with the instance being
    saved; the service is invoked only when ``is_staff`` goes from false to
    true. According to the original docstring, this assigns the staff user
    the content manager role.

    :param sender: model class that emitted the signal (``User``)
    :param instance: the user that is about to be saved
    :param kwargs: remaining signal arguments; not used by this receiver
    :return: None
    """
    try:
        stored_user = User.objects.get(pk=instance.pk)
    except User.DoesNotExist:
        # No stored row to compare against, so there is no transition to detect.
        return

    # Only a non-staff -> staff transition is of interest.
    if stored_user.is_staff or not instance.is_staff:
        return

    UsersService(instance).make_staff_user()