Source code for services.notifications.notification

from typing import TYPE_CHECKING, Any, Dict, Optional

from django.db import transaction

from api.centrifugo.connection import Centrifugo
from apps.notifications.constants import NotificationTypeChoices
from apps.notifications.models import Notification

if TYPE_CHECKING:
    from apps.posts.models import Post
    from apps.users.models import User


class NotificationService:
    """Create notifications and push unread counters to their receivers.

    Rows are stored through the ``Notification`` model; the receiver's
    unread count is published with ``Centrifugo().send_ws_notification``.
    """

    @classmethod
    def _create(cls, type: str, data: Dict[str, Any]) -> Notification:
        """Persist a single notification row.

        Args:
            type: Notification type value stored on the row.
            data: Extra keyword arguments forwarded to
                ``Notification.objects.create``.

        Returns:
            The newly created ``Notification`` instance.
        """
        return Notification.objects.create(type=type, **data)

    @classmethod
    @transaction.atomic
    def create(
        cls,
        sender: "User",
        receiver: "User",
        post: Optional["Post"],
        data: Optional[Dict[str, Any]],
        type: str,
    ) -> None:
        """Store a notification and schedule an unread-count push.

        Nothing is stored or sent when ``sender`` equals ``receiver``,
        unless ``type`` is ``NotificationTypeChoices.PUBLISH``.

        Args:
            sender: User that triggered the notification.
            receiver: User the notification is addressed to.
            post: Related post, or ``None``.
            data: Additional payload stored on the notification, or ``None``.
            type: Notification type value.
        """
        if sender != receiver or type == NotificationTypeChoices.PUBLISH:
            payload = {"sender": sender, "receiver": receiver, "post": post, "data": data}
            cls._create(type, payload)
            # Push only after the surrounding transaction commits: sending from
            # inside the atomic block could announce a count that is not yet
            # visible to other connections or that is rolled back afterwards.
            # Outside of any enclosing transaction the callback runs as soon as
            # this method's own atomic block commits.
            transaction.on_commit(lambda: cls.send_notification(receiver))

    @classmethod
    def send_notification(cls, receiver: "User") -> None:
        """Publish ``receiver``'s current unread-notification count.

        Args:
            receiver: User whose unread count is counted and sent.
        """
        unread_count = Notification.objects.filter(receiver=receiver, is_read=False).count()
        Centrifugo().send_ws_notification(receiver.id, unread_count)

    @classmethod
    def mark_as_read_notification(cls, user: "User") -> None:
        """Mark all of ``user``'s notifications as read and push the new count.

        Args:
            user: Receiver whose notifications are marked as read.
        """
        # Only unread rows need to change; rows that are already read are
        # skipped so the UPDATE does not rewrite the whole history.
        Notification.objects.filter(receiver=user, is_read=False).update(is_read=True)
        cls.send_notification(user)

    @classmethod
    def send_like_notification(
        cls,
        sender: "User",
        receiver: "User",
        post: "Post",
    ) -> None:
        """Send like notification."""
        cls.create(sender=sender, receiver=receiver, post=post, data=None, type=NotificationTypeChoices.LIKE)

    @classmethod
    def send_follow_notification(cls, sender: "User", receiver: "User") -> None:
        """Send follow notification (no post is attached)."""
        cls.create(sender=sender, receiver=receiver, post=None, data=None, type=NotificationTypeChoices.FOLLOW)

    @classmethod
    def send_comment_notification(
        cls, sender: "User", receiver: "User", post: "Post", data: Dict[str, Any]
    ) -> None:
        """Send comment notification; ``data`` is stored on the notification."""
        cls.create(sender=sender, receiver=receiver, post=post, data=data, type=NotificationTypeChoices.COMMENT)

    @classmethod
    def send_publish_notification(cls, sender: "User", receiver: "User", post: "Post") -> None:
        """Send publish notification.

        This type is also delivered when ``sender`` and ``receiver`` are the
        same user (see the condition in ``create``).
        """
        cls.create(sender=sender, receiver=receiver, post=post, data=None, type=NotificationTypeChoices.PUBLISH)

    @classmethod
    def send_publish_followers_notification(cls, sender: "User", receiver: "User", post: "Post") -> None:
        """Send follower publish post notification."""
        cls.create(
            sender=sender,
            receiver=receiver,
            post=post,
            data=None,
            type=NotificationTypeChoices.PUBLISH_FOLLOWERS,
        )