import os
import unittest
import pytest
from django.conf import settings
from django.core.files import File
from django.test import TestCase
from PIL import Image
from apps.posts.constants import ContentTypeChoices, StatusPostChoice
from apps.posts.models import Favorite, Post, PostMedia
from core.settings import POSTS_CONFIGS
from services.posts import (
AlreadyLikedPostException,
FavoriteService,
LikeService,
PostCategoryService,
PostCreationService,
PostMediaService,
PostService,
UserNotLikeThisPostException,
)
from services.posts.comment import CommentService
from services.posts.exceptions import (
AlreadyAddedToFavoritesException,
CategoriesDontExistException,
CommentLimitImageException,
PostUnavailableForCommentingException,
TooManyCategoriesException,
UserNotHasThisPostInFavoritesException,
)
from tests.posts.factories import CommentFactory, ImageCommentFactory, LikeFactory, PostFactory, PostMediaFactory
from tests.users.factories import CategoryFactory, UserFactory # type: ignore
class PostServiceTestCase(TestCase):
    """Exercise ``PostService``: delete, like/unlike errors and favorites."""

    def setUp(self) -> None:
        """Create one non-deleted post and one user shared by the tests."""
        self.post = PostFactory(is_deleted=False)
        self.service = PostService
        self.user = UserFactory()

    def test_post_delete(self) -> None:
        """``delete`` sets ``is_deleted`` on a post that can still be reloaded."""
        self.service(self.post).delete()
        self.post.refresh_from_db()
        self.assertTrue(self.post.is_deleted)

    def test_post_like_exception(self) -> None:
        """Liking a post the user already liked raises."""
        post = PostFactory()
        user = UserFactory()
        LikeFactory(post=post, author=user)
        with self.assertRaises(AlreadyLikedPostException):
            self.service(post).like(user)

    def test_post_unlike_exception(self) -> None:
        """Unliking a post the user never liked raises."""
        post = PostFactory()
        user = UserFactory()
        with self.assertRaises(UserNotLikeThisPostException):
            self.service(post).unlike(user)

    def test_add_to_favorites(self) -> None:
        """First add creates a ``Favorite`` row; a second add raises."""
        service = self.service(self.post)

        # Test adding post to favorites when not already added
        service.add_to_favorites(self.user)
        self.assertTrue(Favorite.objects.filter(post=self.post, author=self.user).exists())

        # Test trying to add the same post to favorites again
        with self.assertRaises(AlreadyAddedToFavoritesException):
            service.add_to_favorites(self.user)

    def test_remove_from_favorites(self) -> None:
        """Removal deletes the ``Favorite`` row; a second removal raises."""
        # Firstly, add post to favorites to setup the scenario
        FavoriteService.create(self.post, self.user)
        service = self.service(self.post)

        # Test removing the post from favorites
        service.remove_from_favorites(self.user)
        self.assertFalse(Favorite.objects.filter(post=self.post, author=self.user).exists())

        # Test trying to remove the post from favorites again
        with self.assertRaises(UserNotHasThisPostInFavoritesException):
            service.remove_from_favorites(self.user)
class LikeServiceTestCase(TestCase):
    """Exercise ``LikeService.create`` and ``LikeService.delete``."""

    def setUp(self) -> None:
        """Keep a reference to the service class under test."""
        self.service = LikeService

    def test_create(self) -> None:
        """``create`` returns something for a fresh post/user pair."""
        user = UserFactory()  # type: ignore
        post = PostFactory()  # type: ignore
        instance = self.service.create(post, user)
        self.assertIsNotNone(instance)

    def test_delete(self) -> None:
        """``delete`` removes the like identified by post and author."""
        instance = LikeFactory()
        self.service.delete(instance.post, instance.author)
        # The like model is not imported in this module, so reach its manager
        # through the factory-built instance.
        # NOTE(review): assumes LikeService.delete removes the row rather than
        # soft-deleting it — confirm against the service.
        self.assertFalse(type(instance).objects.filter(pk=instance.pk).exists())
