tests
This commit is contained in:
@@ -9,3 +9,4 @@ python-jose[cryptography] == 3.5.0
|
|||||||
alembic == 1.16.5
|
alembic == 1.16.5
|
||||||
pytest == 8.4.1
|
pytest == 8.4.1
|
||||||
slowapi == 0.1.9
|
slowapi == 0.1.9
|
||||||
|
pytest-asyncio == 1.2.0
|
||||||
@@ -87,19 +87,19 @@ async def update_user(email:str, updated_row: pydentic.UserUpdate, current_user:
|
|||||||
changed = True
|
changed = True
|
||||||
# пароль
|
# пароль
|
||||||
if updated_row.password:
|
if updated_row.password:
|
||||||
if not verify_password(updated_row.password, user.password):
|
if not db.verify_password(updated_row.password, user.password):
|
||||||
user.password = updated_row.password
|
user.password = updated_row.password
|
||||||
changed = True
|
changed = True
|
||||||
# права (только для админа)
|
# права (только для админа)
|
||||||
if current_perms.is_admin:
|
if current_perms.is_admin:
|
||||||
perm_fields = ["can_edit", "can_delete", "can_view", "is_admin"]
|
perm_fields = ["can_edit", "can_delete", "can_view"]
|
||||||
for field in perm_fields:
|
for field in perm_fields:
|
||||||
new_value = getattr(updated_row, field)
|
new_value = getattr(updated_row, field)
|
||||||
if new_value is not None and new_value != getattr(perm, field):
|
if new_value is not None and new_value != getattr(perm, field):
|
||||||
setattr(perm, field, new_value)
|
setattr(perm, field, new_value)
|
||||||
changed = True
|
changed = True
|
||||||
if changed:
|
if changed:
|
||||||
user = await db.update_user(user_info=user, perm_info=perm)
|
user = await db.update_user(email = email, user_info=user, perm_info=perm)
|
||||||
return user
|
return user
|
||||||
@api.post("/login")
|
@api.post("/login")
|
||||||
async def login_user(form_data: OAuth2PasswordRequestForm = Depends()):
|
async def login_user(form_data: OAuth2PasswordRequestForm = Depends()):
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
from pydantic import BaseModel, Field, EmailStr, constr,validator
|
from pydantic import BaseModel, Field, EmailStr, constr, field_validator
|
||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
from enum import IntEnum
|
from enum import IntEnum
|
||||||
|
|
||||||
@@ -22,7 +22,7 @@ class UsersInfo(BaseModel):
|
|||||||
description: str = Field(..., description="description of the user")
|
description: str = Field(..., description="description of the user")
|
||||||
activated:bool = Field(..., description="Has the user activated their account")
|
activated:bool = Field(..., description="Has the user activated their account")
|
||||||
password:constr(min_length=8) = Field(..., description="Password with min 8 chars, letters and digits")
|
password:constr(min_length=8) = Field(..., description="Password with min 8 chars, letters and digits")
|
||||||
@validator('password')
|
@field_validator('password')
|
||||||
def password_validator(cls, password):
|
def password_validator(cls, password):
|
||||||
return check_password_complexity(cls, password)
|
return check_password_complexity(cls, password)
|
||||||
class CreateUser(UsersInfo):
|
class CreateUser(UsersInfo):
|
||||||
@@ -34,8 +34,8 @@ class UserUpdate(BaseModel):
|
|||||||
password:Optional[constr(min_length=8)] = Field(None, description="Password with min 8 chars, letters and digits")
|
password:Optional[constr(min_length=8)] = Field(None, description="Password with min 8 chars, letters and digits")
|
||||||
can_edit:Optional[bool] = Field(None, description="The user can edit something")
|
can_edit:Optional[bool] = Field(None, description="The user can edit something")
|
||||||
can_delete:Optional[bool] = Field(None, description="The user can delete something")
|
can_delete:Optional[bool] = Field(None, description="The user can delete something")
|
||||||
can_view:Optional[bool]=Field(None, descriptiopn="The user can view something")
|
can_view:Optional[bool]=Field(None, description="The user can view something")
|
||||||
@validator('password')
|
@field_validator('password')
|
||||||
def password_validator(cls, password):
|
def password_validator(cls, password):
|
||||||
return check_password_complexity(cls, password)
|
return check_password_complexity(cls, password)
|
||||||
class UserLogin(BaseModel):
|
class UserLogin(BaseModel):
|
||||||
@@ -44,7 +44,7 @@ class UserLogin(BaseModel):
|
|||||||
class UserReset(BaseModel):
|
class UserReset(BaseModel):
|
||||||
email:EmailStr = Field(..., min_length=6, max_length=254, description="user's email")
|
email:EmailStr = Field(..., min_length=6, max_length=254, description="user's email")
|
||||||
new_password:constr(min_length=8) = Field(None,description="New_password")
|
new_password:constr(min_length=8) = Field(None,description="New_password")
|
||||||
@validator('new_password')
|
@field_validator('new_password')
|
||||||
def password_validator(cls, new_password):
|
def password_validator(cls, new_password):
|
||||||
return check_password_complexity(cls, new_password)
|
return check_password_complexity(cls, new_password)
|
||||||
class UserOut(BaseModel):
|
class UserOut(BaseModel):
|
||||||
|
|||||||
@@ -72,9 +72,9 @@ async def get_all_rows():
|
|||||||
result = await session.execute(select(User))
|
result = await session.execute(select(User))
|
||||||
users = result.scalars().all()
|
users = result.scalars().all()
|
||||||
return users
|
return users
|
||||||
async def update_user(user_info, perm_info):
|
async def update_user(email,user_info, perm_info):
|
||||||
async with AsyncSessionLocal() as session:
|
async with AsyncSessionLocal() as session:
|
||||||
result = await session.execute(select(User).options(selectinload(User.permissions)).where(User.email==user_info.email))
|
result = await session.execute(select(User).options(selectinload(User.permissions)).where(User.email==email))
|
||||||
user = result.scalar_one_or_none()
|
user = result.scalar_one_or_none()
|
||||||
if user:
|
if user:
|
||||||
user.email = user_info.email
|
user.email = user_info.email
|
||||||
|
|||||||
0
server/testing/__init__.py
Normal file
0
server/testing/__init__.py
Normal file
9
server/testing/conftest.py
Normal file
9
server/testing/conftest.py
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
import pytest_asyncio
|
||||||
|
from httpx import AsyncClient, ASGITransport
|
||||||
|
from server.backend.endpoints import api
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
|
||||||
|
async def client():
|
||||||
|
transport = ASGITransport(app=api)#подключение к FastAPI-приложению напрямую
|
||||||
|
async with AsyncClient(transport=transport, base_url="http://test") as ac: #Имитирует подключение в это приложение, как будто по сети
|
||||||
|
yield ac #чтобы вернуть значение, но не завершить функцию.
|
||||||
0
server/testing/tests/__init__.py
Normal file
0
server/testing/tests/__init__.py
Normal file
@@ -1,7 +1,39 @@
|
|||||||
# test_math.py
|
import pytest
|
||||||
def add(a, b):
|
from httpx import AsyncClient, ASGITransport
|
||||||
return a + b
|
from fastapi import status
|
||||||
|
from server.backend import JWT # твой JWT модуль
|
||||||
|
from server.backend.endpoints import api
|
||||||
|
from server.testing.conftest import client
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_protected_unauthorized(client):
|
||||||
|
"Проверка: без токена — 401"
|
||||||
|
|
||||||
|
|
||||||
|
response = await client.get("/protected") #отправляется GET /protected без токена
|
||||||
|
assert response.status_code == status.HTTP_401_UNAUTHORIZED
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_protected_invalidtoken(client):
|
||||||
|
"Проверка: Токена неверный - 401"
|
||||||
|
|
||||||
|
|
||||||
|
response = await client.get("/protected", headers={"Authorization": "Invalid token"})
|
||||||
|
assert response.status_code == status.HTTP_401_UNAUTHORIZED
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_protected_authorized(client):
|
||||||
|
"Проверка: с токеном — ответ с Hello"
|
||||||
|
|
||||||
|
async def fake_current_user(): #фейковая зависимость (fake_current_user) — функция, которая имитирует поведение настоящего JWT.current_user, но просто возвращает строку "test_user".
|
||||||
|
return "test_user"
|
||||||
|
|
||||||
|
# подменяем зависимость
|
||||||
|
api.dependency_overrides[JWT.current_user] = fake_current_user #FastAPI позволяет временно подменять зависимости (через Depends).
|
||||||
|
response = await client.get("/protected", headers={"Authorization": "Bearer faketoken"})
|
||||||
|
assert response.status_code == status.HTTP_200_OK
|
||||||
|
assert response.json() == {"msg": "Hello, test_user"}
|
||||||
|
|
||||||
|
api.dependency_overrides.clear() #после теста очистка оверрайда, чтобы не повлиять на другие тесты.
|
||||||
|
|
||||||
|
|
||||||
def test_addition():
|
|
||||||
assert add(2, 2) == 4
|
|
||||||
assert add(-1, 1) == 0
|
|
||||||
Reference in New Issue
Block a user