Files
sqlalchemy-fastapi-pydentic…/server/backend/endpoints.py
2025-09-25 17:54:34 +03:00

104 lines
4.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, HTTPException, status, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordRequestForm
from . import pydentic, JWT, password
from datetime import datetime, timedelta
from pydantic import EmailStr
from server.database import db
import asyncio
api = FastAPI()
from dotenv import load_dotenv #Работа с env для CORS
import os
load_dotenv()
origins = os.getenv("ALLOW_ORIGINS").split(",")
credentials = os.getenv("ALLOW_CREDENTIALS").lower() == "true"
methods = os.getenv("ALLOW_METHODS").split(",")
headers = os.getenv("ALLOW_HEADERS").split(",")
api.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=credentials,
allow_methods=methods,
allow_headers=headers,
)
@api.get("/protected")
async def protected(current_user: str = Depends(JWT.current_user)):
return {"msg": f"Hello, {current_user}"}
@api.get("/", response_model=pydentic.CreateUser)
async def get_all_rows(current_user: str = Depends(JWT.current_user)):
for row in await db.get_all_rows():
if row:
return row
else:
raise HTTPException(status_code=404, detail="The user isn't found")
@api.get("/get_user_by_email/{email}", response_model=pydentic.CreateUser)
async def get_user_by_email(email:str, current_user: str = Depends(JWT.current_user)):
user = await db.get_user_by_email(email)
if user:
return user
else:
raise HTTPException(status_code=404, detail="The user isn't found")
@api.post("/user_create", response_model=pydentic.UsersInfo)
async def create_user(row:pydentic.CreateUser):
new_row = pydentic.CreateUser(email=row.email, description=row.description, activated = row.activated, password = row.password)
await db.create_user(new_row)
return new_row
@api.delete("/user_delete/{email}", response_model=pydentic.UsersInfo)
async def delete_user(email:str,current_user: str = Depends(JWT.current_user)):
user = await db.get_user_by_email(email)
if not user:
raise HTTPException(status_code=404, detail="The user isn't found")
await db.delete_user(email)
return user
@api.put("/user_update/{email}", response_model=pydentic.UsersInfo)
async def update_user(email:str, updated_row: pydentic.UserUpdate, current_user: str = Depends(JWT.current_user)):
user = await db.get_user_by_email(email)
if not user:
raise HTTPException(status_code=404, detail="The user isn't found")
changed = False
if updated_row.email is not None and updated_row.email != user.email:
user.email = updated_row.email
changed = True
if updated_row.description is not None and updated_row.description != user.description:
user.description = updated_row.description
changed = True
if updated_row.activated is not None and updated_row.activated != user.activated:
user.activated = updated_row.activated
changed = True
if updated_row.password is not None and updated_row.password != user.password:
user.password = updated_row.password
changed = True
if changed:
await db.update_user(user)
else:
pass
return user
@api.post("/login")
async def login_user(form_data: OAuth2PasswordRequestForm = Depends()):
creds = pydentic.UserLogin(email=form_data.username, password=form_data.password)
user = await db.login_user(creds)
if not user:
raise HTTPException(status_code=401, detail="The user isn't found")
access_token = await JWT.AccessToken.create(
{"sub": user.email},
timedelta(minutes=JWT.ACCESS_TOKEN_EXPIRE_MINUTES)
)
return {"access_token": access_token, "token_type": "bearer"}
@api.post("/reset", response_model=pydentic.UsersInfo)
async def reset_user(row:pydentic.UserReset):
user = await db.get_user_by_email(row.email)
if not user:
raise HTTPException(status_code=401, detail="The user isn't found")
new_password = password.generate_password()
new_row = pydentic.UserReset(email=row.email, new_password=new_password)
password.send_password(new_row)
user = await db.reset_user(new_row)
return user