23 Commits

Author SHA1 Message Date
94d3956da8 final 2025-10-18 17:34:59 +03:00
903cb7c930 pytest 1.7 2025-10-12 18:59:42 +03:00
c015a25c81 pytest 1.6 2025-10-12 18:35:10 +03:00
bc5360d9f6 pytest 1.5 2025-10-12 18:30:20 +03:00
1423ca9e68 pytest 1.4 2025-10-12 18:18:30 +03:00
52fb856307 pytest 1.3 2025-10-12 18:17:20 +03:00
71ab99232b readme 3.7 2025-10-12 16:41:44 +03:00
d0a13e3863 pytest 1.2 2025-10-12 16:39:15 +03:00
a4f9af3e2d tests 2025-10-12 15:00:23 +03:00
a2aadc82b7 Update readme 3.6 2025-10-05 16:13:13 +03:00
701bf5f603 user information 2025-10-05 15:55:19 +03:00
b3aaf04024 rate_limit slowapi 2025-10-05 15:15:32 +03:00
f7b39da624 permissions 1.1 and hash_password issue solve 2025-10-05 14:43:25 +03:00
6db14b2329 makefile 1.1 and permissions 1.0 2025-10-05 02:09:28 +03:00
46c6e1cd94 pytest 1.0 and makefile 2025-10-03 14:47:12 +03:00
1c7ebdcc2e alembic 1.2 2025-10-01 12:24:49 +03:00
28a853c865 alembic 1.1 2025-10-01 12:24:09 +03:00
5deffdfcf1 alembic 1.0 2025-09-30 15:27:13 +03:00
e7a9b0a2f7 db dates 2025-09-30 11:22:20 +03:00
7d2e290fb6 error exceptions and columns of time 2025-09-28 18:09:08 +03:00
e18e16bde2 uvicorn logging instead middleware 2025-09-28 16:50:49 +03:00
71116de157 pydantic out model and comments 2025-09-28 16:39:54 +03:00
fe7fb0971d http midleware 2025-09-28 15:34:42 +03:00
31 changed files with 921 additions and 100 deletions

1
.gitignore vendored
View File

@@ -7,6 +7,7 @@ __pycache__/
*.pyc *.pyc
*.pyo *.pyo
*.pyd *.pyd
*.pytest_cache
# IDE и редакторы # IDE и редакторы
.vscode/ .vscode/

7
Plan
View File

@@ -1,7 +0,0 @@
###Приложение для мониторинга серверов###
#План: сделать сайт для отображения информации о системных компонентах
#Сделать авторизацию на нем с валидацией pydantic и создание пользователя с fastapi + sqlalchemy
#создать простую страничку с текстом по разным серверам
#придумать способ получать эти данные с серверов или хотяб со своей машины
#отправлять их так же вначале в fast-api, а он в бд и выводить будет на сайте
#Уведомлять по тг, если значения равын чему то

View File

@@ -8,12 +8,16 @@ Pet-проект на стеке **FastAPI + SQLAlchemy (async) + Pydantic + Pyt
## 📂 Структура проекта ## 📂 Структура проекта
``` ```
/ /
├── server/ # исходный код: модели, маршруты, JWT, базы данных ├── server/backend # Исходный код: endpoints, pydantic, JWT, отправка паролей на почту, permissions, rate-limiting
├── run.py # точка входа, запуск приложения ├── server/frontend # Странички login, registration, reset-password, main
├── requirements.txt # зависимости ├── server/database # База данных, alembic, db_balancer(не настроен, как шаблон)
├── .env # переменные окружения ├── server/testing # pytests
├── README.md # этот файл ├── run.py # Точка входа, запуск приложения
── Plan # заметки / TODO / планы по проекту ── makefile # Точка входа, запуск приложения и утилит
├── requirements.txt # Зависимости
├── .env # Переменные окружения
└── README.md # Этот файл
``` ```
--- ---
@@ -33,8 +37,8 @@ Pet-проект на стеке **FastAPI + SQLAlchemy (async) + Pydantic + Pyt
``` ```
2. Создать и активировать виртуальное окружение: 2. Создать и активировать виртуальное окружение:
``` ```
python3 -m venv venv python3 -m venv .venv
source venv/bin/activate # Linux / macOS source .venv/bin/activate # Linux / macOS
venv\Scripts\activate # Windows venv\Scripts\activate # Windows
``` ```
@@ -52,13 +56,16 @@ ALLOW_HEADERS=*
JWT_SECRET_KEY=your_secret_key JWT_SECRET_KEY=your_secret_key
ACCESS_TOKEN_EXPIRE_MINUTES=30 ACCESS_TOKEN_EXPIRE_MINUTES=30
``` ```
5. Мигрировать БД:
5. Запустить приложение:
``` ```
python run.py make migrate_head
```
6. Запустить приложение:
```
python run.py или вне окружения сразу прописать make run
``` ```
6. Документация OpenAPI будет доступна по адресу: 7. Документация OpenAPI будет доступна по адресу:
``` ```
• Swagger UI → http://localhost:8000/docs • Swagger UI → http://localhost:8000/docs
• JSON схема → http://localhost:8000/openapi.json • JSON схема → http://localhost:8000/openapi.json
@@ -93,25 +100,15 @@ username=<email>&password=<пароль>
Запуск всех тестов: Запуск всех тестов:
``` ```
pytest make test
``` ```
Тестируются: Тестируются (В планах):
``` ```
• CRUD операции • CRUD операции
• Pydantic-схемы • Pydantic-схемы
• Эндпоинты API • Эндпоинты API
• JWT авторизация • JWT авторизация
``` • Тестирование
✅ TODO
```
• Миграции через Alembic
• Обработка ошибок
• Логирование
• PostgreSQL + Pgpool-II для продакшена
• Разделение схем Pydantic на вход/выход
• Тестирование edge-cases
``` ```

