Files
sqlalchemy-fastapi-pydentic…/server/backend/endpoints.py

118 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, HTTPException, status, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import EmailStr
from . import pydentic, JWT, password
from server.database import db
from datetime import datetime, timedelta
import asyncio
api = FastAPI()
from dotenv import load_dotenv #Работа с env для CORS
import os
load_dotenv()
origins = os.getenv("ALLOW_ORIGINS").split(",")
credentials = os.getenv("ALLOW_CREDENTIALS").lower() == "true"
methods = os.getenv("ALLOW_METHODS").split(",")
headers = os.getenv("ALLOW_HEADERS").split(",")
api.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=credentials,
allow_methods=methods,
allow_headers=headers,
)
@api.get("/protected") #test
async def protected(current_user: str = Depends(JWT.current_user)):
return {"msg": f"Hello, {current_user}"}
@api.get("/", response_model=list[pydentic.UserOut]) #список!
async def get_all_rows(current_user: str = Depends(JWT.current_user)):
users = await db.get_all_rows()
if not users:
raise HTTPException(status_code=401, detail="The user isn't found")
return users
@api.get("/get_user_by_email/{email}", response_model=pydentic.UserOut)
async def get_user_by_email(email:str, current_user: str = Depends(JWT.current_user)):
user = await db.get_user_by_email(email)
if user:
return user
else:
raise HTTPException(status_code=401, detail="The user isn't found")
@api.post("/user_create", response_model=pydentic.UserOut)
async def create_user(row:pydentic.CreateUser):
new_row = pydentic.CreateUser(email=row.email, description=row.description, activated = row.activated, password = row.password)
try:
await db.create_user(new_row)
except:
raise HTTPException(status_code=409, detail="User with this email already exists")
user = await db.get_user_by_email(row.email)
return user
@api.delete("/user_delete/{email}", response_model=pydentic.UserOut)
async def delete_user(email:str,current_user: str = Depends(JWT.current_user)):
user = await db.get_user_by_email(email)
if not user:
raise HTTPException(status_code=401, detail="The user isn't found")
await db.delete_user(email)
return user
@api.put("/user_update/{email}", response_model=pydentic.UserOut)
async def update_user(email:str, updated_row: pydentic.UserUpdate, current_user: str = Depends(JWT.current_user)):
user = await db.get_user_by_email(email)
perm = user.permissions[0]
if not user:
raise HTTPException(status_code=401, detail="The user isn't found")
changed = False
if updated_row.email is not None and updated_row.email != user.email:
user.email = updated_row.email
changed = True
if updated_row.description is not None and updated_row.description != user.description:
user.description = updated_row.description
changed = True
if updated_row.activated is not None and updated_row.activated != user.activated:
user.activated = updated_row.activated
changed = True
if updated_row.password is not None and updated_row.password != user.password:
user.password = updated_row.password
changed = True
if updated_row.can_edit is not None and updated_row.can_edit != perm.can_edit:
perm.can_edit = updated_row.can_edit
changed = True
if updated_row.can_delete is not None and updated_row.can_delete != perm.can_delete:
perm.can_delete = updated_row.can_delete
changed = True
if changed:
user = await db.update_user(user_info=user, perm_info=perm)
else:
pass
return user
@api.post("/login")
async def login_user(form_data: OAuth2PasswordRequestForm = Depends()):
try:
creds = pydentic.UserLogin(email=form_data.username, password=form_data.password)
except:
raise HTTPException(status_code=422, detail="Email is not a valid email address")
user = await db.login_user(creds)
if not user:
raise HTTPException(status_code=401, detail="The user isn't found")
access_token = await JWT.AccessToken.create(
{"sub": user.email},
timedelta(minutes=JWT.ACCESS_TOKEN_EXPIRE_MINUTES)
)
return {"access_token": access_token, "token_type": "bearer"}
@api.post("/reset", response_model=pydentic.UserOut)
async def reset_user(row:pydentic.UserReset):
user = await db.get_user_by_email(row.email)
if not user:
raise HTTPException(status_code=401, detail="The user isn't found")
new_password = password.generate_password()
new_row = pydentic.UserReset(email=row.email, new_password=new_password)
password.send_password(new_row)
user = await db.reset_user(new_row)
return user