Files

60 lines
2.5 KiB
Python

from fastapi import FastAPI, Depends, HTTPException,status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import server.backend.schema.pydantic as pydantic
import server.backend.database.db as db
from server.backend.auth.JWT import signJWT, decodeJWT
api = FastAPI()
security = HTTPBearer()
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
token = credentials.credentials
user = decodeJWT(token)
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
return user
async def check_roles(user=Depends(get_current_user)):
if user.get("admin") != True:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
return user
@api.post("/update", response_model=pydantic.UserUpdate)
async def update_user(data: pydantic.UserUpdate, user=Depends(get_current_user)):
user_check = await db.list_user(user["user_id"])
if not user_check:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
if not user_check.admin:
if data.code != user_check.code:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Ordinary users cannot change their code"
)
if user_check.admin:
if data.code != user_check.code:
existing_user = await db.list_user_by_code(data.code)
if existing_user and existing_user.id != user_check.id:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Code already exists for another user"
)
updated_data = data.copy(update={"id": user_check.id})
updated_data = await db.update_user(updated_data)
return updated_data
@api.post("/create", response_model=pydantic.UserAccess)
async def create_user(user_info: pydantic.UserCreate,user=Depends(check_roles)):
await db.create_user(user_info)
return user_info
@api.get("/list")
async def list_users(user=Depends(check_roles)):
list_of_users = await db.list_users()
return list_of_users
@api.post("/auth",response_model=pydantic.Token)
async def auth(code:pydantic.UserAccess):
login = await db.login_user(code)
if login == None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Forbidden")
token = signJWT(login)
return {"access_token": token, "token_type": "bearer"}