18
makefile Normal file
View File

@@ -0,0 +1,18 @@
VENV=source ./.venv/bin/activate;
ALEMBIC=alembic -c ./server/database/alembic/alembic.ini
.PHONY: test run migrate_head migrate_down migrate_history migrate_current migrate
test:
$(VENV) pytest -c ./server/testing/pytest.ini ./server/testing/tests/
run:
$(VENV) python run.py
migrate_head:
$(VENV) $(ALEMBIC) upgrade head
migrate_down:
$(VENV) $(ALEMBIC) downgrade -1
migrate_history:
$(VENV) $(ALEMBIC) history
migrate_current:
$(VENV) $(ALEMBIC) current
migrate:
$(VENV) $(ALEMBIC) revision --autogenerate

View File

@@ -6,3 +6,7 @@ greenlet == 3.2.4
passlib == 1.7.4 passlib == 1.7.4
bcrypt == 4.0.1 bcrypt == 4.0.1
python-jose[cryptography] == 3.5.0 python-jose[cryptography] == 3.5.0
alembic == 1.16.5
pytest == 8.4.1
slowapi == 0.1.9
pytest-asyncio == 1.2.0

10
run.py
View File

@@ -3,8 +3,14 @@ from server.backend import endpoints # импортируем FastAPI экзем
import asyncio import asyncio
from server.database import db from server.database import db
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(db.main()) uvicorn.run(
uvicorn.run("server.backend.endpoints:api", host="127.0.0.1", port=8000, reload=True) "server.backend.endpoints:api",
host="127.0.0.1",
port=8000,
reload=True,
log_level="info",
access_log=True
)
#ps aux | grep uvicorn #ps aux | grep uvicorn
# kill -9 pid # kill -9 pid

View File

@@ -1,4 +1,4 @@
from datetime import datetime, timedelta #jwt from datetime import datetime, timedelta, timezone #jwt
from jose import JWTError, jwt from jose import JWTError, jwt
from fastapi import HTTPException, Depends, status from fastapi import HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer from fastapi.security import OAuth2PasswordBearer
@@ -17,7 +17,7 @@ class Token():
@staticmethod @staticmethod
async def create_token(data: dict, expires_delta: timedelta | None = None): async def create_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy() to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15)) expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire}) to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt return encoded_jwt

View File

