from typing import TYPE_CHECKING, Any, Dict, Optional
from django.db import transaction
from api.centrifugo.connection import Centrifugo
from apps.notifications.constants import NotificationTypeChoices
from apps.notifications.models import Notification
if TYPE_CHECKING:
from apps.posts.models import Post
from apps.users.models import User
[docs]
class NotificationService:
    """Create notifications and push unread counts to their receivers.

    All methods are classmethods; the service keeps no state of its own.
    Rows are written through the ``Notification`` model and unread counts
    are pushed with ``Centrifugo().send_ws_notification``.
    """

    @classmethod
    def _create(cls, type: str, data: Dict[str, Any]) -> Notification:
        """Insert a single notification row.

        Args:
            type: Notification type stored on the row.
            data: Remaining model field values, expanded as keyword arguments.

        Returns:
            The newly created ``Notification``.
        """
        return Notification.objects.create(type=type, **data)

    @classmethod
    @transaction.atomic
    def create(
        cls, sender: "User", receiver: "User", post: Optional["Post"], data: Optional[Dict[str, Any]], type: str
    ) -> None:
        """Create a notification and push the receiver's unread count.

        Nothing is created or pushed when ``sender`` equals ``receiver``,
        unless ``type`` is ``NotificationTypeChoices.PUBLISH``.

        Args:
            sender: User who triggered the notification.
            receiver: User the notification is addressed to.
            post: Related post, or ``None``.
            data: Extra payload stored on the notification, or ``None``.
            type: Notification type.
        """
        if sender != receiver or type == NotificationTypeChoices.PUBLISH:
            payload = {"sender": sender, "receiver": receiver, "post": post, "data": data}
            cls._create(type, payload)
            # NOTE(review): this push happens while the atomic block is still
            # open, i.e. before the row is committed. If receivers must never
            # see a count for a rolled-back row, consider deferring the push
            # with ``transaction.on_commit``.
            cls.send_notification(receiver)

    @classmethod
    def send_notification(cls, receiver: "User") -> None:
        """Push the number of unread notifications of ``receiver`` to them.

        Args:
            receiver: User whose unread count is counted and sent.
        """
        unread_count = Notification.objects.filter(receiver=receiver, is_read=False).count()
        Centrifugo().send_ws_notification(receiver.id, unread_count)

    @classmethod
    def mark_as_read_notification(cls, user: "User") -> None:
        """Mark all notifications addressed to ``user`` as read and push the new count.

        Args:
            user: Receiver whose notifications are marked as read.
        """
        # Only rows that are still unread need to be written; the end state
        # is the same as updating every row of the receiver.
        Notification.objects.filter(receiver=user, is_read=False).update(is_read=True)
        cls.send_notification(user)

    @classmethod
    def send_like_notification(
        cls,
        sender: "User",
        receiver: "User",
        post: "Post",
    ) -> None:
        """Send a ``LIKE`` notification about ``post`` from ``sender`` to ``receiver``."""
        cls.create(sender=sender, receiver=receiver, post=post, data=None, type=NotificationTypeChoices.LIKE)

    @classmethod
    def send_follow_notification(cls, sender: "User", receiver: "User") -> None:
        """Send a ``FOLLOW`` notification from ``sender`` to ``receiver`` (no post attached)."""
        cls.create(sender=sender, receiver=receiver, post=None, data=None, type=NotificationTypeChoices.FOLLOW)

    @classmethod
    def send_publish_notification(cls, sender: "User", receiver: "User", post: "Post") -> None:
        """Send a ``PUBLISH`` notification about ``post``.

        This is the only type that ``create`` delivers even when ``sender``
        and ``receiver`` are the same user.
        """
        cls.create(sender=sender, receiver=receiver, post=post, data=None, type=NotificationTypeChoices.PUBLISH)

    @classmethod
    def send_publish_followers_notification(cls, sender: "User", receiver: "User", post: "Post") -> None:
        """Send a ``PUBLISH_FOLLOWERS`` notification about ``post`` from ``sender`` to ``receiver``."""
        cls.create(
            sender=sender, receiver=receiver, post=post, data=None, type=NotificationTypeChoices.PUBLISH_FOLLOWERS
        )