Source code for apps.users.signals
from contextlib import suppress
from typing import Dict
from django.db.models.signals import post_save, pre_delete, pre_save
from django.dispatch import receiver
from elasticsearch import NotFoundError
from apps.users.documents import UserDocument
from apps.users.models import User
from services.users import UsersService
[docs]
@receiver(post_save, sender=User)
def create_user(sender, instance: User, created: bool, **kwargs: Dict):
    """
    Send the activation email when a user has just been created.

    :param sender: model class the signal is registered for (User)
    :param instance: the user that was saved
    :param created: flag passed by the signal; the email is sent only when it is truthy
    :param kwargs: remaining signal arguments (unused here)
    :return: None
    """
    if not created:
        # Saves of an already existing user do not trigger the email.
        return
    service = UsersService(instance)
    service.send_activation_email(is_created=created)
[docs]
@receiver(post_save, sender=User)
def index_user(sender, instance, **kwargs):
    """
    Update the ``UserDocument`` with the user that was just saved.

    Runs after every save of a ``User`` (there is no check on whether the
    user was created or only modified).

    :param sender: model class the signal is registered for (User)
    :param instance: the user that was saved
    :param kwargs: remaining signal arguments (unused here)
    :return: None
    """
    document = UserDocument()
    document.update(instance)
[docs]
@receiver(pre_delete, sender=User)
def delete_post(sender, instance, **kwargs):
    """
    Delete the ``UserDocument`` that matches a user about to be deleted.

    NOTE(review): despite its name, this handler is bound to ``User``
    deletions (``sender=User``), not to posts.

    :param sender: model class the signal is registered for (User)
    :param instance: the user that is about to be deleted
    :param kwargs: remaining signal arguments (unused here)
    :return: None
    """
    # A missing document is not an error: NotFoundError raised by either the
    # lookup or the delete call is swallowed on purpose.
    with suppress(NotFoundError):
        document = UserDocument.get(id=instance.pk)
        document.delete(ignore=404)
[docs]
@receiver(pre_save, sender=User)
def make_user_staff(sender, instance, **kwargs):
    """
    Call ``UsersService.make_staff_user`` when a user gains the staff flag.

    The stored version of the user is loaded and compared with the instance
    being saved; the service is invoked only when ``is_staff`` changes from
    falsy to truthy. According to the original note on this handler, that
    call assigns the content manager role to the staff user.

    :param sender: model class the signal is registered for (User)
    :param instance: the user that is about to be saved
    :param kwargs: remaining signal arguments (unused here)
    :return: None
    """
    try:
        stored_user = User.objects.get(pk=instance.pk)
    except User.DoesNotExist:
        # No stored row to compare against, so nothing is done.
        return

    if stored_user.is_staff or not instance.is_staff:
        # Either the user was already staff or is not becoming staff now.
        return

    UsersService(instance).make_staff_user()