from fastapi import FastAPI, Depends, HTTPException,status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials import server.backend.schema.pydantic as pydantic import server.backend.database.db as db from server.backend.auth.JWT import signJWT, decodeJWT api = FastAPI() security = HTTPBearer() async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): token = credentials.credentials user = decodeJWT(token) if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") return user async def check_roles(user=Depends(get_current_user)): if user.get("admin") != True: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied") return user @api.post("/update", response_model=pydantic.UserUpdate) async def update_user(data: pydantic.UserUpdate, user=Depends(get_current_user)): user_check = await db.list_user(user["user_id"]) if not user_check: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") if not user_check.admin: if data.code != user_check.code: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Ordinary users cannot change their code" ) if user_check.admin: if data.code != user_check.code: existing_user = await db.list_user_by_code(data.code) if existing_user and existing_user.id != user_check.id: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Code already exists for another user" ) updated_data = data.copy(update={"id": user_check.id}) updated_data = await db.update_user(updated_data) return updated_data @api.post("/create", response_model=pydantic.UserAccess) async def create_user(user_info: pydantic.UserCreate,user=Depends(check_roles)): await db.create_user(user_info) return user_info @api.get("/list") async def list_users(user=Depends(check_roles)): list_of_users = await db.list_users() return list_of_users @api.post("/auth",response_model=pydantic.Token) async def auth(code:pydantic.UserAccess): login = await db.login_user(code) if login == None: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Forbidden") token = signJWT(login) return {"access_token": token, "token_type": "bearer"}