import os
import re
from datetime import timedelta
from typing import List
from django.conf import settings
from django.contrib.auth.hashers import check_password
from django.core import mail
from django.core.files import File
from django.urls import reverse
from django.utils.timezone import now
from elasticsearch_dsl import connections
from rest_framework import status
from rest_framework.test import APITestCase, APITransactionTestCase
from api.users.exceptions import AlreadyFollowingUserAPIException, NotFollowingUserAPIException
from apps.posts.models import Like
from apps.users.documents import UserDocument
from apps.users.models import ActivationCode, Category, User
from apps.users.validators import AgeValidator
from common.tests import get_token_for_user
from core.settings import USERS_CONFIGS
from tests.posts.factories import CommentFactory, LikeFactory, PostFactory
from tests.users.factories import CategoryFactory, UserFactory, fake # type: ignore
# --- Sign-up, activation, password and account endpoint tests ---
class UserSignUpTests(APITestCase):
    """API tests for sign-up, activation, password management and the account endpoint.

    Shared fixtures (ten categories and one user) are created once per class in
    ``setUpTestData``. The e-mail driven flows (activation, password reset,
    e-mail change) read the message from ``mail.outbox`` and extract ``uid`` and
    ``token`` from the link it contains.
    """

    # Sample avatar uploaded by the multipart sign-up tests.
    AVATAR_PATH = os.path.join(os.path.dirname(__file__), "../_data/1.jpg")

    interests: List[Category] = []

    @classmethod
    def setUpTestData(cls) -> None:
        """Create the categories and the user shared by every test of the class.

        ``setUpTestData`` is used instead of ``setUpClass`` so that in-memory
        changes a test makes to ``self.user`` (``is_deleted``, ``is_active``,
        a new password) do not leak into the following tests.
        """
        cls.interests = CategoryFactory.create_batch(10)
        cls.user = UserFactory.create()

    # ------------------------------------------------------------------ helpers

    def _authenticate(self, user) -> None:
        """Send ``user``'s bearer token with every subsequent client request."""
        token = get_token_for_user(user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token}")

    def _signup_data(self, **overrides) -> dict:
        """Return a complete sign-up payload; ``overrides`` replace or add fields."""
        password = fake.password()
        data = {
            "email": fake.email(),
            "username": fake.user_name(),
            "password": password,
            "password_repeat": password,
            "interests": [interest.id for interest in self.interests],  # use all 10 interests
            "birthdate": fake.date_of_birth(minimum_age=14).strftime("%Y-%m-%d"),
            "first_name": fake.first_name(),
            "last_name": fake.last_name(),
        }
        data.update(overrides)
        return data

    def _uid_and_token_from_mail(self, link_keyword: str, index: int = 0):
        """Return ``(uid, token)`` parsed from the link in the e-mail at ``index``.

        ``link_keyword`` is the fragment the ``href`` must contain (for example
        ``"activate"``). Every step is asserted, so a missing e-mail or link
        fails the test with a readable message instead of an ``AttributeError``
        or ``NameError`` further down.
        """
        self.assertGreater(len(mail.outbox), index, "the expected e-mail was not sent")
        message = str(mail.outbox[index].message())
        link_match = re.search(rf'href="([^"]*{re.escape(link_keyword)}[^"]*)"', message)
        self.assertIsNotNone(link_match, f"no '{link_keyword}' link found in the e-mail")
        link = link_match.group(1)
        uid_match = re.search(r"uid=([^&]*)", link)
        token_match = re.search(r"token=([^&]*)", link)
        self.assertIsNotNone(uid_match, "the link does not contain a uid")
        self.assertIsNotNone(token_match, "the link does not contain a token")
        return uid_match.group(1), token_match.group(1)

    def _request_password_reset(self):
        """Request a password-reset e-mail for the shared user; return the response."""
        return self.client.post(reverse("users:reset-password"), data={"email": self.user.email}, format="json")

    def _confirm_password_reset(self, uid: str, token: str, password: str):
        """Submit the new ``password`` with the given ``uid``/``token``; return the response."""
        data = {"uid": uid, "token": token, "password": password, "password_repeat": password}
        return self.client.post(reverse("users:confirm-password"), data=data, format="json")

    def _run_user_flow(self, signup_data: dict, update_data: dict, update_format: str) -> None:
        """Sign up, resend the activation e-mail, activate, then patch the profile."""
        # Part 1 :: Create user and send activation letter at email
        response_create = self.client.post(reverse("users:signup"), signup_data, format="multipart")
        self.assertEqual(response_create.status_code, status.HTTP_200_OK)
        user = User.objects.get(pk=response_create.data.get("id"))

        # Part 2 :: Resend activation letter at email
        response_resend = self.client.post(
            reverse("users:resend-activation-code"), {"email": signup_data["email"]}, format="multipart"
        )
        self.assertEqual(response_resend.status_code, status.HTTP_204_NO_CONTENT)
        self.assertGreaterEqual(len(mail.outbox), 2)
        # The second message is the one produced by the resend request.
        uid, token = self._uid_and_token_from_mail("activate", index=1)

        # Part 3 :: Activate user
        activate_url = reverse("users:activate", kwargs={"uid": uid, "token": token})
        activate_response = self.client.post(activate_url, format="json")
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {activate_response.data.get('access')}")

        # Part 4 :: Update user
        response_update = self.client.patch(reverse("users:user"), data=update_data, format=update_format)
        user.refresh_from_db()

        self.assertTrue(user.is_active)
        self.assertEqual(user.username, update_data.get("username"))
        # Neither side is explicitly ordered, so compare regardless of order.
        self.assertCountEqual(
            list(user.interests.values_list("pk", flat=True)),
            [i.get("id") for i in response_update.data.get("interests_list")],
        )

    # ------------------------------------------------------- sign-up / activation

    def test_successful_signup(self) -> None:
        """A complete multipart sign-up returns 200 with an id and stores the user."""
        with open(self.AVATAR_PATH, "rb") as image:
            data = self._signup_data(avatar=File(image))
            response = self.client.post(reverse("users:signup"), data, format="multipart")

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertIn("id", response.data)
        self.assertTrue(User.objects.filter(email=data["email"]).exists())

    def test_successful_activation(self) -> None:
        """Following the activation link activates the user and returns JWT tokens."""
        with open(self.AVATAR_PATH, "rb") as image:
            data = self._signup_data(avatar=File(image))
            response = self.client.post(reverse("users:signup"), data, format="multipart")
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertIn("id", response.data)

        uid, token = self._uid_and_token_from_mail("activate")
        activate_url = reverse("users:activate", kwargs={"uid": uid, "token": token})
        activate_response = self.client.post(activate_url, format="json")
        user = User.objects.get(pk=response.data.get("id"))

        self.assertIsNotNone(user.password)
        self.assertEqual(activate_response.status_code, status.HTTP_200_OK)
        self.assertEqual(user.email, data["email"])
        self.assertEqual(user.username, data["username"])
        self.assertIn("access", activate_response.data)
        self.assertIn("refresh", activate_response.data)
        self.assertIn(".jpg", user.profile.avatar.name)
        # The queryset carries no explicit ordering, so compare regardless of order.
        self.assertCountEqual(list(user.interests.all().values_list("pk", flat=True)), data["interests"])
        self.assertTrue(user.is_active)
        self.assertTrue(User.objects.filter(email=data["email"]).exists())

    def test_wrong_signup_different_passwords(self) -> None:
        """Mismatching passwords are rejected with a single ``password_repeat`` error."""
        with open(self.AVATAR_PATH, "rb") as image:
            data = self._signup_data(password_repeat=fake.password(), avatar=File(image))
            response = self.client.post(reverse("users:signup"), data, format="multipart")

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertEqual(list(response.data.keys()), ["password_repeat"])

    def test_wrong_signup_missing_fields(self) -> None:
        """A payload containing only an e-mail is rejected."""
        response = self.client.post(reverse("users:signup"), {"email": fake.email()}, format="json")
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    # ------------------------------------------------------------ password change

    def test_successful_change_password_signup(self) -> None:
        """An authenticated user can change the password; the new hash is stored."""
        self._authenticate(self.user)
        password = fake.password()
        data = {"password": password, "password_repeat": password}
        response = self.client.post(reverse("users:password-change"), data=data, format="json")
        self.user.refresh_from_db()

        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
        self.assertTrue(check_password(password, self.user.password))

    def test_change_password_signup_password_check(self) -> None:
        """The password ``"password"`` is rejected with a ``password`` error."""
        self._authenticate(self.user)
        password = "password"  # noqa
        data = {"password": password, "password_repeat": password}
        response = self.client.post(reverse("users:password-change"), data=data, format="json")

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("password", response.data)

    # ------------------------------------------------------------- password reset

    def test_successful_reset_password(self) -> None:
        """The e-mailed uid/token pair sets a new password and consumes the code."""
        response = self._request_password_reset()
        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
        uid, token = self._uid_and_token_from_mail("reset-password")
        code_exists_before = self.user.activation_codes.all().exists()

        new_password = "New_p@ssword1"
        confirm_response = self._confirm_password_reset(uid, token, new_password)
        code_exists_after = self.user.activation_codes.all().exists()
        self.user.refresh_from_db()

        self.assertTrue(check_password(new_password, self.user.password))
        self.assertEqual(confirm_response.status_code, status.HTTP_204_NO_CONTENT)
        self.assertTrue(code_exists_before)
        self.assertFalse(code_exists_after)

    def test_reset_password_check_password(self) -> None:
        """Confirming a reset with the password ``"password"`` is rejected."""
        response = self._request_password_reset()
        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
        uid, token = self._uid_and_token_from_mail("reset-password")

        confirm_response = self._confirm_password_reset(uid, token, "password")

        self.assertEqual(confirm_response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("password", confirm_response.data)

    def test_wrong_uid(self) -> None:
        """A uid other than the e-mailed one is rejected with a ``uid`` error."""
        response = self._request_password_reset()
        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
        _, token = self._uid_and_token_from_mail("reset-password")

        confirm_response = self._confirm_password_reset("AV8xsq", token, "New_password")

        self.assertIn("uid", confirm_response.data)
        self.assertEqual(confirm_response.data["uid"], "The uid is incorrect")
        self.assertEqual(confirm_response.status_code, status.HTTP_400_BAD_REQUEST)

    def test_wrong_token(self) -> None:
        """A token other than the e-mailed one is rejected as invalid."""
        self.user.is_active = True
        self.user.save()
        response = self._request_password_reset()
        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
        uid, _ = self._uid_and_token_from_mail("reset-password")

        confirm_response = self._confirm_password_reset(uid, "random_text", "New_password")

        self.assertIn("token", confirm_response.data)
        self.assertEqual(str(confirm_response.data["token"][0]), "This token is invalid")
        self.assertEqual(confirm_response.status_code, status.HTTP_400_BAD_REQUEST)

    def test_wrong_token_expired(self) -> None:
        """A code whose expiration date lies in the past is rejected as expired."""
        response = self._request_password_reset()
        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
        uid, token = self._uid_and_token_from_mail("reset-password")

        # Move the expiration date one day into the past.
        activation_code = ActivationCode.objects.get(uid=uid, code=token, user=self.user)
        activation_code.expiration_date = now() - timedelta(days=1)
        activation_code.save()

        confirm_response = self._confirm_password_reset(uid, token, "New_password")

        self.assertIn("token", confirm_response.data)
        self.assertEqual(str(confirm_response.data["token"][0]), "This token is expired")
        self.assertEqual(confirm_response.status_code, status.HTTP_400_BAD_REQUEST)

    # ------------------------------------------------------- account: e-mail/delete

    def test_successful_change_email(self) -> None:
        """Patching the e-mail applies it only after the confirmation link is used."""
        self._authenticate(self.user)
        data = {"email": "new_test_email@test.mail.com"}
        response = self.client.patch(reverse("users:user"), data=data, format="json")
        uid, token = self._uid_and_token_from_mail("confirm-email")
        code_exists_before = self.user.activation_codes.all().exists()

        confirm_url = reverse("users:confirm-email", kwargs={"uid": uid, "token": token})
        confirm_response = self.client.post(confirm_url, data=data, format="json")
        code_exists_after = self.user.activation_codes.all().exists()
        self.user.refresh_from_db()

        self.assertEqual(self.user.email, data["email"])
        self.assertEqual(confirm_response.status_code, status.HTTP_204_NO_CONTENT)
        self.assertTrue(code_exists_before)
        self.assertFalse(code_exists_after)
        # The PATCH response itself must still report the previous address.
        self.assertNotEqual(response.data.get("email"), data["email"])
        self.assertEqual(response.status_code, status.HTTP_200_OK)

    def test_successful_delete_user(self) -> None:
        """After deleting the account, the same token is answered with 401."""
        self._authenticate(self.user)
        response_delete = self.client.delete(reverse("users:user"), format="json")
        response_get = self.client.get(reverse("users:user", kwargs={"pk": self.user.pk}), format="json")
        # Raises if the row no longer exists in the database.
        self.user.refresh_from_db()

        self.assertEqual(response_delete.status_code, status.HTTP_204_NO_CONTENT)
        self.assertEqual(response_get.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_successful_delete_user_and_signup(self) -> None:
        """A user flagged ``is_deleted`` cannot obtain a token with valid credentials."""
        self.user.is_deleted = True
        password = "B@seP@ssw0rd"  # noqa
        self.user.set_password(password)
        self.user.save()

        data = {"email": self.user.email, "password": password}
        response_token = self.client.post(reverse("token"), data=data, format="json")

        self.assertEqual(response_token.status_code, status.HTTP_401_UNAUTHORIZED)

    # ------------------------------------------------------------- complete flows

    def tests_successful_user_flow(self) -> None:
        """Full flow with a multipart profile update."""
        update_data = {
            "username": "test_username",
            "interests": [interest.id for interest in self.interests],  # use all 10 interests
        }
        self._run_user_flow(self._signup_data(), update_data, "multipart")

    def tests_successful_user_flow_json(self) -> None:
        """Full flow with a JSON profile update that also sends birthdate, password and e-mail."""
        signup_data = self._signup_data()
        update_data = {
            "username": "test_username",
            "interests": [interest.id for interest in self.interests],  # use all 10 interests
            "birthdate": fake.date_of_birth(minimum_age=14).strftime("%Y-%m-%d"),
            "password": signup_data["password"],
            "email": fake.email(),
        }
        self._run_user_flow(signup_data, update_data, "json")

    # ------------------------------------------------------------ field validation

    def tests_check_min_age(self) -> None:
        """A birthdate below ``AGE_LIMIT`` is rejected with the validator's message."""
        password = fake.password()
        data = {
            "birthdate": fake.date_of_birth(maximum_age=USERS_CONFIGS["AGE_LIMIT"] - 1).strftime("%Y-%m-%d"),
            "password": password,
            "email": fake.email(),
            "password_repeat": password,
        }
        response = self.client.post(reverse("users:signup"), data, format="multipart")

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertEqual(response.data["birthdate"][0], AgeValidator.messages["age_validation_error"])

    def tests_check_age_valid(self) -> None:
        """A birthdate above ``AGE_LIMIT`` is accepted."""
        birthdate = fake.date_of_birth(minimum_age=USERS_CONFIGS["AGE_LIMIT"] + 1).strftime("%Y-%m-%d")
        response = self.client.post(reverse("users:signup"), self._signup_data(birthdate=birthdate), format="multipart")

        self.assertEqual(response.status_code, status.HTTP_200_OK)

    def tests_check_password(self) -> None:
        """Signing up with the password ``"password"`` is rejected."""
        password = "password"  # noqa
        data = {
            "birthdate": fake.date_of_birth(minimum_age=USERS_CONFIGS["AGE_LIMIT"] + 1).strftime("%Y-%m-%d"),
            "password": password,
            "email": fake.email(),
            "password_repeat": password,
        }
        response = self.client.post(reverse("users:signup"), data, format="multipart")

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn("password", response.data)

    def tests_check_fields(self) -> None:
        """``is_active`` sent by the client is ignored: the new user stays inactive."""
        birthdate = fake.date_of_birth(minimum_age=USERS_CONFIGS["AGE_LIMIT"] + 1).strftime("%Y-%m-%d")
        data = self._signup_data(birthdate=birthdate, is_active=True)
        response = self.client.post(reverse("users:signup"), data, format="multipart")

        # Check the status first so a rejected request is not reported as a KeyError.
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertFalse(response.data["is_active"])

    # ----------------------------------------------------------------------- list

    def test_list_of_users(self) -> None:
        """The list endpoint counts the freshly created users plus the shared one."""
        quantity = 5
        UserFactory.create_batch(quantity)
        response = self.client.get(reverse("users:list"))

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(response.data["count"], quantity + 1)
# --- Follow / unfollow and follower listing endpoint tests ---
class FollowTestCase(APITestCase):
    """API tests for following, unfollowing and the follower/following listings.

    ``setUp`` creates two plain users plus ``main_user``, who gets
    ``2 * PAGE_SIZE`` followers and ``2 * PAGE_SIZE`` followed users, all with
    the first name ``"SameName"``. These users are pushed to the search index
    through ``UserDocument``; ``tearDown`` empties the ``users`` index again.
    """

    def setUp(self) -> None:
        """Build the users, follow relations, likes and search-index entries."""
        self.es = connections.get_connection()
        self.page_size = settings.REST_FRAMEWORK["PAGE_SIZE"]

        self.user1 = UserFactory()
        self.user2 = UserFactory()
        self.user1.save()
        self.user2.save()

        self.main_user = UserFactory()
        self.password = fake.password()
        self.main_user.set_password(self.password)
        self.main_user.save()
        UserDocument().update(self.main_user)

        self.followers = UserFactory.create_batch(self.page_size * 2, first_name="SameName")
        self.main_user.followers.set(self.followers)
        self._save_and_index(self.followers)

        self.following = UserFactory.create_batch(self.page_size * 2, first_name="SameName")
        self.main_user.following.set(self.following)
        self._save_and_index(self.following)

        self.likes = LikeFactory.create_batch(10)
        # Reassign every existing like to main_user.
        Like.objects.update(author=self.main_user)

    def tearDown(self) -> None:
        """Remove every document from the ``users`` index and refresh it."""
        self.es.delete_by_query(index="users", body={"query": {"match_all": {}}})
        self.es.indices.refresh(index="users")

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _save_and_index(users) -> None:
        """Save each user and push it to the search index."""
        for user in users:
            user.save()
            UserDocument().update(user)

    def _authenticate(self, user) -> None:
        """Send ``user``'s bearer token with every subsequent client request."""
        token = get_token_for_user(user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token}")

    def _search_same_name(self, url_name: str):
        """Query ``url_name`` as main_user for ``"SameName"``, limited to one page."""
        self.es.indices.refresh(index="users")
        self._authenticate(self.main_user)
        return self.client.get(
            reverse(url_name),
            {"search": "SameName", "limit": str(self.page_size), "offset": 0},
        )

    # ---------------------------------------------------------- follow / unfollow

    def test_successful_follow(self) -> None:
        """Following another user returns 204 and stores the relation."""
        self._authenticate(self.user1)
        response = self.client.post(reverse("users:follow", kwargs={"pk": self.user2.pk}))

        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
        self.assertIn(self.user2, self.user1.following.all())

    def test_successful_unfollow(self) -> None:
        """Unfollowing a followed user returns 204 and removes the relation."""
        self._authenticate(self.user1)
        self.user1.following.add(self.user2)
        response = self.client.post(reverse("users:unfollow", kwargs={"pk": self.user2.pk}))

        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
        self.assertNotIn(self.user2, self.user1.following.all())

    def test_error_unfollowing_unfollowed_user(self) -> None:
        """Unfollowing a user who is not followed returns 400 with the API error detail."""
        self._authenticate(self.user1)
        response = self.client.post(reverse("users:unfollow", kwargs={"pk": self.user2.pk}))

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertEqual(response.data["detail"], NotFollowingUserAPIException.default_detail)

    def test_error_following_already_followed_user(self) -> None:
        """Following an already followed user returns 400 with the API error detail."""
        self._authenticate(self.user1)
        self.user1.following.add(self.user2)
        response = self.client.post(reverse("users:follow", kwargs={"pk": self.user2.pk}))

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertEqual(response.data["detail"], AlreadyFollowingUserAPIException.default_detail)

    # ------------------------------------------------------------------ followers

    def test_list_followers(self) -> None:
        """The current user's follower list contains a user who follows them."""
        self._authenticate(self.user1)
        self.user2.following.add(self.user1)  # user2 starts following user1
        response = self.client.get(reverse("users:followers_list"))

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertIn(self.user2.pk, [user["id"] for user in response.data["results"]])

    def test_list_followers_other_user(self) -> None:
        """Another user's follower list contains the current user who follows them."""
        self._authenticate(self.user1)
        self.user1.following.add(self.user2)  # user1 starts following user2
        response = self.client.get(
            reverse("users:user_followers_list", kwargs={"pk": self.user2.pk}),
        )

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertIn(self.user1.pk, [user["id"] for user in response.data["results"]])

    def test_search(self) -> None:
        """Searching the follower list by the shared first name fills one page."""
        response = self._search_same_name("users:followers_list")

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(len(response.data["results"]), self.page_size)

    # ------------------------------------------------------------------ following

    def test_list_following(self) -> None:
        """The current user's following list contains a user they follow."""
        self._authenticate(self.user1)
        self.user1.following.add(self.user2)  # user1 starts following user2
        response = self.client.get(reverse("users:following_list"))

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertIn(self.user2.pk, [user["id"] for user in response.data["results"]])

    def test_list_following_other_user(self) -> None:
        """Another user's following list contains the current user they follow."""
        self._authenticate(self.user1)
        self.user2.following.add(self.user1)  # user2 starts following user1
        response = self.client.get(
            reverse("users:user_following_list", kwargs={"pk": self.user2.pk}),
        )

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertIn(self.user1.pk, [user["id"] for user in response.data["results"]])

    def test_search_following(self) -> None:
        """Searching the following list by the shared first name fills one page."""
        response = self._search_same_name("users:following_list")

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(len(response.data["results"]), self.page_size)

    # ------------------------------------------------------- counters and flags

    def test_users_likes_followers_following_quantity(self) -> None:
        """The account endpoint reports the like, follower and following counts."""
        self._authenticate(self.main_user)
        response = self.client.get(reverse("users:user"))

        self.assertEqual(response.data["likes_quantity"], len(self.likes))
        self.assertEqual(response.data["followers_quantity"], len(self.followers))
        self.assertEqual(response.data["following_quantity"], len(self.following))

    def test_auth_following(self) -> None:
        """The token response embeds the user's ``following`` list."""
        data = {"email": self.main_user.email, "password": self.password}
        response = self.client.post(reverse("token"), data=data)

        # NOTE(review): 5 is hard-coded while main_user follows 2 * PAGE_SIZE users;
        # presumably the token serializer caps the embedded list - confirm.
        self.assertEqual(len(response.data["user"]["following"]), 5)

    def test_users_following_field(self) -> None:
        """``is_current_user_following`` is true for a user that main_user follows."""
        self._authenticate(self.main_user)
        following_user = self.main_user.following.first()
        response = self.client.get(reverse("users:user", kwargs={"pk": following_user.pk}))

        self.assertTrue(response.data["is_current_user_following"])
# --- Personal data export endpoint tests ---
class ExportDataTests(APITransactionTestCase):
    """Tests for the ``users:export_data`` endpoint."""

    def setUp(self):
        """Create a user who authored one post, one like and one comment."""
        self.user = UserFactory()
        self.post = PostFactory(author=self.user)
        self.like = LikeFactory(author=self.user, post=self.post)
        self.comment = CommentFactory(author=self.user, post=self.post)

    def test_export_data(self):
        """The export contains all four sections and the requesting user's id."""
        # Reload every fixture from the database before calling the endpoint.
        for instance in (self.user, self.post, self.like, self.comment):
            instance.refresh_from_db()

        token = get_token_for_user(self.user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token}")
        response = self.client.get(reverse("users:export_data"))
        self.assertEqual(response.status_code, status.HTTP_200_OK)

        exported = response.json()
        for section in ("User", "Post", "Like", "Comment"):
            self.assertIn(section, exported)

        self.assertEqual(exported["User"]["id"], self.user.id)