from fastapi import FastAPI, HTTPException, status, Depends, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from fastapi.security import OAuth2PasswordRequestForm from pydantic import EmailStr from . import pydentic, JWT, password from server.database import db from datetime import datetime, timedelta import asyncio api = FastAPI() from dotenv import load_dotenv #Работа с env для CORS import os load_dotenv() origins = os.getenv("ALLOW_ORIGINS").split(",") credentials = os.getenv("ALLOW_CREDENTIALS").lower() == "true" methods = os.getenv("ALLOW_METHODS").split(",") headers = os.getenv("ALLOW_HEADERS").split(",") api.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=credentials, allow_methods=methods, allow_headers=headers, ) @api.get("/protected") #test async def protected(current_user: str = Depends(JWT.current_user)): return {"msg": f"Hello, {current_user}"} @api.get("/", response_model=list[pydentic.UserOut]) #список! async def get_all_rows(current_user: str = Depends(JWT.current_user)): users = await db.get_all_rows() if not user: raise HTTPException(status_code=401, detail="The user isn't found") return users @api.get("/get_user_by_email/{email}", response_model=pydentic.UserOut) async def get_user_by_email(email:str, current_user: str = Depends(JWT.current_user)): user = await db.get_user_by_email(email) if user: return user else: raise HTTPException(status_code=401, detail="The user isn't found") @api.post("/user_create", response_model=pydentic.UserOut) async def create_user(row:pydentic.CreateUser): new_row = pydentic.CreateUser(email=row.email, description=row.description, activated = row.activated, password = row.password) try: await db.create_user(new_row) except: raise HTTPException(status_code=409, detail="User with this email already exists") return new_row @api.delete("/user_delete/{email}", response_model=pydentic.UserOut) async def delete_user(email:str,current_user: str = Depends(JWT.current_user)): user = await db.get_user_by_email(email) if not user: raise HTTPException(status_code=401, detail="The user isn't found") await db.delete_user(email) return user @api.put("/user_update/{email}", response_model=pydentic.UserOut) async def update_user(email:str, updated_row: pydentic.UserUpdate, current_user: str = Depends(JWT.current_user)): user = await db.get_user_by_email(email) if not user: raise HTTPException(status_code=401, detail="The user isn't found") changed = False if updated_row.email is not None and updated_row.email != user.email: user.email = updated_row.email changed = True if updated_row.description is not None and updated_row.description != user.description: user.description = updated_row.description changed = True if updated_row.activated is not None and updated_row.activated != user.activated: user.activated = updated_row.activated changed = True if updated_row.password is not None and updated_row.password != user.password: user.password = updated_row.password changed = True if changed: await db.update_user(user) else: pass return user @api.post("/login") async def login_user(form_data: OAuth2PasswordRequestForm = Depends()): try: creds = pydentic.UserLogin(email=form_data.username, password=form_data.password) except: raise HTTPException(status_code=422, detail="Email is not a valid email address") user = await db.login_user(creds) if not user: raise HTTPException(status_code=401, detail="The user isn't found") access_token = await JWT.AccessToken.create( {"sub": user.email}, timedelta(minutes=JWT.ACCESS_TOKEN_EXPIRE_MINUTES) ) return {"access_token": access_token, "token_type": "bearer"} @api.post("/reset", response_model=pydentic.UserOut) async def reset_user(row:pydentic.UserReset): user = await db.get_user_by_email(row.email) if not user: raise HTTPException(status_code=401, detail="The user isn't found") new_password = password.generate_password() new_row = pydentic.UserReset(email=row.email, new_password=new_password) password.send_password(new_row) user = await db.reset_user(new_row) return user