@@ -1,15 +1,24 @@
from fastapi import FastAPI, HTTPException, status, Depends from fastapi import FastAPI, HTTPException, status, Depends, Request
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordRequestForm from fastapi.security import OAuth2PasswordRequestForm
from . import pydentic, JWT, password
from datetime import datetime, timedelta from .rate_limit import limiter, ratelimit_handler
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from pydantic import EmailStr from pydantic import EmailStr
from . import pydentic, JWT, password, permissions
from server.database import db from server.database import db
from datetime import datetime, timedelta
import asyncio import asyncio
api = FastAPI() api = FastAPI()
api.state.limiter = limiter
api.add_exception_handler(RateLimitExceeded, ratelimit_handler)
api.add_middleware(SlowAPIMiddleware)
from dotenv import load_dotenv #Работа с env для CORS from dotenv import load_dotenv #Работа с env для CORS
import os import os
load_dotenv() load_dotenv()
@@ -25,72 +34,88 @@ api.add_middleware(
allow_headers=headers, allow_headers=headers,
) )
@api.get("/protected") @api.get("/protected") #test
async def protected(current_user: str = Depends(JWT.current_user)): async def protected(current_user: str = Depends(JWT.current_user)):
return {"msg": f"Hello, {current_user}"} return {"msg": f"Hello, {current_user}"}
@api.get("/", response_model=pydentic.CreateUser) @api.get("/", response_model=list[pydentic.UserOut]) #список!
async def get_all_rows(current_user: str = Depends(JWT.current_user)): async def get_all_rows(current_user: str = Depends(JWT.current_user), user=Depends(permissions.check_permission("is_admin"))):
for row in await db.get_all_rows(): users = await db.get_all_rows()
if row: if not users:
return row raise HTTPException(status_code=401, detail="The user isn't found")
else: return users
raise HTTPException(status_code=404, detail="The user isn't found") @api.get("/get_user_by_email/{email}", response_model=pydentic.UserOut)
@api.get("/get_user_by_email/{email}", response_model=pydentic.CreateUser) async def get_user_by_email(email:str, current_user: str = Depends(JWT.current_user), user=Depends(permissions.check_permission("can_view"))):
async def get_user_by_email(email:str, current_user: str = Depends(JWT.current_user)):
user = await db.get_user_by_email(email) user = await db.get_user_by_email(email)
if user: if user:
return user return user
else: else:
raise HTTPException(status_code=404, detail="The user isn't found") raise HTTPException(status_code=401, detail="The user isn't found")
@api.post("/user_create", response_model=pydentic.UsersInfo) @api.post("/user_create", response_model=pydentic.UserOut)
async def create_user(row:pydentic.CreateUser): async def create_user(row:pydentic.CreateUser):
new_row = pydentic.CreateUser(email=row.email, description=row.description, activated = row.activated, password = row.password) new_row = pydentic.CreateUser(email=row.email, description=row.description, activated = row.activated, password = row.password)
try:
await db.create_user(new_row) await db.create_user(new_row)
return new_row except:
@api.delete("/user_delete/{email}", response_model=pydentic.UsersInfo) raise HTTPException(status_code=409, detail="User with this email already exists")
async def delete_user(email:str,current_user: str = Depends(JWT.current_user)): user = await db.get_user_by_email(row.email)
return user
@api.delete("/user_delete/{email}", response_model=pydentic.UserOut)
async def delete_user(email:str,current_user: str = Depends(JWT.current_user), user = Depends(permissions.check_permission("can_delete"))):
user = await db.get_user_by_email(email) user = await db.get_user_by_email(email)
if not user: if not user:
raise HTTPException(status_code=404, detail="The user isn't found") raise HTTPException(status_code=401, detail="The user isn't found")
await db.delete_user(email) await db.delete_user(email)
return user return user
@api.put("/user_update/{email}", response_model=pydentic.UsersInfo) @api.put("/user_update/{email}", response_model=pydentic.UserOut)
async def update_user(email:str, updated_row: pydentic.UserUpdate, current_user: str = Depends(JWT.current_user)): async def update_user(email:str, updated_row: pydentic.UserUpdate, current_user: str = Depends(JWT.current_user), user = Depends(permissions.check_permission("can_edit"))):
user = await db.get_user_by_email(email) user = await db.get_user_by_email(email) #user из бд, которого запросили
perm = user.permissions[0] #права по запрошенному user
current = await db.get_user_by_email(current_user) #user из бд по jwt
current_perms = current.permissions[0] #Права по jwt
if not user: if not user:
raise HTTPException(status_code=404, detail="The user isn't found") raise HTTPException(status_code=401, detail="The user isn't found")
changed = False changed = False
if updated_row.email is not None and updated_row.email != user.email: #изменение только определенных колонок
user.email = updated_row.email updatable_fields = ["email", "description", "activated"]
for field in updatable_fields:
new_value = getattr(updated_row, field)
if new_value is not None and new_value != getattr(user, field):
setattr(user, field, new_value)
changed = True changed = True
if updated_row.description is not None and updated_row.description != user.description: # пароль
user.description = updated_row.description if updated_row.password:
changed = True if not db.verify_password(updated_row.password, user.password):
if updated_row.activated is not None and updated_row.activated != user.activated:
user.activated = updated_row.activated
changed = True
if updated_row.password is not None and updated_row.password != user.password:
user.password = updated_row.password user.password = updated_row.password
changed = True changed = True
# права (только для админа)
if current_perms.is_admin:
perm_fields = ["can_edit", "can_delete", "can_view"]
for field in perm_fields:
new_value = getattr(updated_row, field)
if new_value is not None and new_value != getattr(perm, field):
setattr(perm, field, new_value)
changed = True
if changed: if changed:
await db.update_user(user) user = await db.update_user(email = email, user_info=user, perm_info=perm)
else:
pass
return user return user
@api.post("/login") @api.post("/login")
async def login_user(form_data: OAuth2PasswordRequestForm = Depends()): async def login_user(form_data: OAuth2PasswordRequestForm = Depends()):
try:
creds = pydentic.UserLogin(email=form_data.username, password=form_data.password) creds = pydentic.UserLogin(email=form_data.username, password=form_data.password)
except:
raise HTTPException(status_code=422, detail="Email is not a valid email address")
user = await db.login_user(creds) user = await db.login_user(creds)
if not user: if not user:
raise HTTPException(status_code=401, detail="The user isn't found") raise HTTPException(status_code=401, detail="The user isn't found")
access_token = await JWT.AccessToken.create( access_token = await JWT.AccessToken.create(
{"sub": user.email}, {"sub": user.email},
timedelta(minutes=JWT.ACCESS_TOKEN_EXPIRE_MINUTES) timedelta(minutes=JWT.ACCESS_TOKEN_EXPIRE_MINUTES)
) )
return {"access_token": access_token, "token_type": "bearer"} return {"access_token": access_token, "token_type": "bearer"}
@api.post("/reset", response_model=pydentic.UsersInfo) @api.post("/reset", response_model=pydentic.UserOut)
async def reset_user(row:pydentic.UserReset): async def reset_user(row:pydentic.UserReset):
user = await db.get_user_by_email(row.email) user = await db.get_user_by_email(row.email)
if not user: if not user:
@@ -100,4 +125,9 @@ async def reset_user(row:pydentic.UserReset):
password.send_password(new_row) password.send_password(new_row)
user = await db.reset_user(new_row) user = await db.reset_user(new_row)
return user return user
@api.get("/me", response_model=pydentic.UserOut)
async def read_current_user(current_user=Depends(JWT.current_user)):
user = await db.get_user_by_email(current_user)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user

