import re
from pathlib import Path
from typing import Any, Dict, Optional
from django.conf import settings
from django.core.exceptions import ValidationError as DjangoValidationError
from django.core.validators import FileExtensionValidator
from django.utils.translation import gettext_lazy as _
from moviepy.editor import VideoFileClip
from rest_framework import serializers
from rest_framework.fields import get_error_detail
from api.posts.exceptions import (
CategoriesDontExistAPIException,
CommentLimitImageAPIException,
PostUnavailableForCommentingAPIException,
TooManyCategoriesExceptionAPIException,
)
from api.users.serializers import AuthorCommentSerializer, CategorySerializer, UserSerializer
from apps.posts.constants import ContentTypeChoices
from apps.posts.models import Comment, CommentImage, Favorite, Post, PostMedia
from apps.posts.validators import (
FileSizeValidator,
ImageResolutionValidator,
VideoDurationValidator,
VideoResolutionValidator,
)
from apps.users.models import Category
from common.serializers import ListUploadMediaField
from services.posts import PostService
from services.posts.comment import CommentService
from services.posts.exceptions import (
CategoriesDontExistException,
CommentLimitImageException,
PostUnavailableForCommentingException,
TooManyCategoriesException,
)
[docs]
class PostMediaSerializer(serializers.ModelSerializer):
    """Serialize all ``PostMedia`` model fields, with ``post`` exposed read-only."""

    class Meta:
        model = PostMedia
        fields = "__all__"
        # The owning post can be read through this serializer but never written.
        extra_kwargs = {
            "post": {"read_only": True},
        }
[docs]
class PostSerializer(serializers.ModelSerializer, MediaSerializer):
    """Serialize ``Post`` objects for reading and updating.

    On input, hashtags (``#word``) are stripped out of ``title`` and returned
    separately under the ``tags`` key of the validated data. Updates are
    delegated to ``PostService``; its domain exceptions are translated into
    the matching API exceptions.

    NOTE(review): ``MediaSerializer`` is neither imported nor defined in the
    visible part of this module -- confirm where it comes from.
    """

    author = UserSerializer(read_only=True)
    tags: serializers.SlugRelatedField = serializers.SlugRelatedField(
        slug_field="name",
        many=True,
        read_only=True,
    )
    media_list = PostMediaSerializer(read_only=True, source="linked_media", many=True)
    # ``category`` is accepted on write only; ``categories`` is the read-only output.
    category = serializers.PrimaryKeyRelatedField(queryset=Category.objects.all(), required=True, write_only=True)
    categories = CategorySerializer(read_only=True, many=True)
    # Read-only values taken from the instance.
    # NOTE(review): presumably queryset annotations -- confirm against the view.
    is_liked = serializers.BooleanField(read_only=True)
    is_favorite = serializers.BooleanField(read_only=True)
    likes_quantity = serializers.IntegerField(read_only=True, default=0)
    comments_quantity = serializers.IntegerField(read_only=True, default=0)

    def to_representation(self, instance):
        """Copy the post's ``is_current_user_following`` flag onto its author, then serialize.

        The flag defaults to ``False`` when the instance does not carry it.
        """
        is_current_user_following = getattr(instance, "is_current_user_following", False)
        setattr(instance.author, "is_current_user_following", is_current_user_following)  # noqa: B010
        return super().to_representation(instance)

    def to_internal_value(self, data):
        """Validate ``data`` and move hashtags from ``title`` into ``tags``.

        Returns:
            The validated data with every ``#hashtag`` removed from ``title``
            (when a title is present) and a ``tags`` list holding the hashtag
            names without the leading ``#``. ``tags`` is always set, and is an
            empty list when no hashtag was found.
        """
        validated_data = super().to_internal_value(data)
        # ``or ""`` also covers a title that is present but None, which would
        # otherwise make ``re.findall`` raise TypeError.
        title = validated_data.get("title") or ""
        hashtags = re.findall(r"#[\w_]+", title)
        for tag in hashtags:
            # Remove one occurrence per match so repeated tags are all stripped.
            title = title.replace(tag, "", 1).strip()
        if hashtags:
            # Only rewrite the title when something was actually removed.
            validated_data["title"] = title
        validated_data["tags"] = [tag[1:] for tag in hashtags]
        return validated_data

    def update(self, instance, validated_data):
        """Update ``instance`` through ``PostService`` and return the result.

        Raises:
            CategoriesDontExistAPIException: the service raised
                ``CategoriesDontExistException``.
            TooManyCategoriesExceptionAPIException: the service raised
                ``TooManyCategoriesException``.
        """
        try:
            return PostService(instance).update(validated_data)
        except CategoriesDontExistException as exc:
            # Chain explicitly so the service-level cause stays in the traceback.
            raise CategoriesDontExistAPIException from exc
        except TooManyCategoriesException as exc:
            raise TooManyCategoriesExceptionAPIException from exc

    class Meta:
        model = Post
        fields = "__all__"
        extra_kwargs = {"author": {"read_only": True}}
