Files
sqlalchemy-fastapi-pydentic…/server/backend/endpoints.py
2025-10-05 15:15:32 +03:00

133 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, HTTPException, status, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordRequestForm
from .rate_limit import limiter, ratelimit_handler
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from pydantic import EmailStr
from . import pydentic, JWT, password, permissions
from server.database import db
from datetime import datetime, timedelta
import asyncio
# Application instance; the route decorators below register handlers on it.
api = FastAPI()
# Rate limiting: expose the shared limiter on the application state, install
# the handler invoked for RateLimitExceeded, and add the slowapi middleware.
api.state.limiter = limiter
api.add_exception_handler(RateLimitExceeded, ratelimit_handler)
api.add_middleware(SlowAPIMiddleware)
from dotenv import load_dotenv  # .env handling for the CORS settings
import os

load_dotenv()


def _env_list(name: str) -> list[str]:
    """Return the comma-separated environment variable *name* as a list.

    Each entry is stripped of surrounding whitespace and empty entries are
    dropped. An unset variable yields an empty list instead of failing with
    ``AttributeError`` on ``None``.
    """
    raw = os.getenv(name, "")
    return [item.strip() for item in raw.split(",") if item.strip()]


# CORS settings are read from the environment (populated by load_dotenv()).
# A missing variable results in the most restrictive value: no origins,
# methods or headers, and credentials disabled.
origins = _env_list("ALLOW_ORIGINS")
credentials = os.getenv("ALLOW_CREDENTIALS", "").strip().lower() == "true"
methods = _env_list("ALLOW_METHODS")
headers = _env_list("ALLOW_HEADERS")

api.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=credentials,
    allow_methods=methods,
    allow_headers=headers,
)
@api.get("/protected")  # test route
async def protected(current_user: str = Depends(JWT.current_user)):
    """Greet the caller resolved by the ``JWT.current_user`` dependency.

    Returns:
        A dict with a single ``"msg"`` key containing the greeting.
    """
    greeting = f"Hello, {current_user}"
    return {"msg": greeting}
@api.get("/", response_model=list[pydentic.UserOut])  # responds with a list
async def get_all_rows(
    current_user: str = Depends(JWT.current_user),
    user=Depends(permissions.check_permission("is_admin")),
):
    """Return everything ``db.get_all_rows()`` yields.

    The route declares two dependencies: ``JWT.current_user`` and the
    ``is_admin`` permission check.

    Raises:
        HTTPException: status 401 when the database result is falsy.
    """
    rows = await db.get_all_rows()
    if rows:
        return rows
    # NOTE(review): 401 is what existing clients receive today; 404 (or an
    # empty list) would be the conventional answer for "nothing found".
    raise HTTPException(status_code=401, detail="The user isn't found")
@api.get("/get_user_by_email/{email}", response_model=pydentic.UserOut)
async def get_user_by_email(
    email: str,
    current_user: str = Depends(JWT.current_user),
    user=Depends(permissions.check_permission("can_view")),
):
    """Look up a single user by the e-mail address given in the path.

    The route declares two dependencies: ``JWT.current_user`` and the
    ``can_view`` permission check.

    Raises:
        HTTPException: status 401 when ``db.get_user_by_email`` returns a
            falsy value.
    """
    found = await db.get_user_by_email(email)
    if not found:
        # NOTE(review): 401 is kept as-is; 404 would be the conventional code.
        raise HTTPException(status_code=401, detail="The user isn't found")
    return found
@api.post("/user_create", response_model=pydentic.UserOut)
async def create_user(row: pydentic.CreateUser):
    """Create a user from the request body and return the stored record.

    Args:
        row: Validated request payload.

    Returns:
        The user as read back from the database by e-mail.

    Raises:
        HTTPException: status 409 when ``db.create_user`` raises.
    """
    # The payload is copied field by field, exactly as before.
    new_row = pydentic.CreateUser(
        email=row.email,
        description=row.description,
        activated=row.activated,
        password=row.password,
    )
    try:
        await db.create_user(new_row)
    except Exception as exc:
        # ``except Exception`` instead of a bare ``except`` so that task
        # cancellation and interpreter-exit signals are not turned into a 409.
        # NOTE(review): every database failure is still reported as a
        # duplicate e-mail; narrow this to the storage layer's
        # unique-violation error once that type is known.
        raise HTTPException(
            status_code=409, detail="User with this email already exists"
        ) from exc
    return await db.get_user_by_email(row.email)
@api.delete("/user_delete/{email}", response_model=pydentic.UserOut)
async def delete_user(
    email: str,
    current_user: str = Depends(JWT.current_user),
    user=Depends(permissions.check_permission("can_delete")),
):
    """Delete the user with the given e-mail and return the record.

    The record is fetched before the deletion so it can be sent back in the
    response.

    Raises:
        HTTPException: status 401 when no user is found for ``email``.
    """
    target = await db.get_user_by_email(email)
    if target:
        await db.delete_user(email)
        return target
    # NOTE(review): 401 is kept as-is; 404 would be the conventional code.
    raise HTTPException(status_code=401, detail="The user isn't found")