View File

@@ -0,0 +1,30 @@
from fastapi import Depends, HTTPException, status, Path, Request
from . import JWT
from server.database import db
def check_permission(required: str):
async def wrapper(
request: Request,
current_user = Depends(JWT.current_user),
):
requested_email = request.path_params.get("email")
user = await db.get_user_by_email(current_user)
perms = user.permissions[0]
# если админ → разрешено всегда
if perms.is_admin:
return user
# проверяем, что у пользователя есть нужное право
if not getattr(perms, required, False):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"You don't have a permission"
)
# проверяем, что работает только со своим email
if current_user.lower() != requested_email.lower():
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"You can only do this with your own account"
)
return user
return wrapper

View File

@@ -1,9 +1,11 @@
from pydantic import BaseModel, Field, EmailStr, constr,validator from pydantic import BaseModel, Field, EmailStr, constr, field_validator
from typing import List, Optional from typing import List, Optional
from enum import IntEnum from enum import IntEnum
from datetime import datetime
#Валидация пароля #Валидация пароля
import re import re
def check_password_complexity(cls, password): def check_password_complexity(cls, password): #Проверка пароля на соответствие сложности
if password is None: if password is None:
return password return password
if not re.search(r'[A-Za-z]', password): if not re.search(r'[A-Za-z]', password):
@@ -20,7 +22,7 @@ class UsersInfo(BaseModel):
description: str = Field(..., description="description of the user") description: str = Field(..., description="description of the user")
activated:bool = Field(..., description="Has the user activated their account") activated:bool = Field(..., description="Has the user activated their account")
password:constr(min_length=8) = Field(..., description="Password with min 8 chars, letters and digits") password:constr(min_length=8) = Field(..., description="Password with min 8 chars, letters and digits")
@validator('password') @field_validator('password')
def password_validator(cls, password): def password_validator(cls, password):
return check_password_complexity(cls, password) return check_password_complexity(cls, password)
class CreateUser(UsersInfo): class CreateUser(UsersInfo):
@@ -30,7 +32,10 @@ class UserUpdate(BaseModel):
description:Optional[str] = Field(None, description="description of the user") description:Optional[str] = Field(None, description="description of the user")
activated:Optional[bool] = Field(None, description="Has the user activated their account") activated:Optional[bool] = Field(None, description="Has the user activated their account")
password:Optional[constr(min_length=8)] = Field(None, description="Password with min 8 chars, letters and digits") password:Optional[constr(min_length=8)] = Field(None, description="Password with min 8 chars, letters and digits")
@validator('password') can_edit:Optional[bool] = Field(None, description="The user can edit something")
can_delete:Optional[bool] = Field(None, description="The user can delete something")
can_view:Optional[bool]=Field(None, description="The user can view something")
@field_validator('password')
def password_validator(cls, password): def password_validator(cls, password):
return check_password_complexity(cls, password) return check_password_complexity(cls, password)
class UserLogin(BaseModel): class UserLogin(BaseModel):
@@ -39,6 +44,11 @@ class UserLogin(BaseModel):
class UserReset(BaseModel): class UserReset(BaseModel):
email:EmailStr = Field(..., min_length=6, max_length=254, description="user's email") email:EmailStr = Field(..., min_length=6, max_length=254, description="user's email")
new_password:constr(min_length=8) = Field(None,description="New_password") new_password:constr(min_length=8) = Field(None,description="New_password")
@validator('new_password') @field_validator('new_password')
def password_validator(cls, new_password): def password_validator(cls, new_password):
return check_password_complexity(cls, new_password) return check_password_complexity(cls, new_password)
class UserOut(BaseModel):
email:EmailStr
description:str
activated:bool
created_at:datetime