class PostCategoryServiceTestCase(TestCase):
    """Exercise ``PostCategoryService.set_categories`` validation and updates."""

    def setUp(self) -> None:
        """Create a post that already holds the maximum number of categories."""
        self.service = PostCategoryService
        self.max_quantity = POSTS_CONFIGS.get("MAX_POST_CATEGORIES", 1)
        # One category more than the limit, so passing the full list exceeds it.
        self.categories = CategoryFactory.create_batch(self.max_quantity + 1)
        user = UserFactory()
        self.post = PostFactory(author=user)
        self.post.categories.set(self.categories[: self.max_quantity])

    def _current_category_pks(self) -> list:
        """Return the post's category primary keys as a concrete list."""
        return list(self.post.categories.values_list("pk", flat=True))

    def test_invalid_categories(self) -> None:
        """An id with no matching category raises."""
        # Pass invalid categories (9999 is assumed not to exist in the test DB)
        with self.assertRaises(CategoriesDontExistException):
            self.service.set_categories(self.post, [9999])

    def test_too_many_categories(self) -> None:
        """Passing more categories than the configured maximum raises."""
        too_many = [category.pk for category in self.categories]
        with self.assertRaises(TooManyCategoriesException):
            self.service.set_categories(self.post, too_many)

    def test_valid_categories(self) -> None:
        """A valid single category replaces the post's categories."""
        wanted_pk = self.categories[0].pk
        self.service.set_categories(self.post, [wanted_pk])
        self.assertListEqual(self._current_category_pks(), [wanted_pk])

    def test_no_category_change(self) -> None:
        """Re-submitting the current categories leaves them unchanged."""
        # Materialise the baseline before calling the service so the comparison
        # does not depend on when a lazy queryset happens to be evaluated.
        initial_categories = self._current_category_pks()
        self.service.set_categories(self.post, initial_categories)
        # No ordering is specified on the relation, so compare as multisets.
        self.assertCountEqual(self._current_category_pks(), initial_categories)
class TestFavoriteService(TestCase):
    """Exercise ``FavoriteService.create`` and ``FavoriteService.delete``."""

    def setUp(self) -> None:
        """Create the user and post every test operates on."""
        self.user = UserFactory()
        self.post = PostFactory()

    def test_create_new_favorite(self) -> None:
        """Creating a favorite that does not exist reports ``created=True``."""
        favorite, created = FavoriteService.create(self.post, self.user)
        self.assertTrue(created)
        self.assertEqual(favorite.author, self.user)
        self.assertEqual(favorite.post, self.post)

    def test_create_existing_favorite(self) -> None:
        """Creating a favorite twice returns the existing one and keeps counts."""
        # Create an existing favorite
        Favorite.objects.create(post=self.post, author=self.user)
        self.post.favorites_quantity = 1
        self.post.save()

        # Try to create again
        favorite, created = FavoriteService.create(self.post, self.user)
        self.assertFalse(created)
        self.assertEqual(favorite.author, self.user)
        self.assertEqual(favorite.post, self.post)

        # Count should remain same. Reload first: without it the counter check
        # would only see the value this test assigned in memory above.
        self.post.refresh_from_db()
        self.assertEqual(self.post.favorites.count(), 1)
        self.assertEqual(self.post.favorites_quantity, 1)

    def test_delete_existing_favorite(self) -> None:
        """Deleting an existing favorite removes its row."""
        Favorite.objects.create(post=self.post, author=self.user)
        FavoriteService.delete(self.post, self.user)
        self.assertFalse(Favorite.objects.filter(post=self.post, author=self.user).exists())

    def test_delete_nonexistent_favorite(self) -> None:
        """Deleting a favorite that does not exist raises ``DoesNotExist``."""
        with self.assertRaises(Favorite.DoesNotExist):
            FavoriteService.delete(self.post, self.user)
