import re import bcrypt from uuid import UUID import validators from fastapi import APIRouter as FastAPI, Depends, Response, Request from fastapi.exceptions import HTTPException from fastapi.security import OAuth2PasswordRequestForm from datetime import timedelta from config import ( BCRYPT_ROUNDS, LOGGER, SESSION_EXPIRE_LIMIT, COOKIE_SAMESITE_POLICY, COOKIE_DOMAIN, MANAGER, SESSION_COOKIE_NAME, LIMITER, VALIDATE_PASSWORD_REGEX, VALIDATE_USERNAME_REGEX, SECURE_COOKIE, COOKIE_PATH, COOKIE_HTTPONLY, ) from orm.users import UserModel, User router = FastAPI() async def get_user_by_id( public_id: UUID, include_secrets: bool = False, restricted_ok: bool = False, deleted_ok: bool = False ) -> dict | None: """ Retrieves a user by its public ID """ user = ( await User.select( *User.all_columns(exclude=["public_id"]), User.public_id.as_alias("id"), exclude_secrets=not include_secrets, ) .where(User.public_id == public_id) .first() ) if user: # Performs validation UserModel(**user) if (user["deleted"] and not deleted_ok) or (user["restricted"] and not restricted_ok): return return user return @MANAGER.user_loader() async def get_self_by_id(public_id: UUID) -> dict: return await get_user_by_id(public_id, include_secrets=True, restricted_ok=True) async def get_user_by_username( username: str, include_secrets: bool = False, restricted_ok: bool = False, deleted_ok: bool = False ) -> dict | None: """ Retrieves a user by its public username """ user = ( await User.select( *User.all_columns(exclude=["public_id"]), User.public_id.as_alias("id"), exclude_secrets=not include_secrets, ) .where(User.username == username) .first() ) if user: # Performs validation UserModel(**user) if (user["deleted"] and not deleted_ok) or (user["restricted"] and not restricted_ok): return return user return async def get_user_by_email( email: str, include_secrets: bool = False, restricted_ok: bool = False, deleted_ok: bool = False ) -> dict | None: """ Retrieves a user by its email address (meant to be used internally) """ user = ( await User.select( *User.all_columns(exclude=["public_id"]), User.public_id.as_alias("id"), exclude_secrets=not include_secrets, ) .where(User.email_address == email) .first() ) if user: # Performs validation UserModel(**user) if (user["deleted"] and not deleted_ok) or (user["restricted"] and not restricted_ok): return return user return @LIMITER.limit("5/minute") @router.post("/user") async def login( request: Request, response: Response, data: OAuth2PasswordRequestForm = Depends() ) -> dict: if request.cookies.get(SESSION_COOKIE_NAME): raise HTTPException(status_code=400, detail="Please logout first") username = data.username if len(username) > 32: raise HTTPException( status_code=413, detail="Authentication failed: username is too long" ) try: password = data.password.encode() if len(password) > 72: raise HTTPException( status_code=413, detail="Authentication failed: password is too long" ) except UnicodeEncodeError as e: LOGGER.warning( f"An error occurred while attempting to decode password for user {username} -> {type(e).__name__}: {e}" ) raise HTTPException( status_code=413, detail="Authentication failed: invalid characters in password", ) if not ( user := await get_user_by_username( username, include_secrets=True, restricted_ok=True ) ): raise HTTPException( status_code=413, detail="Authentication failed: the user does not exist", ) if not bcrypt.checkpw(password, user["password_hash"]): raise HTTPException( status_code=413, detail="Authentication failed: password mismatch", ) token = MANAGER.create_access_token( expires=timedelta(seconds=SESSION_EXPIRE_LIMIT), data={"sub": str(user["id"])} ) response.set_cookie( secure=SECURE_COOKIE, key=SESSION_COOKIE_NAME, max_age=SESSION_EXPIRE_LIMIT, value=token, httponly=COOKIE_HTTPONLY, samesite=COOKIE_SAMESITE_POLICY, domain=COOKIE_DOMAIN or None, path=COOKIE_PATH or "/", ) return {"status_code": 200, "msg": "Authentication successful"} @router.get("/user/logout") @LIMITER.limit("5/minute") async def logout( request: Request, response: Response, user: dict = Depends(MANAGER) ) -> dict: """ Deletes a user's session cookie, logging them out """ response.delete_cookie( secure=SECURE_COOKIE, key=SESSION_COOKIE_NAME, httponly=COOKIE_HTTPONLY, samesite=COOKIE_SAMESITE_POLICY, domain=COOKIE_DOMAIN or None, path=COOKIE_PATH or "/", ) return {"status_code": 200, "msg": "Logged out"} @router.get("/user/me") @LIMITER.limit("2/second") async def get_self(request: Request, user: dict = Depends(MANAGER)) -> dict: """ Fetches a user's own info. This returns some extra data such as email address, account creation date and email verification status, which is not available from the regular endpoint """ user.pop("password_hash") user.pop("internal_id") user.pop("deleted") return {"status_code": 200, "msg": "Success", "data": user} @router.get("/user/username/{username}") @LIMITER.limit("30/second") async def get_user_by_name( request: Request, username: str, _auth: dict = Depends(MANAGER) ) -> dict: """ Fetches a single user by its public ID """ if not (user := await get_user_by_username(username)): return { "status_code": 404, "msg": "Lookup failed: the user does not exist", } user.pop("restricted") user.pop("deleted") return {"status_code": 200, "msg": "Lookup successful", "data": user} @router.get("/user/id/{public_id}") @LIMITER.limit("30/second") async def get_user_by_public_id( request: Request, public_id: str, _auth: dict = Depends(MANAGER) ) -> dict: """ Fetches a single user by its public ID """ if not (user := await get_user_by_id(UUID(public_id))): raise HTTPException( status_code=404, detail="Lookup failed: the user does not exist" ) user.pop("restricted") user.pop("deleted") return {"status_code": 200, "msg": "Lookup successful", "data": user} async def validate_user( first_name: str, last_name: str, username: str, email: str, password: str ) -> tuple[bool, str]: """ Performs some validation upon user creation. Returns a tuple (success, msg) to be used by routes """ if len(first_name) > 64: return False, "first name is too long" if len(first_name) < 5: return False, "first name is too short" if len(last_name) > 64: return False, "last name is too long" if len(last_name) < 2: return False, "last name is too short" if len(username) < 5: return False, "username is too short" if len(username) > 32: return False, "username is too long" if VALIDATE_USERNAME_REGEX and not re.match(VALIDATE_USERNAME_REGEX, username): return False, "username is invalid" if not validators.email(email): return False, "email is not valid" if len(password) > 72: return False, "password is too long" if VALIDATE_PASSWORD_REGEX and not re.match(VALIDATE_PASSWORD_REGEX, password): return False, "password is too weak" if await get_user_by_username(username, deleted_ok=True, restricted_ok=True): return False, "username is already taken" if await get_user_by_email(email, deleted_ok=True, restricted_ok=True): return False, "email is already registered" return True, "" @router.delete("/user") @LIMITER.limit("1/minute") async def delete(request: Request, response: Response, user: dict = Depends(MANAGER)) -> dict: """ Sets the user's deleted flag in the database, without actually deleting the associated data """ await User.update({User.deleted: True}).where(User.public_id == user["id"]) response.delete_cookie( secure=SECURE_COOKIE, key=SESSION_COOKIE_NAME, httponly=COOKIE_HTTPONLY, samesite=COOKIE_SAMESITE_POLICY, domain=COOKIE_DOMAIN or None, path=COOKIE_PATH or "/",) return {"status_code": 200, "msg": "Success"} @router.put("/user") @LIMITER.limit("2/minute") async def signup( request: Request, first_name: str, last_name: str, username: str, email: str, password: str, ) -> dict: """ Endpoint used to create new users """ if request.cookies.get(SESSION_COOKIE_NAME): raise HTTPException(status_code=400, detail="Please logout first") # We don't use FastAPI's validation because we want custom error # messages result, msg = await validate_user(first_name, last_name, username, email, password) if not result: return {"status_code": 413, "msg": f"Signup failed: {msg}"} else: await User.insert( User( first_name=first_name, last_name=last_name, username=username, email_address=email, password_hash=bcrypt.hashpw( password.encode(), bcrypt.gensalt(BCRYPT_ROUNDS) ), ) ) return {"status_code": 200, "msg": "Success"}