View File

@@ -0,0 +1,15 @@
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi.responses import JSONResponse
from fastapi import Request
# создаём limiter с глобальным лимитом
limiter = Limiter(key_func=get_remote_address, default_limits=["10/minute"])
# обработчик ошибок
async def ratelimit_handler(request: Request, exc: RateLimitExceeded):
return JSONResponse(
status_code=429,
content={"detail": "Too many requests, try again later."},
)

View File

View File

@@ -0,0 +1,148 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts.
# this is typically a path given in POSIX (e.g. forward slashes)
# format, relative to the token %(here)s which refers to the location of this
# ini file
script_location = %(here)s/alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory. for multiple paths, the path separator
# is defined by "path_separator" below.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
# Any required deps can installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to <script_location>/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "path_separator"
# below.
# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
# path_separator; This indicates what character is used to split lists of file
# paths, including version_locations and prepend_sys_path within configparser
# files such as alembic.ini.
# The default rendered in new alembic.ini files is "os", which uses os.pathsep
# to provide os-dependent path splitting.
#
# Note that in order to support legacy alembic.ini files, this default does NOT
# take place if path_separator is not present in alembic.ini. If this
# option is omitted entirely, fallback logic is as follows:
#
# 1. Parsing of the version_locations option falls back to using the legacy
# "version_path_separator" key, which if absent then falls back to the legacy
# behavior of splitting on spaces and/or commas.
# 2. Parsing of the prepend_sys_path option falls back to the legacy
# behavior of splitting on spaces, commas, or colons.
#
# Valid values for path_separator are:
#
# path_separator = :
# path_separator = ;
# path_separator = space
# path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file.
#Алембик не умеет работать с асинхронностью!!!
sqlalchemy.url = sqlite:///server/database/DB/example.db
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
# hooks = ruff
# ruff.type = module
# ruff.module = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Alternatively, use the exec runner to execute a binary found on your PATH
# hooks = ruff
# ruff.type = exec
# ruff.executable = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration. This is also consumed by the user-maintained
# env.py script only.
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@@ -0,0 +1 @@
Generic single-database configuration.

View File

