22 Commits

Author SHA1 Message Date
94d3956da8 final 2025-10-18 17:34:59 +03:00
903cb7c930 pytest 1.7 2025-10-12 18:59:42 +03:00
c015a25c81 pytest 1.6 2025-10-12 18:35:10 +03:00
bc5360d9f6 pytest 1.5 2025-10-12 18:30:20 +03:00
1423ca9e68 pytest 1.4 2025-10-12 18:18:30 +03:00
52fb856307 pytest 1.3 2025-10-12 18:17:20 +03:00
71ab99232b readme 3.7 2025-10-12 16:41:44 +03:00
d0a13e3863 pytest 1.2 2025-10-12 16:39:15 +03:00
a4f9af3e2d tests 2025-10-12 15:00:23 +03:00
a2aadc82b7 Update readme 3.6 2025-10-05 16:13:13 +03:00
701bf5f603 user information 2025-10-05 15:55:19 +03:00
b3aaf04024 rate_limit slowapi 2025-10-05 15:15:32 +03:00
f7b39da624 permissions 1.1 and hash_password issue solve 2025-10-05 14:43:25 +03:00
6db14b2329 makefile 1.1 and permissions 1.0 2025-10-05 02:09:28 +03:00
46c6e1cd94 pytest 1.0 and makefile 2025-10-03 14:47:12 +03:00
1c7ebdcc2e alembic 1.2 2025-10-01 12:24:49 +03:00
28a853c865 alembic 1.1 2025-10-01 12:24:09 +03:00
5deffdfcf1 alembic 1.0 2025-09-30 15:27:13 +03:00
e7a9b0a2f7 db dates 2025-09-30 11:22:20 +03:00
7d2e290fb6 error exceptions and columns of time 2025-09-28 18:09:08 +03:00
e18e16bde2 uvicorn logging instead middleware 2025-09-28 16:50:49 +03:00
71116de157 pydantic out model and comments 2025-09-28 16:39:54 +03:00
31 changed files with 913 additions and 104 deletions

1
.gitignore vendored
View File

@@ -7,6 +7,7 @@ __pycache__/
*.pyc
*.pyo
*.pyd
*.pytest_cache
# IDE и редакторы
.vscode/

7
Plan
View File

@@ -1,7 +0,0 @@
###Приложение для мониторинга серверов###
#План: сделать сайт для отображения информации о системных компонентах
#Сделать авторизацию на нем с валидацией pydantic и создание пользователя с fastapi + sqlalchemy
#создать простую страничку с текстом по разным серверам
#придумать способ получать эти данные с серверов или хотяб со своей машины
#отправлять их так же вначале в fast-api, а он в бд и выводить будет на сайте
#Уведомлять по тг, если значения равын чему то

View File