@api.put("/user_update/{email}", response_model=pydentic.UserOut)
async def update_user(
    email: str,
    updated_row: pydentic.UserUpdate,
    current_user: str = Depends(JWT.current_user),
    user=Depends(permissions.check_permission("can_edit")),
):
    """Apply a partial update to the user identified by ``email``.

    Only fields that are present (not ``None``) in ``updated_row`` and differ
    from the stored value are changed. Permission flags are changed only when
    the caller's own permission record has ``is_admin`` set. The database is
    written only if something actually changed.

    Args:
        email: E-mail of the user to modify (path parameter).
        updated_row: Requested changes.
        current_user: Value supplied by the ``JWT.current_user`` dependency;
            it is used below as the e-mail for looking up the caller.
        user: Result of the ``can_edit`` permission dependency; the name is
            rebound below to the user being edited.

    Returns:
        The updated user, or the unchanged one when nothing differed.

    Raises:
        HTTPException: status 401 when no user is found for ``email``.
    """
    # The requested user, loaded from the database.
    user = await db.get_user_by_email(email)
    # Check existence BEFORE reading attributes: previously
    # ``user.permissions`` was accessed first, so an unknown e-mail crashed
    # with AttributeError instead of producing the intended 401.
    if not user:
        raise HTTPException(status_code=401, detail="The user isn't found")
    perm = user.permissions[0]  # permission record of the requested user

    # The caller, looked up by the value taken from the JWT.
    current = await db.get_user_by_email(current_user)
    current_perms = current.permissions[0]  # permission record of the caller

    changed = False

    # Only these columns may be modified through this endpoint.
    updatable_fields = ["email", "description", "activated"]
    for field in updatable_fields:
        new_value = getattr(updated_row, field)
        if new_value is not None and new_value != getattr(user, field):
            setattr(user, field, new_value)
            changed = True

    # Password
    if updated_row.password:
        # NOTE(review): ``verify_password`` is not imported in the visible
        # part of this module (presumably it lives in ``password``); confirm
        # and import it, otherwise this line raises NameError.
        if not verify_password(updated_row.password, user.password):
            user.password = updated_row.password
            changed = True

    # Permissions (admin only)
    if current_perms.is_admin:
        perm_fields = ["can_edit", "can_delete", "can_view", "is_admin"]
        for field in perm_fields:
            new_value = getattr(updated_row, field)
            if new_value is not None and new_value != getattr(perm, field):
                setattr(perm, field, new_value)
                changed = True

    if changed:
        user = await db.update_user(user_info=user, perm_info=perm)
    return user
@api.post("/login")
async def login_user(form_data: OAuth2PasswordRequestForm = Depends()):
    """Authenticate with form credentials and issue a bearer access token.

    The form's ``username`` field is used as the e-mail address.

    Returns:
        A dict with ``access_token`` and ``token_type`` (``"bearer"``).

    Raises:
        HTTPException: status 422 when building ``pydentic.UserLogin`` from
            the form fails; status 401 when ``db.login_user`` returns a falsy
            value.
    """
    try:
        creds = pydentic.UserLogin(
            email=form_data.username, password=form_data.password
        )
    except Exception as exc:
        # ``except Exception`` instead of a bare ``except`` so that
        # KeyboardInterrupt/SystemExit are not reported as a bad e-mail.
        raise HTTPException(
            status_code=422, detail="Email is not a valid email address"
        ) from exc

    user = await db.login_user(creds)
    if not user:
        raise HTTPException(status_code=401, detail="The user isn't found")

    # The token subject is the user's e-mail; its lifetime comes from the
    # JWT module's configuration.
    access_token = await JWT.AccessToken.create(
        {"sub": user.email},
        timedelta(minutes=JWT.ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}
@api.post("/reset", response_model=pydentic.UserOut)
async def reset_user(row: pydentic.UserReset):
    """Generate a new password for the user, send it, and store it.

    Args:
        row: Request payload; only its ``email`` is read here.

    Returns:
        Whatever ``db.reset_user`` returns for the updated user.

    Raises:
        HTTPException: status 401 when no user is found for ``row.email``.
    """
    existing = await db.get_user_by_email(row.email)
    if not existing:
        raise HTTPException(status_code=401, detail="The user isn't found")

    # NOTE(review): this route declares no authentication dependency, and the
    # new password is sent before it is persisted; confirm both are intended.
    reset_request = pydentic.UserReset(
        email=row.email,
        new_password=password.generate_password(),
    )
    password.send_password(reset_request)
    return await db.reset_user(reset_request)
@api.get("/admin/{email}")
async def admin_stuff(
    email: str,
    user=Depends(permissions.check_permission("can_delete")),
):
    """Return a welcome message for the user resolved by the permission check.

    The ``email`` path parameter is accepted but not used in the body; the
    greeting uses the ``email`` attribute of the dependency result.
    """
    welcome = f"Добро пожаловать, {user.email}"
    return {"msg": welcome}