@@ -0,0 +1,78 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from server.database.db import Base
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,32 @@
"""empty message
Revision ID: 17250f0912ea
Revises: 4182906fdea3
Create Date: 2025-10-04 16:42:19.812353
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '17250f0912ea'
down_revision: Union[str, Sequence[str], None] = '4182906fdea3'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###

View File

@@ -0,0 +1,32 @@
"""empty message
Revision ID: 1ee14fd147d8
Revises: f828091b6f7d
Create Date: 2025-10-05 12:59:47.055178
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '1ee14fd147d8'
down_revision: Union[str, Sequence[str], None] = 'f828091b6f7d'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('permissions', sa.Column('is_admin', sa.Boolean(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('permissions', 'is_admin')
# ### end Alembic commands ###

View File

@@ -0,0 +1,45 @@
"""check
Revision ID: 4182906fdea3
Revises:
Create Date: 2025-09-30 14:57:28.113339
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '4182906fdea3'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('email', sa.String(length=254), nullable=False),
sa.Column('description', sa.String(), nullable=False),
sa.Column('activated', sa.Boolean(), nullable=True),
sa.Column('password', sa.String(), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.Column('last_login', sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('email')
)
op.create_index(op.f('ix_users_id'), 'users', ['id'], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_users_id'), table_name='users')
op.drop_table('users')
# ### end Alembic commands ###

View File

@@ -0,0 +1,39 @@
"""empty message
Revision ID: 500009136941
Revises: 17250f0912ea
Create Date: 2025-10-04 17:24:48.096744
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '500009136941'
down_revision: Union[str, Sequence[str], None] = '17250f0912ea'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('permissions',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('can_edit', sa.Boolean(), nullable=True),
sa.Column('can_delete', sa.Boolean(), nullable=True),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('permissions')
# ### end Alembic commands ###

View File

@@ -0,0 +1,32 @@
"""empty message
Revision ID: f828091b6f7d
Revises: 500009136941
Create Date: 2025-10-05 12:11:47.000416
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'f828091b6f7d'
down_revision: Union[str, Sequence[str], None] = '500009136941'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('permissions', sa.Column('can_view', sa.Boolean(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('permissions', 'can_view')
# ### end Alembic commands ###

View File

@@ -1,9 +1,11 @@
import asyncio import asyncio
from datetime import datetime,timezone
#from sqlalchemy import create_engine #Не async #from sqlalchemy import create_engine #Не async
from sqlalchemy.orm import DeclarativeBase, sessionmaker from sqlalchemy.orm import DeclarativeBase, sessionmaker, relationship,selectinload
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy import Column, Integer, String, Boolean, select from sqlalchemy import Column, Integer, String, Boolean, select,func, DateTime, ForeignKey, event
from pathlib import Path from pathlib import Path
db_folder = Path(__file__).parent / "DB" db_folder = Path(__file__).parent / "DB"
@@ -12,6 +14,7 @@ db_path = db_folder / "example.db"
async_engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", echo=True) async_engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", echo=True)
#sqlite+aiosqlite — тип БД + async-драйвер ///example.db — путь к файлу (три слэша, если путь относительный; четыре, если абсолютный #sqlite+aiosqlite — тип БД + async-драйвер ///example.db — путь к файлу (три слэша, если путь относительный; четыре, если абсолютный
#async_engine = create_async_engine( "postgresql+asyncpg://user:pass@host:5432/mydb", echo=True) #Можно указать Pgpool-II для psql или proxysql для mysql mariadb #async_engine = create_async_engine( "postgresql+asyncpg://user:pass@host:5432/mydb", echo=True) #Можно указать Pgpool-II для psql или proxysql для mysql mariadb
from passlib.context import CryptContext from passlib.context import CryptContext
#Hash password #Hash password
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@@ -33,10 +36,26 @@ class User(Base):
description = Column(String, nullable=False) description = Column(String, nullable=False)
activated = Column(Boolean, default=False) activated = Column(Boolean, default=False)
password = Column(String, nullable=False) password = Column(String, nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
last_login = Column(DateTime(timezone=True))
# связь с Permission
permissions = relationship(
"Permission", back_populates="user", cascade="all, delete-orphan" #back populates = backref, только пишется в обоих классах с которыми связь
)#cascade - если ты удаляешь юзера, SQLAlchemy автоматом удалит все его пермишены.Если ты отцепил пермишен от юзера — он считается “сиротой” и тоже удаляется.
class Permission(Base):
__tablename__ = "permissions"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
can_edit = Column(Boolean, default=True)
can_delete = Column(Boolean, default=True)
can_view = Column(Boolean, default=True)
is_admin = Column(Boolean, default=False)
# обратная связь к User
user = relationship("User", back_populates="permissions")
async def init_db():
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def create_user(user_info): async def create_user(user_info):
async with AsyncSessionLocal() as session: async with AsyncSessionLocal() as session:
new_user = User(email=user_info.email, description=user_info.description, activated=user_info.activated, password=hash_password(user_info.password)) new_user = User(email=user_info.email, description=user_info.description, activated=user_info.activated, password=hash_password(user_info.password))
@@ -45,7 +64,7 @@ async def create_user(user_info):
await session.refresh(new_user) await session.refresh(new_user)
async def get_user_by_email(email): async def get_user_by_email(email):
async with AsyncSessionLocal() as session: async with AsyncSessionLocal() as session:
result = await session.execute(select(User).where(User.email==email)) result = await session.execute(select(User).options(selectinload(User.permissions)).where(User.email==email))
user = result.scalar_one_or_none() user = result.scalar_one_or_none()
return user return user
async def get_all_rows(): async def get_all_rows():
@@ -53,16 +72,20 @@ async def get_all_rows():
result = await session.execute(select(User)) result = await session.execute(select(User))
users = result.scalars().all() users = result.scalars().all()
return users return users
async def update_user(user_info): async def update_user(email,user_info, perm_info):
async with AsyncSessionLocal() as session: async with AsyncSessionLocal() as session:
result = await session.execute(select(User).where(User.id==user_info.id)) result = await session.execute(select(User).options(selectinload(User.permissions)).where(User.email==email))
user = result.scalar_one_or_none() user = result.scalar_one_or_none()
if user: if user:
user.email = user_info.email user.email = user_info.email
user.description = user_info.description user.description = user_info.description
user.activated = user_info.activated user.activated = user_info.activated
user.password = hash_password(user_info.password) user.password = hash_password(user_info.password)
perm = user.permissions[0] # если у юзера одна запись - Это связь один-ко-многим: у одного User может быть список из нескольких Permission.
perm.can_edit = perm_info.can_edit
perm.can_delete = perm_info.can_delete
await session.commit() await session.commit()
return user
async def delete_user(email): async def delete_user(email):
async with AsyncSessionLocal() as session: async with AsyncSessionLocal() as session:
result = await session.execute(select(User).where(User.email==email)) result = await session.execute(select(User).where(User.email==email))
@@ -75,6 +98,8 @@ async def login_user(user_info):
result = await session.execute(select(User).where(User.email == user_info.email)) result = await session.execute(select(User).where(User.email == user_info.email))
user = result.scalar_one_or_none() user = result.scalar_one_or_none()
if user and verify_password(user_info.password, user.password): if user and verify_password(user_info.password, user.password):
user.last_login=datetime.now(timezone.utc)
await session.commit()
return user return user
return None return None
async def reset_user(user_info): async def reset_user(user_info):
@@ -86,5 +111,8 @@ async def reset_user(user_info):
await session.commit() await session.commit()
return user return user
return None return None
async def main(): @event.listens_for(User, "after_insert") #listener не работает в async
await init_db() def create_permission(mapper, connection, target):
connection.execute(
Permission.__table__.insert().values(user_id=target.id)
)

View File

@@ -12,12 +12,16 @@
<form id="logoutForm"> <form id="logoutForm">
<button type="submit">Logout</button> <button type="submit">Logout</button>
</form> </form>
<h2>data</h2> <h2 id ="Account_Email">data</h2>
<form action="#" method="POST"> <form>
<p>Data</p> <p id = "Account_Description">Data</p>
<p id = "Account_Activated">Data</p>
<p id = "Account_Created">Data</p>
</form> </form>
</div> </div>
</div> </div>
<script src="js.js"></script> <script src="js.js"></script>
<script type="module" src="profile.js"></script>
</body> </body>
</html> </html>

View File

@@ -0,0 +1,45 @@
async function loadUser() {
try {
const token = getToken();
const response = await fetch("http://localhost:8000/me", {
method: "GET",
headers: {
"Authorization": `Bearer ${token}`
}
});
if (!response.ok) {
throw new Error("Unauthorized");
}
const data = await response.json();
console.log("User info:", data);
show_data(data)
} catch (err) {
showError(["Connection error"]);
}
}
loadUser();
function show_data(data) {
const emailElem = document.getElementById('Account_Email');
if (emailElem) {
emailElem.textContent = data.email;
}
const descElem = document.getElementById('Account_Description');
if (descElem) {
descElem.textContent = data.description || "—";
}
const activatedElem = document.getElementById('Account_Activated');
if (activatedElem) {
activatedElem.textContent = `Active: ${data.activated ? "Yes" : "No"}`;
}
const createdElem = document.getElementById('Account_Created');
if (createdElem) {
// красиво обрезать дату
createdElem.textContent = new Date(data.created_at).toLocaleString();
}
}

View File

View File

@@ -0,0 +1,9 @@
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from server.backend.endpoints import api
@pytest_asyncio.fixture
async def client():
transport = ASGITransport(app=api)#подключение к FastAPI-приложению напрямую
async with AsyncClient(transport=transport, base_url="http://test") as ac: #Имитирует подключение в это приложение, как будто по сети
yield ac #чтобы вернуть значение, но не завершить функцию.

View File

@@ -0,0 +1,8 @@
[pytest]
minversion = 6.0
testpaths = tests
python_files = test_*.py *_test.py
python_functions = test_*
addopts =
-vv
--import-mode=importlib

View File

@@ -0,0 +1,39 @@
import pytest
from httpx import AsyncClient, ASGITransport
from fastapi import status
from server.backend import JWT # твой JWT модуль
from server.backend.endpoints import api
from server.testing.conftest import client
@pytest.mark.asyncio
async def test_protected_unauthorized(client):
"Проверка: без токена — 401"
response = await client.get("/protected") #отправляется GET /protected без токена
assert response.status_code == status.HTTP_401_UNAUTHORIZED
@pytest.mark.asyncio
async def test_protected_invalidtoken(client):
"Проверка: Токена неверный - 401"
response = await client.get("/protected", headers={"Authorization": "Invalid token"})
assert response.status_code == status.HTTP_401_UNAUTHORIZED
@pytest.mark.asyncio
async def test_protected_authorized(client):
"Проверка: с токеном — ответ с Hello"
async def fake_current_user(): #фейковая зависимость (fake_current_user) — функция, которая имитирует поведение настоящего JWT.current_user, но просто возвращает строку "test_user".
return "test_user"
# подменяем зависимость
api.dependency_overrides[JWT.current_user] = fake_current_user #FastAPI позволяет временно подменять зависимости (через Depends).
response = await client.get("/protected", headers={"Authorization": "Bearer faketoken"})
assert response.status_code == status.HTTP_200_OK
assert response.json() == {"msg": "Hello, test_user"}
api.dependency_overrides.clear() #после теста очистка оверрайда, чтобы не повлиять на другие тесты.

View File

View File

@@ -0,0 +1,149 @@
import pytest
import uuid #для генерации рандомного uuid
from httpx import AsyncClient, ASGITransport
from fastapi import status
from server.backend import JWT # твой JWT модуль
from server.backend.endpoints import api
from server.testing.conftest import client
from server.database import db
@pytest.fixture(scope="session")
def global_email():
"""Один и тот же email для всех тестов в сессии"""
return f"user_{uuid.uuid4().hex[:8]}@d.d"
@pytest.mark.asyncio
@pytest.mark.parametrize("email,password,expected_status", [
("valid", "123qwe!@#", 200),
("invalidemail.com", "123qwe!@#", 422),
("invalidpassword", "1234", 422),
("invalidall", "1234", 422)
])
async def test_create_user(client, email, password, expected_status, global_email):
'''Проверка: Создание пользователя'''
if email == "valid":
email = global_email #Подставка глобальной фикстуры под конкретный вариант
user_data = {
"email": email,
"description": "test_user",
"activated": "True",
"password": password
}
response = await client.post(f"/user_create", json=user_data)
assert response.status_code == expected_status
@pytest.mark.asyncio
@pytest.mark.parametrize("is_admin,expected_status", [
(False, 403),
(True, 200)
])
async def test_is_admin(client, monkeypatch, global_email, is_admin, expected_status):
"""Проверка: Прав пользователя + обновление"""
class FakePerm:
def __init__(self, is_admin): #Подменяем права пользователя
self.is_admin = is_admin
self.can_edit = False
self.can_delete = False
self.can_view = False
class FakeUser:
def __init__(self, is_admin):
self.email = global_email
self.permissions = [FakePerm(is_admin)]
self.description = "test_user"
self.activated = True
self.password = "123qwe!@#"
async def fake_get_user_by_email(email): #Подмена функции get_user_by_email
return FakeUser(is_admin)
monkeypatch.setattr(db, "get_user_by_email", fake_get_user_by_email)
async def fake_current_user():
return global_email
api.dependency_overrides[JWT.current_user] = fake_current_user
response = await client.put(
f"/user_update/{global_email}",
headers={"Authorization": "Bearer faketoken"},
json={"can_edit": True,
"can_delete": True,
"can_view": True,
"description": "1112322"
},
)
assert response.status_code == expected_status
api.dependency_overrides.clear()
@pytest.mark.asyncio
@pytest.mark.parametrize("email,password,expected_status", [
("valid", "123qwe!@#", 200),
("invalidemail.com", "123qwe!@#", 422),
("invalidpassword", "1234", 422),
("invalidall", "1234", 422)
])
async def test_login_user(client, email, password, expected_status, global_email):
'''Проверка: Логин пользователя'''
if email == "valid":
email = global_email #Подставка глобальной фикстуры под конкретный вариант
user_data = {
"username": email,
"password": password
}
response = await client.post(f"/login", data=user_data)
assert response.status_code == expected_status
@pytest.mark.parametrize("is_admin, can_delete,expected_status", [
(False,True,200),
(True,True,200),
(True,False,200),
(False, False,403)
])
@pytest.mark.asyncio
async def test_delete_user(client, global_email, monkeypatch, is_admin, expected_status, can_delete):
"""Проверка: Удаление пользователя"""
class FakePerm:
def __init__(self, is_admin):
self.is_admin = is_admin
self.can_edit = False
self.can_delete = can_delete
self.can_view = False
class FakeUser:
def __init__(self, is_admin):
self.email = global_email
self.permissions = [FakePerm(is_admin)]
self.description = "test_user"
self.activated = True
self.password = "123qwe!@#"
# добавляем обязательные поля из модели UserOut
self.created_at = "2025-10-12T00:00:00Z"
self.updated_at = "2025-10-12T00:00:00Z"
self.last_login = None
async def fake_get_user_by_email(email):
return FakeUser(is_admin)
monkeypatch.setattr(db, "get_user_by_email", fake_get_user_by_email)
async def fake_current_user():
return global_email
api.dependency_overrides[JWT.current_user] = fake_current_user
response = await client.delete(
f"/user_delete/{global_email}",
headers={"Authorization": "Bearer faketoken"}
)
assert response.status_code == expected_status
api.dependency_overrides.clear()