"""HTTP API exposing user management and JWT authentication endpoints."""

from fastapi import FastAPI, Depends, HTTPException, status, Response, Cookie
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

import server.backend.schema.pydantic as pydantic
import server.backend.database.db as db
from server.backend.auth.JWT import signJWT, decodeJWT

api = FastAPI(
    openapi_url="/api/openapi.json",
    docs_url="/api/docs",
    redoc_url="/api/redoc",
)

# auto_error=False: a request without an Authorization header is not rejected
# by the security scheme itself, so get_current_user can fall back to the
# cookie.
security = HTTPBearer(auto_error=False)


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    # The /api/auth endpoint stores the JWT in a cookie named "token".
    # Without the alias FastAPI would look for a cookie literally called
    # "token_cookie", which nothing in this module ever sets.
    token_cookie: str = Cookie(default=None, alias="token"),
):
    """Resolve the caller's decoded JWT payload.

    The bearer token from the Authorization header takes precedence; the
    "token" cookie is used when no bearer credentials were supplied.

    Returns:
        Whatever ``decodeJWT`` returns for the token (other handlers in this
        module index it with ``"user_id"``).

    Raises:
        HTTPException: 401 when no token is present, or when ``decodeJWT``
            returns a falsy value for it.
    """
    token = credentials.credentials if credentials else token_cookie
    if not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
        )

    user = decodeJWT(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        )
    return user


async def check_roles(user=Depends(get_current_user)):
    """Dependency that only lets administrators through.

    Returns:
        The decoded token payload received from ``get_current_user``.

    Raises:
        HTTPException: 404 when the token's user is not found by
            ``db.list_user``; 403 when that user's ``admin`` flag is falsy.
    """
    user_check = await db.list_user(user["user_id"])
    if not user_check:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    # Same truthiness test update_user applies to the admin flag.
    if not user_check.admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied",
        )
    return user


@api.post("/api/update", response_model=pydantic.UserUpdate)
async def update_user(data: pydantic.UserUpdate, user=Depends(get_current_user)):
    """Update the calling user's own record.

    The record id is always taken from the authenticated user, never from
    the request body. A change of ``code`` is only permitted for admins, and
    only when no other user already has the new code.

    Raises:
        HTTPException: 404 when the caller's user record is not found;
            403 when a non-admin attempts to change their code;
            409 when the requested code belongs to a different user.
    """
    user_check = await db.list_user(user["user_id"])
    if not user_check:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    if data.code != user_check.code:
        if not user_check.admin:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Ordinary users cannot change their code",
            )
        existing_user = await db.list_user_by_code(data.code)
        if existing_user and existing_user.id != user_check.id:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="Code already exists for another user",
            )

    # Force the id to the caller's own so the payload cannot target
    # somebody else's record.
    updated_data = data.copy(update={"id": user_check.id})
    updated_data = await db.update_user(updated_data)
    return updated_data


@api.post("/api/create", response_model=pydantic.UserAccess)
async def create_user(user_info: pydantic.UserCreate, user=Depends(check_roles)):
    """Create a new user (admin only, enforced by ``check_roles``).

    Returns:
        The submitted ``user_info``, serialized through the
        ``pydantic.UserAccess`` response model.

    Raises:
        HTTPException: 409 when ``db.create_user`` returns ``None``.
    """
    # Kept in its own name so the injected ``user`` dependency is not
    # overwritten by the database result.
    created = await db.create_user(user_info)
    if created is None:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Code already exists for another user",
        )
    return user_info


@api.get("/api/list")
async def list_users(user=Depends(check_roles)):
    """Return the result of ``db.list_users()`` (admin only)."""
    list_of_users = await db.list_users()
    return list_of_users


@api.post("/api/auth")
async def auth(code: pydantic.UserAccess, response: Response):
    """Log a user in and issue a JWT.

    The token is returned in the response body and also stored in an
    HttpOnly, Secure, SameSite=strict cookie named "token".

    Raises:
        HTTPException: 401 when ``db.login_user`` returns ``None``.
    """
    login = await db.login_user(code)
    if login is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Forbidden",
        )

    token = signJWT(login)
    response.set_cookie(
        key="token",
        value=token,
        httponly=True,
        secure=True,
        samesite="strict",
    )
    return {"access_token": token, "token_type": "bearer"}


@api.get("/api/verify")
async def verify(token: str = Cookie(default=None)):
    """Check that the "token" cookie holds a JWT that ``decodeJWT`` accepts.

    Raises:
        HTTPException: 401 when the cookie is missing or ``decodeJWT``
            returns a falsy value for it.
    """
    if not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="No token",
        )

    user = decodeJWT(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        )
    return {"status": "ok"}


@api.post("/api/logout")
async def logout(response: Response):
    """Log the caller out by deleting the "token" cookie."""
    response.delete_cookie(key="token")
    return {"status": "ok"}