[docs]
class FavoriteSerializer(serializers.ModelSerializer):
    """Serialize a ``Favorite`` together with its nested post."""

    post = PostSerializer()

    class Meta:
        model = Favorite
        fields = "__all__"

    def to_representation(self, instance):
        """Hand the favorite's ``is_current_user_following`` flag to its post, then serialize.

        A favorite that does not carry the flag is treated as ``False``.
        """
        following = getattr(instance, "is_current_user_following", False)
        instance.post.is_current_user_following = following
        return super().to_representation(instance)
[docs]
class PostMediaContentSerializer(serializers.ModelSerializer):
    """Validate one uploaded post media file and tag it with its content type.

    The file extension decides whether the upload is treated as an image or a
    video; the matching validators are then run and all of their errors are
    reported together.
    """

    # Set by ``validate_original`` and read back by ``validate``.
    content_type: str

    class Meta:
        model = PostMedia
        fields = ["original"]

    default_error_messages = {
        "unsupported_extension": _("Unsupported extension format."),
    }
    # Validators applied to the uploaded file when it is an image.
    image_validators = [
        FileSizeValidator(settings.IMAGE_MAX_SIZE),
        ImageResolutionValidator(settings.IMAGE_MIN_RESOLUTION_WIDTH, settings.IMAGE_MIN_RESOLUTION_HEIGHT),
    ]
    # Validators applied to the uploaded file when it is a video.
    video_validators = [FileSizeValidator(settings.VIDEO_MAX_SIZE)]
    # Validators applied to the opened ``VideoFileClip`` rather than the raw upload.
    # NOTE(review): ``max_width_vertical`` is fed VIDEO_VERTICAL_MAX_RESOLUTION_HEIGHT,
    # not a *_WIDTH setting -- confirm this is intentional.
    clip_validators = [
        VideoResolutionValidator(
            settings.VIDEO_HORIZONTAL_MIN_RESOLUTION_WIDTH,
            settings.VIDEO_HORIZONTAL_MIN_RESOLUTION_HEIGHT,
            settings.VIDEO_HORIZONTAL_MAX_RESOLUTION_WIDTH,
            min_width_vertical=settings.VIDEO_VERTICAL_MIN_RESOLUTION_WIDTH,
            min_height_vertical=settings.VIDEO_VERTICAL_MIN_RESOLUTION_HEIGHT,
            max_width_vertical=settings.VIDEO_VERTICAL_MAX_RESOLUTION_HEIGHT,
        ),
        VideoDurationValidator(settings.VIDEO_MAX_TIMELINE),
    ]

    def get_type_of_content(self, file) -> str:
        """Return the content type matching the extension of ``file.name``.

        The extension is compared case-insensitively against
        ``settings.IMAGE_ALLOWED_FORMAT`` and ``settings.VIDEO_ALLOWED_FORMAT``.

        Raises:
            serializers.ValidationError: the extension is in neither list.
        """
        extension = Path(file.name).suffix[1:].lower()
        if extension in settings.IMAGE_ALLOWED_FORMAT:
            return ContentTypeChoices.IMAGE
        if extension in settings.VIDEO_ALLOWED_FORMAT:
            return ContentTypeChoices.VIDEO
        raise serializers.ValidationError(
            self.default_error_messages["unsupported_extension"], code="unsupported_extension"
        )

    @staticmethod
    def _collect_errors(validators, target) -> list:
        """Run every validator against ``target`` and return the first error detail of each failure."""
        errors = []
        for validator in validators:
            try:
                validator(target)
            except serializers.ValidationError as exc:
                errors.append(exc.detail[0])  # type: ignore
            except DjangoValidationError as exc:
                errors.append(get_error_detail(exc)[0])
        return errors

    def validate_original(self, value):
        """Validate the uploaded file and remember its content type.

        Images go through ``image_validators``. Anything else goes through
        ``video_validators`` and is then opened as a ``VideoFileClip`` for
        ``clip_validators``. Errors from all validators are collected and
        raised together.

        Returns:
            The uploaded file, unchanged.

        Raises:
            serializers.ValidationError: the extension is unsupported or at
                least one validator rejected the file.
        """
        self.content_type = self.get_type_of_content(value)
        if self.content_type == ContentTypeChoices.IMAGE:
            errors = self._collect_errors(self.image_validators, value)
        else:
            errors = self._collect_errors(self.video_validators, value)
            # NOTE(review): ``temporary_file_path()`` requires an upload stored
            # on disk -- confirm the project's upload handlers guarantee that.
            with VideoFileClip(value.temporary_file_path()) as clip:
                # TODO: Fix me in the next sprint
                errors += self._collect_errors(self.clip_validators, clip)
        if errors:
            raise serializers.ValidationError(errors)
        return value

    def validate(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Add the content type detected by ``validate_original`` under the ``type`` key."""
        data["type"] = self.content_type
        return data
[docs]
class PostContentsSerializer(serializers.Serializer):
    """Validate the list of media items uploaded for one post.

    A post may carry either a single video or several images, and never more
    than ``settings.MAX_CONTENT_ITEMS`` items.
    """

    content = ListUploadMediaField(child=PostMediaContentSerializer())

    # Error texts keyed by error code; each code is also passed to ValidationError.
    default_errors_messages = {
        "content_video_invalid": _("Unable to upload more than 1 video."),
        "mix_content_error": _("Unable to upload video and image together."),
        "max_content_items": _("Content items cannot be bigger than %s items.") % settings.MAX_CONTENT_ITEMS,
    }

    def validate(self, data: Dict[str, Any]):
        """Reject mixed image/video uploads, multiple videos and oversized lists.

        Returns:
            ``data`` unchanged when every rule is satisfied.

        Raises:
            serializers.ValidationError: keyed on ``content``, for the first
                rule that is violated.
        """
        kinds = [item["type"] for item in data["content"]]
        has_image = ContentTypeChoices.IMAGE in kinds
        has_video = ContentTypeChoices.VIDEO in kinds

        # Pick the first violated rule, in the same precedence as the checks below.
        if has_image and has_video:
            code = "mix_content_error"
        elif has_video and len(kinds) > 1:
            code = "content_video_invalid"
        elif len(kinds) > settings.MAX_CONTENT_ITEMS:
            code = "max_content_items"
        else:
            return data

        raise serializers.ValidationError({"content": self.default_errors_messages[code]}, code)
[docs]
class WebhookPostSerializer(serializers.Serializer):
    """Webhook post serializer.

    Accepts the post, its media record and the uploaded video/preview paths;
    both paths are reduced to the part that follows the first ``media/``.
    """

    post = serializers.PrimaryKeyRelatedField(queryset=Post.objects.all(), required=True)
    post_media = serializers.PrimaryKeyRelatedField(queryset=PostMedia.objects.all(), required=True)
    upload_video_path = serializers.CharField(required=True)
    upload_preview_path = serializers.CharField(required=True)

    @staticmethod
    def formatted_path(value: str):
        """Return the part of ``value`` that follows the first ``media/``.

        Only the first occurrence is used as the cut point, so later path
        segments that themselves end in ``media/`` (for example
        ``post_media/``) are kept intact.

        Raises:
            serializers.ValidationError: ``value`` does not contain ``media/``.
        """
        _prefix, marker, relative_path = value.partition("media/")
        if not marker:
            # Report a validation error instead of crashing with IndexError.
            raise serializers.ValidationError(_('Path must contain "media/".'), code="invalid_path")
        return relative_path

    def validate_upload_video_path(self, value: str):
        """Return the video path relative to the first ``media/`` segment."""
        return self.formatted_path(value)

    def validate_upload_preview_path(self, value: str):
        """Return the preview path relative to the first ``media/`` segment."""
        return self.formatted_path(value)
[docs]
class PostNotificationSerializer(serializers.ModelSerializer):
    """Compact post representation for notifications: ``id`` plus a preview path."""

    preview = serializers.SerializerMethodField()

    class Meta:
        model = Post
        fields = ("id", "preview")

    @staticmethod
    def get_preview(obj: "Post") -> Optional[str]:
        """Return ``preview_path`` of the post's first linked media, or ``None`` if there is none."""
        first_media = obj.linked_media.values("preview_path").first()
        return first_media["preview_path"] if first_media else None  # noqa