@pytest.mark.usefixtures("jpg_file")
class TestPostCreationService(unittest.TestCase):
    """Exercise ``PostCreationService.create`` and the media files it produces.

    ``self.jpg_file`` is read by several tests but never assigned in this
    class; presumably the ``jpg_file`` fixture attaches it — confirm in the
    project's conftest.
    """

    # NOTE(review): this is a plain ``unittest.TestCase``, not Django's
    # ``TestCase``; database access/cleanup is presumably arranged elsewhere
    # in the test setup — confirm.

    def setUp(self) -> None:
        """Create the authoring user and keep the service class under test."""
        self.user = UserFactory()
        self.service_class = PostCreationService

    def _create_image_post(self, original):
        """Create a post whose only content item is the image ``original``.

        Also asserts that the returned post can be found for ``self.user``.
        """
        data = {"content": [{"original": original, "type": ContentTypeChoices.IMAGE}]}  # type: ignore
        post = self.service_class.create(data, self.user)
        # Assert the post was created correctly
        self.assertEqual(post, Post.objects.filter(id=post.id, author=self.user).first())
        return post

    def _create_post_from_data_file(self, filename):
        """Create a post from ``../_data/<filename>``; return ``(post, original_size)``."""
        image_path = os.path.join(os.path.dirname(__file__), f"../_data/{filename}")
        with open(image_path, "rb") as image:
            original_image_file = File(image)
            post = self._create_image_post(original_image_file)
            # Read the size while the source file is still open.
            original_size = original_image_file.size  # type: ignore
        return post, original_size

    def _get_webp_media(self, post):
        """Return the post's single media row, asserting its formatted file is ``.webp``."""
        # Assert the media was created correctly
        media = PostMedia.objects.get(post=post)
        self.assertTrue(media.formatted_path.name.endswith(".webp"))
        return media

    def _assert_compressed_webp(self, image_field, original_size, expected_width=None, expected_height=None):
        """Assert ``image_field`` is a WEBP file smaller than ``original_size``.

        When ``expected_width`` / ``expected_height`` are given, the decoded
        image dimensions must match them as well.
        """
        with Image.open(image_field.path) as img:
            self.assertEqual(img.format, "WEBP")
            self.assertLess(image_field.size, original_size)
            if expected_width is not None:
                self.assertEqual(img.width, expected_width)
            if expected_height is not None:
                self.assertEqual(img.height, expected_height)

    def _assert_formatted_and_preview(self, media, original_size) -> None:
        """Check the formatted and preview files against the configured resolutions."""
        # Assert the original image was compressed
        self._assert_compressed_webp(
            media.formatted_path,
            original_size,
            settings.IMAGE_MAX_RESOLUTION_WIDTH,
            settings.IMAGE_MAX_RESOLUTION_HEIGHT,
        )
        # Assert the preview image was compressed. The size compared here is
        # the preview file's own size, not the formatted file's.
        self._assert_compressed_webp(
            media.preview_path,
            original_size,
            settings.IMAGE_MIN_RESOLUTION_WIDTH,
            settings.IMAGE_MIN_RESOLUTION_HEIGHT,
        )

    def test_create(self) -> None:
        """``create`` returns a post that is stored for the given author."""
        self._create_image_post(self.jpg_file)  # type: ignore

    def test_check_media(self) -> None:
        """The created media is a WEBP file smaller than the uploaded JPG."""
        post = self._create_image_post(self.jpg_file)  # type: ignore
        media = self._get_webp_media(post)
        # Assert the image was compressed
        self._assert_compressed_webp(media.formatted_path, self.jpg_file.size)  # type: ignore

    def test_check_wide_image(self) -> None:
        """A wide source image yields formatted/preview files at the configured sizes."""
        post, original_size = self._create_post_from_data_file("wide.jpg")
        media = self._get_webp_media(post)
        self._assert_formatted_and_preview(media, original_size)

    def test_check_high_image(self) -> None:
        """A tall source image yields formatted/preview files at the configured sizes."""
        post, original_size = self._create_post_from_data_file("high.jpg")
        media = self._get_webp_media(post)
        self._assert_formatted_and_preview(media, original_size)

    def test_check_status(self) -> None:
        """After creation the post's status is ``READY_FOR_PUBLISH``."""
        post, _ = self._create_post_from_data_file("wide.jpg")
        media = self._get_webp_media(post)
        self.assertEqual(media.post.status, StatusPostChoice.READY_FOR_PUBLISH)

    def test_original_field_update(self) -> None:
        """``PostMediaService.update`` replaces the media's ``original`` file."""
        post = PostFactory(author=self.user)
        post_media = PostMediaFactory(post=post, type="image")  # type: ignore
        updated_data = {"original": self.jpg_file}  # type: ignore
        old_original = post_media.original

        updated_media = PostMediaService.update(post_media, updated_data)

        # Refresh from DB to get the latest state
        updated_media.refresh_from_db()

        # Assert old original is not equal to updated one
        self.assertNotEqual(old_original, updated_media.original)
class TestPostMediaService(unittest.TestCase):
    """Test post media service"""

    def setUp(self) -> None:
        """Build a media row and the keyword arguments for ``set_formatted_path``."""
        self.service_class = PostMediaService
        self.post_media = PostMediaFactory()
        post_id = self.post_media.post.id
        self.data = {
            "post_media": self.post_media,
            "upload_video_path": f"media/post-content/{post_id}/video/formated/stream.m3u8",
            "upload_preview_path": f"media/post-content/{post_id}/video/preview.jpg",
        }

    def test_set_formatted_path(self) -> None:
        """Test formatted path success case"""
        instance = self.service_class.set_formatted_path(**self.data)  # type: ignore
        # Both paths on the returned instance must equal the values passed in.
        self.assertEqual(instance.formatted_path, self.data["upload_video_path"])
        self.assertEqual(instance.preview_path, self.data["upload_preview_path"])