@@ -8,12 +8,16 @@ Pet-проект на стеке **FastAPI + SQLAlchemy (async) + Pydantic + Pyt
## 📂 Структура проекта
```
/
├── server/ # исходный код: модели, маршруты, JWT, базы данных
├── run.py # точка входа, запуск приложения
├── requirements.txt # зависимости
├── .env # переменные окружения
├── README.md # этот файл
── Plan # заметки / TODO / планы по проекту
├── server/backend # Исходный код: endpoints, pydantic, JWT, отправка паролей на почту, permissions, rate-limiting
├── server/frontend # Странички login, registration, reset-password, main
├── server/database # База данных, alembic, db_balancer(не настроен, как шаблон)
├── server/testing # pytests
├── run.py # Точка входа, запуск приложения
── makefile # Точка входа, запуск приложения и утилит
├── requirements.txt # Зависимости
├── .env # Переменные окружения
└── README.md # Этот файл
```
---
@@ -33,8 +37,8 @@ Pet-проект на стеке **FastAPI + SQLAlchemy (async) + Pydantic + Pyt
```
2. Создать и активировать виртуальное окружение:
```
python3 -m venv venv
source venv/bin/activate # Linux / macOS
python3 -m venv .venv
source .venv/bin/activate # Linux / macOS
venv\Scripts\activate # Windows
```
@@ -52,13 +56,16 @@ ALLOW_HEADERS=*
JWT_SECRET_KEY=your_secret_key
ACCESS_TOKEN_EXPIRE_MINUTES=30
```
5. Запустить приложение:
5. Мигрировать БД:
```
python run.py
make migrate_head
```
6. Запустить приложение:
```
python run.py или вне окружения сразу прописать make run
```
6. Документация OpenAPI будет доступна по адресу:
7. Документация OpenAPI будет доступна по адресу:
```
• Swagger UI → http://localhost:8000/docs
• JSON схема → http://localhost:8000/openapi.json
@@ -93,25 +100,15 @@ username=<email>&password=<пароль>
Запуск всех тестов:
```
pytest
make test
```
Тестируются:
Тестируются (В планах):
```
• CRUD операции
• Pydantic-схемы
• Эндпоинты API
• JWT авторизация
```
✅ TODO
```
• Миграции через Alembic
• Обработка ошибок
• Логирование
• PostgreSQL + Pgpool-II для продакшена
• Разделение схем Pydantic на вход/выход
• Тестирование edge-cases
• Тестирование
```

18
makefile Normal file
View File

@@ -0,0 +1,18 @@
VENV=source ./.venv/bin/activate;
ALEMBIC=alembic -c ./server/database/alembic/alembic.ini
.PHONY: test run migrate_head migrate_down migrate_history migrate_current migrate
test:
$(VENV) pytest -c ./server/testing/pytest.ini ./server/testing/tests/
run:
$(VENV) python run.py
migrate_head:
$(VENV) $(ALEMBIC) upgrade head
migrate_down:
$(VENV) $(ALEMBIC) downgrade -1
migrate_history:
$(VENV) $(ALEMBIC) history
migrate_current:
$(VENV) $(ALEMBIC) current
migrate:
$(VENV) $(ALEMBIC) revision --autogenerate

View File

@@ -6,3 +6,7 @@ greenlet == 3.2.4
passlib == 1.7.4
bcrypt == 4.0.1
python-jose[cryptography] == 3.5.0
alembic == 1.16.5
pytest == 8.4.1
slowapi == 0.1.9
pytest-asyncio == 1.2.0

10
run.py
View File

@@ -3,8 +3,14 @@ from server.backend import endpoints # импортируем FastAPI экзем
import asyncio
from server.database import db
if __name__ == "__main__":
asyncio.run(db.main())
uvicorn.run("server.backend.endpoints:api", host="127.0.0.1", port=8000, reload=True)
uvicorn.run(
"server.backend.endpoints:api",
host="127.0.0.1",
port=8000,
reload=True,
log_level="info",
access_log=True
)
#ps aux | grep uvicorn
# kill -9 pid

View File

@@ -1,4 +1,4 @@
from datetime import datetime, timedelta #jwt
from datetime import datetime, timedelta, timezone #jwt
from jose import JWTError, jwt
from fastapi import HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer
@@ -17,7 +17,7 @@ class Token():
@staticmethod
async def create_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt

View File

@@ -3,16 +3,22 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordRequestForm
from .rate_limit import limiter, ratelimit_handler
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from pydantic import EmailStr
from . import pydentic, JWT, password
from . import pydentic, JWT, password, permissions
from server.database import db
from datetime import datetime, timedelta
import asyncio
api = FastAPI()
api.state.limiter = limiter
api.add_exception_handler(RateLimitExceeded, ratelimit_handler)
api.add_middleware(SlowAPIMiddleware)
from dotenv import load_dotenv #Работа с env для CORS
import os
load_dotenv()
@@ -28,71 +34,79 @@ api.add_middleware(
allow_headers=headers,
)
@api.middleware("http") #Логирование заходов перед всеми endpoints
async def log_requests(request: Request, call_next):
ip = request.client.host
ua = request.headers.get("user-agent")
method = request.method
url = str(request.url)
print(f"[{method}] {url} from {ip} ({ua})")
response = await call_next(request)
return response
@api.get("/protected")
@api.get("/protected") #test
async def protected(current_user: str = Depends(JWT.current_user)):
return {"msg": f"Hello, {current_user}"}
@api.get("/", response_model=list[pydentic.UsersInfo]) #список!
async def get_all_rows(current_user: str = Depends(JWT.current_user)):
@api.get("/", response_model=list[pydentic.UserOut]) #список!
async def get_all_rows(current_user: str = Depends(JWT.current_user), user=Depends(permissions.check_permission("is_admin"))):
users = await db.get_all_rows()
if not users:
raise HTTPException(status_code=404, detail="No users found")
raise HTTPException(status_code=401, detail="The user isn't found")
return users
@api.get("/get_user_by_email/{email}", response_model=pydentic.UsersInfo)
async def get_user_by_email(email:str, current_user: str = Depends(JWT.current_user)):
@api.get("/get_user_by_email/{email}", response_model=pydentic.UserOut)
async def get_user_by_email(email:str, current_user: str = Depends(JWT.current_user), user=Depends(permissions.check_permission("can_view"))):
user = await db.get_user_by_email(email)
if user:
return user
else:
raise HTTPException(status_code=404, detail="The user isn't found")
@api.post("/user_create", response_model=pydentic.UsersInfo)
raise HTTPException(status_code=401, detail="The user isn't found")
@api.post("/user_create", response_model=pydentic.UserOut)
async def create_user(row:pydentic.CreateUser):
new_row = pydentic.CreateUser(email=row.email, description=row.description, activated = row.activated, password = row.password)
try:
await db.create_user(new_row)
return new_row
@api.delete("/user_delete/{email}", response_model=pydentic.UsersInfo)
async def delete_user(email:str,current_user: str = Depends(JWT.current_user)):
except:
raise HTTPException(status_code=409, detail="User with this email already exists")
user = await db.get_user_by_email(row.email)
return user
@api.delete("/user_delete/{email}", response_model=pydentic.UserOut)
async def delete_user(email:str,current_user: str = Depends(JWT.current_user), user = Depends(permissions.check_permission("can_delete"))):
user = await db.get_user_by_email(email)
if not user:
raise HTTPException(status_code=404, detail="The user isn't found")
raise HTTPException(status_code=401, detail="The user isn't found")
await db.delete_user(email)
return user
@api.put("/user_update/{email}", response_model=pydentic.UsersInfo)
async def update_user(email:str, updated_row: pydentic.UserUpdate, current_user: str = Depends(JWT.current_user)):
user = await db.get_user_by_email(email)
@api.put("/user_update/{email}", response_model=pydentic.UserOut)
async def update_user(email:str, updated_row: pydentic.UserUpdate, current_user: str = Depends(JWT.current_user), user = Depends(permissions.check_permission("can_edit"))):
user = await db.get_user_by_email(email) #user из бд, которого запросили
perm = user.permissions[0] #права по запрошенному user
current = await db.get_user_by_email(current_user) #user из бд по jwt
current_perms = current.permissions[0] #Права по jwt
if not user:
raise HTTPException(status_code=404, detail="The user isn't found")
raise HTTPException(status_code=401, detail="The user isn't found")
changed = False
if updated_row.email is not None and updated_row.email != user.email:
user.email = updated_row.email
#изменение только определенных колонок
updatable_fields = ["email", "description", "activated"]
for field in updatable_fields:
new_value = getattr(updated_row, field)
if new_value is not None and new_value != getattr(user, field):
setattr(user, field, new_value)
changed = True
if updated_row.description is not None and updated_row.description != user.description:
user.description = updated_row.description
changed = True
if updated_row.activated is not None and updated_row.activated != user.activated:
user.activated = updated_row.activated
changed = True
if updated_row.password is not None and updated_row.password != user.password:
# пароль
if updated_row.password:
if not db.verify_password(updated_row.password, user.password):
user.password = updated_row.password
changed = True
# права (только для админа)
if current_perms.is_admin:
perm_fields = ["can_edit", "can_delete", "can_view"]
for field in perm_fields:
new_value = getattr(updated_row, field)
if new_value is not None and new_value != getattr(perm, field):
setattr(perm, field, new_value)
changed = True
if changed:
await db.update_user(user)
else:
pass
user = await db.update_user(email = email, user_info=user, perm_info=perm)
return user
@api.post("/login")
async def login_user(form_data: OAuth2PasswordRequestForm = Depends()):
try:
creds = pydentic.UserLogin(email=form_data.username, password=form_data.password)
except:
raise HTTPException(status_code=422, detail="Email is not a valid email address")
user = await db.login_user(creds)
if not user:
raise HTTPException(status_code=401, detail="The user isn't found")
@@ -101,9 +115,8 @@ async def login_user(form_data: OAuth2PasswordRequestForm = Depends()):
timedelta(minutes=JWT.ACCESS_TOKEN_EXPIRE_MINUTES)
)
return {"access_token": access_token, "token_type": "bearer"}
@api.post("/reset", response_model=pydentic.UsersInfo)
@api.post("/reset", response_model=pydentic.UserOut)
async def reset_user(row:pydentic.UserReset):
user = await db.get_user_by_email(row.email)
if not user:
raise HTTPException(status_code=401, detail="The user isn't found")
@@ -112,4 +125,9 @@ async def reset_user(row:pydentic.UserReset):
password.send_password(new_row)
user = await db.reset_user(new_row)
return user
@api.get("/me", response_model=pydentic.UserOut)
async def read_current_user(current_user=Depends(JWT.current_user)):
user = await db.get_user_by_email(current_user)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user

View File

@@ -0,0 +1,30 @@
from fastapi import Depends, HTTPException, status, Path, Request
from . import JWT
from server.database import db
def check_permission(required: str):
async def wrapper(
request: Request,
current_user = Depends(JWT.current_user),
):
requested_email = request.path_params.get("email")
user = await db.get_user_by_email(current_user)
perms = user.permissions[0]
# если админ → разрешено всегда
if perms.is_admin:
return user
# проверяем, что у пользователя есть нужное право
if not getattr(perms, required, False):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"You don't have a permission"
)
# проверяем, что работает только со своим email
if current_user.lower() != requested_email.lower():
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"You can only do this with your own account"
)
return user
return wrapper

View File

@@ -1,9 +1,11 @@
from pydantic import BaseModel, Field, EmailStr, constr,validator
from pydantic import BaseModel, Field, EmailStr, constr, field_validator
from typing import List, Optional
from enum import IntEnum
from datetime import datetime
#Валидация пароля
import re
def check_password_complexity(cls, password):
def check_password_complexity(cls, password): #Проверка пароля на соответствие сложности
if password is None:
return password
if not re.search(r'[A-Za-z]', password):
@@ -20,7 +22,7 @@ class UsersInfo(BaseModel):
description: str = Field(..., description="description of the user")
activated:bool = Field(..., description="Has the user activated their account")
password:constr(min_length=8) = Field(..., description="Password with min 8 chars, letters and digits")
@validator('password')
@field_validator('password')
def password_validator(cls, password):
return check_password_complexity(cls, password)
class CreateUser(UsersInfo):
@@ -30,7 +32,10 @@ class UserUpdate(BaseModel):
description:Optional[str] = Field(None, description="description of the user")
activated:Optional[bool] = Field(None, description="Has the user activated their account")
password:Optional[constr(min_length=8)] = Field(None, description="Password with min 8 chars, letters and digits")
@validator('password')
can_edit:Optional[bool] = Field(None, description="The user can edit something")
can_delete:Optional[bool] = Field(None, description="The user can delete something")
can_view:Optional[bool]=Field(None, description="The user can view something")
@field_validator('password')
def password_validator(cls, password):
return check_password_complexity(cls, password)
class UserLogin(BaseModel):
@@ -39,6 +44,11 @@ class UserLogin(BaseModel):
class UserReset(BaseModel):
email:EmailStr = Field(..., min_length=6, max_length=254, description="user's email")
new_password:constr(min_length=8) = Field(None,description="New_password")
@validator('new_password')
@field_validator('new_password')
def password_validator(cls, new_password):
return check_password_complexity(cls, new_password)
class UserOut(BaseModel):
email:EmailStr
description:str
activated:bool
created_at:datetime

View File

@@ -0,0 +1,15 @@
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi.responses import JSONResponse
from fastapi import Request
# создаём limiter с глобальным лимитом
limiter = Limiter(key_func=get_remote_address, default_limits=["10/minute"])
# обработчик ошибок
async def ratelimit_handler(request: Request, exc: RateLimitExceeded):
return JSONResponse(
status_code=429,
content={"detail": "Too many requests, try again later."},
)

View File

View File

@@ -0,0 +1,148 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts.
# this is typically a path given in POSIX (e.g. forward slashes)
# format, relative to the token %(here)s which refers to the location of this
# ini file
script_location = %(here)s/alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory. for multiple paths, the path separator
# is defined by "path_separator" below.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
# Any required deps can installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to <script_location>/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "path_separator"
# below.
# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
# path_separator; This indicates what character is used to split lists of file
# paths, including version_locations and prepend_sys_path within configparser
# files such as alembic.ini.
# The default rendered in new alembic.ini files is "os", which uses os.pathsep
# to provide os-dependent path splitting.
#
# Note that in order to support legacy alembic.ini files, this default does NOT
# take place if path_separator is not present in alembic.ini. If this
# option is omitted entirely, fallback logic is as follows:
#
# 1. Parsing of the version_locations option falls back to using the legacy
# "version_path_separator" key, which if absent then falls back to the legacy
# behavior of splitting on spaces and/or commas.
# 2. Parsing of the prepend_sys_path option falls back to the legacy
# behavior of splitting on spaces, commas, or colons.
#
# Valid values for path_separator are:
#
# path_separator = :
# path_separator = ;
# path_separator = space
# path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file.
#Алембик не умеет работать с асинхронностью!!!
sqlalchemy.url = sqlite:///server/database/DB/example.db
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
# hooks = ruff
# ruff.type = module
# ruff.module = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Alternatively, use the exec runner to execute a binary found on your PATH
# hooks = ruff
# ruff.type = exec
# ruff.executable = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration. This is also consumed by the user-maintained
# env.py script only.
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@@ -0,0 +1 @@
Generic single-database configuration.

View File

@@ -0,0 +1,78 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from server.database.db import Base
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,32 @@
"""empty message
Revision ID: 17250f0912ea
Revises: 4182906fdea3
Create Date: 2025-10-04 16:42:19.812353
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '17250f0912ea'
down_revision: Union[str, Sequence[str], None] = '4182906fdea3'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###

View File

@@ -0,0 +1,32 @@
"""empty message
Revision ID: 1ee14fd147d8
Revises: f828091b6f7d
Create Date: 2025-10-05 12:59:47.055178
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '1ee14fd147d8'
down_revision: Union[str, Sequence[str], None] = 'f828091b6f7d'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('permissions', sa.Column('is_admin', sa.Boolean(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('permissions', 'is_admin')
# ### end Alembic commands ###

View File

@@ -0,0 +1,45 @@
"""check
Revision ID: 4182906fdea3
Revises:
Create Date: 2025-09-30 14:57:28.113339
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '4182906fdea3'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(length=254), nullable=False),
sa.Column('description', sa.String(), nullable=False),
sa.Column('activated', sa.Boolean(), nullable=True),
sa.Column('password', sa.String(), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.Column('last_login', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('email')
)
op.create_index(op.f('ix_users_id'), 'users', ['id'], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_users_id'), table_name='users')
op.drop_table('users')
# ### end Alembic commands ###

View File

@@ -0,0 +1,39 @@
"""empty message
Revision ID: 500009136941
Revises: 17250f0912ea
Create Date: 2025-10-04 17:24:48.096744
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '500009136941'
down_revision: Union[str, Sequence[str], None] = '17250f0912ea'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('permissions',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('can_edit', sa.Boolean(), nullable=True),
sa.Column('can_delete', sa.Boolean(), nullable=True),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('permissions')
# ### end Alembic commands ###

View File

@@ -0,0 +1,32 @@
"""empty message
Revision ID: f828091b6f7d
Revises: 500009136941
Create Date: 2025-10-05 12:11:47.000416
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'f828091b6f7d'
down_revision: Union[str, Sequence[str], None] = '500009136941'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('permissions', sa.Column('can_view', sa.Boolean(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('permissions', 'can_view')
# ### end Alembic commands ###

View File

@@ -1,9 +1,11 @@
import asyncio
from datetime import datetime,timezone
#from sqlalchemy import create_engine #Не async
from sqlalchemy.orm import DeclarativeBase, sessionmaker
from sqlalchemy.orm import DeclarativeBase, sessionmaker, relationship,selectinload
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy import Column, Integer, String, Boolean, select
from sqlalchemy import Column, Integer, String, Boolean, select,func, DateTime, ForeignKey, event
from pathlib import Path
db_folder = Path(__file__).parent / "DB"
@@ -12,6 +14,7 @@ db_path = db_folder / "example.db"
async_engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", echo=True)
#sqlite+aiosqlite — тип БД + async-драйвер ///example.db — путь к файлу (три слэша, если путь относительный; четыре, если абсолютный
#async_engine = create_async_engine( "postgresql+asyncpg://user:pass@host:5432/mydb", echo=True) #Можно указать Pgpool-II для psql или proxysql для mysql mariadb
from passlib.context import CryptContext
#Hash password
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@@ -33,10 +36,26 @@ class User(Base):
description = Column(String, nullable=False)
activated = Column(Boolean, default=False)
password = Column(String, nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
last_login = Column(DateTime(timezone=True))
# связь с Permission
permissions = relationship(
"Permission", back_populates="user", cascade="all, delete-orphan" #back populates = backref, только пишется в обоих классах с которыми связь
)#cascade - если ты удаляешь юзера, SQLAlchemy автоматом удалит все его пермишены.Если ты отцепил пермишен от юзера — он считается “сиротой” и тоже удаляется.
class Permission(Base):
__tablename__ = "permissions"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
can_edit = Column(Boolean, default=True)
can_delete = Column(Boolean, default=True)
can_view = Column(Boolean, default=True)
is_admin = Column(Boolean, default=False)
# обратная связь к User
user = relationship("User", back_populates="permissions")
async def init_db():
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def create_user(user_info):
async with AsyncSessionLocal() as session:
new_user = User(email=user_info.email, description=user_info.description, activated=user_info.activated, password=hash_password(user_info.password))
@@ -45,7 +64,7 @@ async def create_user(user_info):
await session.refresh(new_user)
async def get_user_by_email(email):
async with AsyncSessionLocal() as session:
result = await session.execute(select(User).where(User.email==email))
result = await session.execute(select(User).options(selectinload(User.permissions)).where(User.email==email))
user = result.scalar_one_or_none()
return user
async def get_all_rows():
@@ -53,16 +72,20 @@ async def get_all_rows():
result = await session.execute(select(User))
users = result.scalars().all()
return users
async def update_user(user_info):
async def update_user(email,user_info, perm_info):
async with AsyncSessionLocal() as session:
result = await session.execute(select(User).where(User.id==user_info.id))
result = await session.execute(select(User).options(selectinload(User.permissions)).where(User.email==email))
user = result.scalar_one_or_none()
if user:
user.email = user_info.email
user.description = user_info.description
user.activated = user_info.activated
user.password = hash_password(user_info.password)
perm = user.permissions[0] # если у юзера одна запись - Это связь один-ко-многим: у одного User может быть список из нескольких Permission.
perm.can_edit = perm_info.can_edit
perm.can_delete = perm_info.can_delete
await session.commit()
return user
async def delete_user(email):
async with AsyncSessionLocal() as session:
result = await session.execute(select(User).where(User.email==email))
@@ -75,6 +98,8 @@ async def login_user(user_info):
result = await session.execute(select(User).where(User.email == user_info.email))
user = result.scalar_one_or_none()
if user and verify_password(user_info.password, user.password):
user.last_login=datetime.now(timezone.utc)
await session.commit()
return user
return None
async def reset_user(user_info):
@@ -86,5 +111,8 @@ async def reset_user(user_info):
await session.commit()
return user
return None
async def main():
await init_db()
@event.listens_for(User, "after_insert") #listener не работает в async
def create_permission(mapper, connection, target):
connection.execute(
Permission.__table__.insert().values(user_id=target.id)
)

View File

@@ -12,12 +12,16 @@
<form id="logoutForm">
<button type="submit">Logout</button>
</form>
<h2>data</h2>
<form action="#" method="POST">
<p>Data</p>
<h2 id ="Account_Email">data</h2>
<form>
<p id = "Account_Description">Data</p>
<p id = "Account_Activated">Data</p>
<p id = "Account_Created">Data</p>
</form>
</div>
</div>
<script src="js.js"></script>
<script type="module" src="profile.js"></script>
</body>
</html>

View File

@@ -0,0 +1,45 @@
async function loadUser() {
try {
const token = getToken();
const response = await fetch("http://localhost:8000/me", {
method: "GET",
headers: {
"Authorization": `Bearer ${token}`
}
});
if (!response.ok) {
throw new Error("Unauthorized");
}
const data = await response.json();
console.log("User info:", data);
show_data(data)
} catch (err) {
showError(["Connection error"]);
}
}
loadUser();
function show_data(data) {
const emailElem = document.getElementById('Account_Email');
if (emailElem) {
emailElem.textContent = data.email;
}
const descElem = document.getElementById('Account_Description');
if (descElem) {
descElem.textContent = data.description || "—";
}
const activatedElem = document.getElementById('Account_Activated');
if (activatedElem) {
activatedElem.textContent = `Active: ${data.activated ? "Yes" : "No"}`;
}
const createdElem = document.getElementById('Account_Created');
if (createdElem) {
// красиво обрезать дату
createdElem.textContent = new Date(data.created_at).toLocaleString();
}
}

View File

View File

@@ -0,0 +1,9 @@
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from server.backend.endpoints import api
@pytest_asyncio.fixture
async def client():
transport = ASGITransport(app=api)#подключение к FastAPI-приложению напрямую
async with AsyncClient(transport=transport, base_url="http://test") as ac: #Имитирует подключение в это приложение, как будто по сети
yield ac #чтобы вернуть значение, но не завершить функцию.

View File

@@ -0,0 +1,8 @@
[pytest]
minversion = 6.0
testpaths = tests
python_files = test_*.py *_test.py
python_functions = test_*
addopts =
-vv
--import-mode=importlib

View File

@@ -0,0 +1,39 @@
import pytest
from httpx import AsyncClient, ASGITransport
from fastapi import status
from server.backend import JWT # твой JWT модуль
from server.backend.endpoints import api
from server.testing.conftest import client
@pytest.mark.asyncio
async def test_protected_unauthorized(client):
"Проверка: без токена — 401"
response = await client.get("/protected") #отправляется GET /protected без токена
assert response.status_code == status.HTTP_401_UNAUTHORIZED
@pytest.mark.asyncio
async def test_protected_invalidtoken(client):
"Проверка: Токена неверный - 401"
response = await client.get("/protected", headers={"Authorization": "Invalid token"})
assert response.status_code == status.HTTP_401_UNAUTHORIZED
@pytest.mark.asyncio
async def test_protected_authorized(client):
"Проверка: с токеном — ответ с Hello"
async def fake_current_user(): #фейковая зависимость (fake_current_user) — функция, которая имитирует поведение настоящего JWT.current_user, но просто возвращает строку "test_user".
return "test_user"
# подменяем зависимость
api.dependency_overrides[JWT.current_user] = fake_current_user #FastAPI позволяет временно подменять зависимости (через Depends).
response = await client.get("/protected", headers={"Authorization": "Bearer faketoken"})
assert response.status_code == status.HTTP_200_OK
assert response.json() == {"msg": "Hello, test_user"}
api.dependency_overrides.clear() #после теста очистка оверрайда, чтобы не повлиять на другие тесты.

View File

View File

@@ -0,0 +1,149 @@
import pytest
import uuid #для генерации рандомного uuid
from httpx import AsyncClient, ASGITransport
from fastapi import status
from server.backend import JWT # твой JWT модуль
from server.backend.endpoints import api
from server.testing.conftest import client
from server.database import db
@pytest.fixture(scope="session")
def global_email():
"""Один и тот же email для всех тестов в сессии"""
return f"user_{uuid.uuid4().hex[:8]}@d.d"
@pytest.mark.asyncio
@pytest.mark.parametrize("email,password,expected_status", [
("valid", "123qwe!@#", 200),
("invalidemail.com", "123qwe!@#", 422),
("invalidpassword", "1234", 422),
("invalidall", "1234", 422)
])
async def test_create_user(client, email, password, expected_status, global_email):
'''Проверка: Создание пользователя'''
if email == "valid":
email = global_email #Подставка глобальной фикстуры под конкретный вариант
user_data = {
"email": email,
"description": "test_user",
"activated": "True",
"password": password
}
response = await client.post(f"/user_create", json=user_data)
assert response.status_code == expected_status
@pytest.mark.asyncio
@pytest.mark.parametrize("is_admin,expected_status", [
(False, 403),
(True, 200)
])
async def test_is_admin(client, monkeypatch, global_email, is_admin, expected_status):
"""Проверка: Прав пользователя + обновление"""
class FakePerm:
def __init__(self, is_admin): #Подменяем права пользователя
self.is_admin = is_admin
self.can_edit = False
self.can_delete = False
self.can_view = False
class FakeUser:
def __init__(self, is_admin):
self.email = global_email
self.permissions = [FakePerm(is_admin)]
self.description = "test_user"
self.activated = True
self.password = "123qwe!@#"
async def fake_get_user_by_email(email): #Подмена функции get_user_by_email
return FakeUser(is_admin)
monkeypatch.setattr(db, "get_user_by_email", fake_get_user_by_email)
async def fake_current_user():
return global_email
api.dependency_overrides[JWT.current_user] = fake_current_user
response = await client.put(
f"/user_update/{global_email}",
headers={"Authorization": "Bearer faketoken"},
json={"can_edit": True,
"can_delete": True,
"can_view": True,
"description": "1112322"
},
)
assert response.status_code == expected_status
api.dependency_overrides.clear()
@pytest.mark.asyncio
@pytest.mark.parametrize("email,password,expected_status", [
("valid", "123qwe!@#", 200),
("invalidemail.com", "123qwe!@#", 422),
("invalidpassword", "1234", 422),
("invalidall", "1234", 422)
])
async def test_login_user(client, email, password, expected_status, global_email):
'''Проверка: Логин пользователя'''
if email == "valid":
email = global_email #Подставка глобальной фикстуры под конкретный вариант
user_data = {
"username": email,
"password": password
}
response = await client.post(f"/login", data=user_data)
assert response.status_code == expected_status
@pytest.mark.parametrize("is_admin, can_delete,expected_status", [
(False,True,200),
(True,True,200),
(True,False,200),
(False, False,403)
])
@pytest.mark.asyncio
async def test_delete_user(client, global_email, monkeypatch, is_admin, expected_status, can_delete):
"""Проверка: Удаление пользователя"""
class FakePerm:
def __init__(self, is_admin):
self.is_admin = is_admin
self.can_edit = False
self.can_delete = can_delete
self.can_view = False
class FakeUser:
def __init__(self, is_admin):
self.email = global_email
self.permissions = [FakePerm(is_admin)]
self.description = "test_user"
self.activated = True
self.password = "123qwe!@#"
# добавляем обязательные поля из модели UserOut
self.created_at = "2025-10-12T00:00:00Z"
self.updated_at = "2025-10-12T00:00:00Z"
self.last_login = None
async def fake_get_user_by_email(email):
return FakeUser(is_admin)
monkeypatch.setattr(db, "get_user_by_email", fake_get_user_by_email)
async def fake_current_user():
return global_email
api.dependency_overrides[JWT.current_user] = fake_current_user
response = await client.delete(
f"/user_delete/{global_email}",
headers={"Authorization": "Bearer faketoken"}
)
assert response.status_code == expected_status
api.dependency_overrides.clear()