123 lines
4.1 KiB
Python
123 lines
4.1 KiB
Python
import hashlib
|
|
import imghdr
|
|
import re
|
|
import zlib
|
|
import validators
|
|
from uuid import UUID
|
|
from config import (
|
|
VALIDATE_PASSWORD_REGEX,
|
|
VALIDATE_USERNAME_REGEX,
|
|
FORCE_EMAIL_VERIFICATION,
|
|
MAX_MEDIA_SIZE,
|
|
ZLIB_COMPRESSION_LEVEL,
|
|
ALLOWED_MEDIA_TYPES,
|
|
)
|
|
from orm.users import (
|
|
get_user_by_username,
|
|
get_user_by_email,
|
|
UserModel,
|
|
get_user_by_id,
|
|
)
|
|
from fastapi import UploadFile
|
|
from fastapi.exceptions import HTTPException
|
|
|
|
|
|
async def validate_user(
|
|
first_name: str | None,
|
|
last_name: str | None,
|
|
username: str | None,
|
|
email: str | None,
|
|
password: str | None,
|
|
bio: str | None,
|
|
):
|
|
"""
|
|
Performs some validation upon user creation. Returns
|
|
a tuple (success, msg) to be used by routes. Values
|
|
set to None are not checked against
|
|
"""
|
|
|
|
if first_name and len(first_name) > 64:
|
|
return False, "first name is too long"
|
|
if first_name and len(first_name) < 5:
|
|
return False, "first name is too short"
|
|
if last_name and len(last_name) > 64:
|
|
return False, "last name is too long"
|
|
if last_name and len(last_name) < 2:
|
|
return False, "last name is too short"
|
|
if username and len(username) < 5:
|
|
return False, "username is too short"
|
|
if username and len(username) > 32:
|
|
return False, "username is too long"
|
|
if (
|
|
username
|
|
and VALIDATE_USERNAME_REGEX
|
|
and not re.match(VALIDATE_USERNAME_REGEX, username)
|
|
):
|
|
return False, "username is invalid"
|
|
if email and not validators.email(email):
|
|
return False, "email is not valid"
|
|
if password and len(password) > 72:
|
|
return False, "password is too long"
|
|
if (
|
|
password
|
|
and VALIDATE_PASSWORD_REGEX
|
|
and not re.match(VALIDATE_PASSWORD_REGEX, password)
|
|
):
|
|
return False, "password is too weak"
|
|
if username and await get_user_by_username(
|
|
username, deleted_ok=True, restricted_ok=True
|
|
):
|
|
return False, "username is already taken"
|
|
if email and await get_user_by_email(email, deleted_ok=True, restricted_ok=True):
|
|
return False, "email is already registered"
|
|
if bio and len(bio) > 4096:
|
|
return False, "bio is too long"
|
|
if bio:
|
|
try:
|
|
bio.encode("utf-8")
|
|
except UnicodeDecodeError:
|
|
return False, "bio contains invalid characters"
|
|
return True, ""
|
|
|
|
|
|
async def validate_profile_picture(
|
|
file: UploadFile,
|
|
) -> tuple[bool | None, str, bytes, str]:
|
|
"""
|
|
Validates a profile picture's size and content to see if it fits
|
|
our criteria and returns a tuple result, ext, data where result is a
|
|
boolean or none (True = check was passed, False = size too large,
|
|
None = check was failed for other reasons) indicating if the check was successful,
|
|
ext is the file's type and extension, data is a compressed stream of bytes
|
|
representing the original media and hash is the file's SHA256 hash encoded in
|
|
hexadecimal, before the compression. This function never raises an exception
|
|
"""
|
|
|
|
async with file:
|
|
try:
|
|
content = await file.read()
|
|
if len(content) > MAX_MEDIA_SIZE:
|
|
return False, "", b"", ""
|
|
if not (ext := imghdr.what(content.decode())) in ALLOWED_MEDIA_TYPES:
|
|
return None, "", b"", ""
|
|
return (
|
|
True,
|
|
ext,
|
|
zlib.compress(content, ZLIB_COMPRESSION_LEVEL),
|
|
hashlib.sha256(content).hexdigest(),
|
|
)
|
|
except (UnicodeDecodeError, zlib.error):
|
|
return None, "", b"", ""
|
|
|
|
|
|
# Credential loaders for our authenticated routes
|
|
async def get_self_by_id(public_id: UUID, requires_verified: bool = True) -> UserModel:
|
|
user = await get_user_by_id(public_id, include_secrets=True, restricted_ok=True)
|
|
if FORCE_EMAIL_VERIFICATION and requires_verified and not user.email_verified:
|
|
raise HTTPException(status_code=401, detail="Email verification is required")
|
|
return user
|
|
|
|
|
|
async def get_self_by_id_unverified(public_id: UUID) -> UserModel:
|
|
return await get_self_by_id(public_id, False)
|