91 lines
3.6 KiB
Python
91 lines
3.6 KiB
Python
from fastapi import FastAPI, Depends, HTTPException,status, Response, Cookie
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
import server.backend.schema.pydantic as pydantic
|
|
import server.backend.database.db as db
|
|
from server.backend.auth.JWT import signJWT, decodeJWT
|
|
api = FastAPI(openapi_url="/api/openapi.json",docs_url="/api/docs", redoc_url="/api/redoc")
|
|
security = HTTPBearer(auto_error=False)
|
|
|
|
async def get_current_user(
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|
token_cookie: str = Cookie(default=None)
|
|
):
|
|
token = credentials.credentials if credentials else token_cookie
|
|
if not token:
|
|
raise HTTPException(status_code=401, detail="Not authenticated")
|
|
user = decodeJWT(token)
|
|
if not user:
|
|
raise HTTPException(status_code=401, detail="Invalid token")
|
|
return user
|
|
async def check_roles(user=Depends(get_current_user)):
|
|
user_check = await db.list_user(user["user_id"])
|
|
if not user_check:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
|
|
if user_check.admin != True:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
|
|
return user
|
|
|
|
@api.post("/api/update", response_model=pydantic.UserUpdate)
|
|
async def update_user(data: pydantic.UserUpdate, user=Depends(get_current_user)):
|
|
user_check = await db.list_user(user["user_id"])
|
|
if not user_check:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
|
|
if not user_check.admin:
|
|
if data.code != user_check.code:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Ordinary users cannot change their code"
|
|
)
|
|
if user_check.admin:
|
|
if data.code != user_check.code:
|
|
existing_user = await db.list_user_by_code(data.code)
|
|
if existing_user and existing_user.id != user_check.id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_409_CONFLICT,
|
|
detail="Code already exists for another user"
|
|
)
|
|
updated_data = data.copy(update={"id": user_check.id})
|
|
updated_data = await db.update_user(updated_data)
|
|
return updated_data
|
|
|
|
@api.post("/api/create", response_model=pydantic.UserAccess)
|
|
async def create_user(user_info: pydantic.UserCreate,user=Depends(check_roles)):
|
|
user = await db.create_user(user_info)
|
|
if user == None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_409_CONFLICT,
|
|
detail="Code already exists for another user"
|
|
)
|
|
return user_info
|
|
|
|
@api.get("/api/list")
|
|
async def list_users(user=Depends(check_roles)):
|
|
list_of_users = await db.list_users()
|
|
return list_of_users
|
|
|
|
@api.post("/api/auth")
|
|
async def auth(code: pydantic.UserAccess, response: Response):
|
|
login = await db.login_user(code)
|
|
if login is None:
|
|
raise HTTPException(status_code=401, detail="Forbidden")
|
|
token = signJWT(login)
|
|
response.set_cookie(
|
|
key="token",
|
|
value=token,
|
|
httponly=True,
|
|
secure=True,
|
|
samesite="strict"
|
|
)
|
|
return {"access_token": token, "token_type": "bearer"}
|
|
@api.get("/api/verify")
|
|
async def verify(token: str = Cookie(default=None)):
|
|
if not token:
|
|
raise HTTPException(status_code=401, detail="No token")
|
|
user = decodeJWT(token)
|
|
if not user:
|
|
raise HTTPException(status_code=401, detail="Invalid token")
|
|
return {"status": "ok"}
|
|
@api.post("/api/logout")
|
|
async def logout(response: Response):
|
|
response.delete_cookie(key="token")
|
|
return {"status": "ok"} |