PySimpleSocial/endpoints/users.py

901 lines
32 KiB
Python

import base64
from datetime import timezone, datetime
import hashlib
import json
import re
import imghdr
import uuid
import zlib
import bcrypt
from uuid import UUID
import validators
from fastapi import APIRouter as FastAPI, Depends, Response, Request, UploadFile
from fastapi.exceptions import HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from datetime import timedelta
from pathlib import Path
from config import (
BCRYPT_ROUNDS,
LOGGER,
SESSION_EXPIRE_LIMIT,
COOKIE_SAMESITE_POLICY,
COOKIE_DOMAIN,
MANAGER,
SESSION_COOKIE_NAME,
LIMITER,
VALIDATE_PASSWORD_REGEX,
VALIDATE_USERNAME_REGEX,
SECURE_COOKIE,
COOKIE_PATH,
COOKIE_HTTPONLY,
ALLOWED_MEDIA_TYPES,
ZLIB_COMPRESSION_LEVEL,
MAX_MEDIA_SIZE,
STORAGE_ENGINE,
STORAGE_FOLDER,
SMTP_HOST,
SMTP_PORT,
SMTP_USER,
SMTP_PASSWORD,
SMTP_USE_TLS,
SMTP_TIMEOUT,
SMTP_FROM_USER,
SMTP_TEMPLATES_DIRECTORY,
PLATFORM_NAME,
HAS_HTTPS,
HOST,
PORT,
EMAIL_VERIFICATION_EXPIRATION,
FORCE_EMAIL_VERIFICATION,
UNVERIFIED_MANAGER,
)
from responses import (
Response as APIResponse,
UnprocessableEntity,
BadRequest,
NotFound,
MediaTypeNotAcceptable,
PayloadTooLarge,
InternalServerError,
)
from responses.users import (
PrivateUserResponse,
PublicUserResponse,
)
from orm.users import (
User,
UserModel,
PublicUserModel,
PrivateUserModel,
get_user_by_username,
get_user_by_id,
get_user_by_email,
)
from orm.media import Media, MediaType, PublicMediaModel
from orm.email_verification import EmailVerification, EmailVerificationType
from util.email import send_email
router = FastAPI()
# Credential loaders for our authenticated routes
@MANAGER.user_loader()
async def get_self_by_id(public_id: UUID, requires_verified: bool = True) -> UserModel:
user = await get_user_by_id(public_id, include_secrets=True, restricted_ok=True)
if FORCE_EMAIL_VERIFICATION and requires_verified and not user.email_verified:
raise HTTPException(status_code=401, detail="Email verification is required")
return user
@UNVERIFIED_MANAGER.user_loader()
async def get_self_by_id_unverified(public_id: UUID) -> UserModel:
return await get_self_by_id(public_id, False)
# Here follow our *beautifully* documented path operations
@router.post(
"/user",
tags=["Users"],
status_code=200,
responses={
200: {"model": APIResponse},
400: {"model": BadRequest},
422: {"model": UnprocessableEntity},
},
)
@LIMITER.limit("5/minute")
async def login(request: Request, response: Response, data: OAuth2PasswordRequestForm = Depends()):
"""
Performs user authentication. Endpoint is limited to 5 hits per minute
"""
if request.cookies.get(SESSION_COOKIE_NAME):
raise HTTPException(status_code=400, detail="Please logout first")
username = data.username
if len(username) > 32:
raise HTTPException(status_code=413, detail="Authentication failed: username is too long")
try:
password = data.password.encode()
if len(password) > 72:
raise HTTPException(status_code=413, detail="Authentication failed: password is too long")
except UnicodeEncodeError as e:
LOGGER.warning(
f"An error occurred while attempting to decode password for user {username} -> {type(e).__name__}: {e}"
)
raise HTTPException(
status_code=413,
detail="Authentication failed: invalid characters in password",
)
if not (user := await get_user_by_username(username, include_secrets=True, restricted_ok=True)):
raise HTTPException(
status_code=413,
detail="Authentication failed: the user does not exist",
)
if not bcrypt.checkpw(password, user.password_hash):
raise HTTPException(
status_code=413,
detail="Authentication failed: password mismatch",
)
token = MANAGER.create_access_token(
expires=timedelta(seconds=SESSION_EXPIRE_LIMIT),
data={"sub": str(user.public_id)},
)
response.set_cookie(
secure=SECURE_COOKIE,
key=SESSION_COOKIE_NAME,
max_age=SESSION_EXPIRE_LIMIT,
value=token,
httponly=COOKIE_HTTPONLY,
samesite=COOKIE_SAMESITE_POLICY,
domain=COOKIE_DOMAIN or None,
path=COOKIE_PATH or "/",
)
return APIResponse(status_code=200, msg="Authentication successful")
@router.get(
"/user/logout",
tags=["Users"],
status_code=200,
responses={200: {"model": APIResponse}, 422: {"model": UnprocessableEntity}},
)
@LIMITER.limit("5/minute")
async def logout(request: Request, response: Response, _user: UserModel = Depends(UNVERIFIED_MANAGER)):
"""
Deletes a user's session cookie, logging them
out. Endpoint is limited to 5 hits per minute
"""
response.delete_cookie(
secure=SECURE_COOKIE,
key=SESSION_COOKIE_NAME,
httponly=COOKIE_HTTPONLY,
samesite=COOKIE_SAMESITE_POLICY,
domain=COOKIE_DOMAIN or None,
path=COOKIE_PATH or "/",
)
return APIResponse(status_code=200, msg="Logged out")
@router.get(
"/user/me",
tags=["Users"],
status_code=200,
responses={
200: {
"model": PrivateUserResponse,
"exclude": {"password_hash", "internal_id", "deleted"},
},
422: {"model": UnprocessableEntity},
},
)
@LIMITER.limit("2/second")
async def get_self(request: Request, user: UserModel = Depends(UNVERIFIED_MANAGER)):
"""
Fetches a user's own info. This returns some
extra data such as email address, account
creation date and email verification status,
which is not available from the regular endpoint.
Endpoint is limited to 2 hits per second
"""
return PrivateUserResponse(status_code=200, msg="Success", data=user)
@router.get(
"/user/username/{username}",
tags=["Users"],
status_code=200,
responses={
200: {
"model": PublicUserResponse,
"exclude": {"password_hash", "internal_id", "deleted"},
},
404: {"model": NotFound},
422: {"model": UnprocessableEntity},
},
)
@LIMITER.limit("30/second")
async def get_user_by_name(request: Request, username: str, _auth: UserModel = Depends(MANAGER)):
"""
Fetches a single user by its public username
"""
if not (user := await get_user_by_username(username)):
return NotFound(msg="Lookup failed: the user does not exist")
return PublicUserResponse(
data=PublicUserModel(
public_id=user.public_id,
first_name=user.first_name,
last_name=user.last_name,
username=user.username,
bio=user.bio,
verified_account=user.verified_account,
profile_picture=PublicMediaModel(
media_id=user.profile_picture.media_id,
content=user.profile_picture.content,
content_type=user.profile_picture.content_type,
creation_date=user.profile_picture.creation_date,
)
if user.profile_picture
else None,
)
)
@router.get(
"/user/id/{public_id}",
tags=["Users"],
status_code=200,
responses={
200: {"model": PublicUserResponse},
404: {"model": NotFound},
422: {"model": UnprocessableEntity},
},
)
@LIMITER.limit("30/second")
async def get_user_by_public_id(request: Request, public_id: str, _auth: UserModel = Depends(MANAGER)):
"""
Fetches a single user by its public ID
"""
if not (user := await get_user_by_id(UUID(public_id))):
raise HTTPException(status_code=404, detail="Lookup failed: the user does not exist")
return PublicUserResponse(
data=PublicUserModel(
public_id=user.public_id,
first_name=user.first_name,
last_name=user.last_name,
username=user.username,
bio=user.bio,
verified_account=user.verified_account,
profile_picture=PublicMediaModel(
media_id=user.profile_picture.media_id,
content=user.profile_picture.content,
content_type=user.profile_picture.content_type,
creation_date=user.profile_picture.creation_date,
)
if user.profile_picture
else None,
)
)
async def validate_user(
first_name: str | None,
last_name: str | None,
username: str | None,
email: str | None,
password: str | None,
bio: str | None,
):
"""
Performs some validation upon user creation. Returns
a tuple (success, msg) to be used by routes. Values
set to None are not checked against
"""
if first_name and len(first_name) > 64:
return False, "first name is too long"
if first_name and len(first_name) < 5:
return False, "first name is too short"
if last_name and len(last_name) > 64:
return False, "last name is too long"
if last_name and len(last_name) < 2:
return False, "last name is too short"
if username and len(username) < 5:
return False, "username is too short"
if username and len(username) > 32:
return False, "username is too long"
if username and VALIDATE_USERNAME_REGEX and not re.match(VALIDATE_USERNAME_REGEX, username):
return False, "username is invalid"
if email and not validators.email(email):
return False, "email is not valid"
if password and len(password) > 72:
return False, "password is too long"
if password and VALIDATE_PASSWORD_REGEX and not re.match(VALIDATE_PASSWORD_REGEX, password):
return False, "password is too weak"
if username and await get_user_by_username(username, deleted_ok=True, restricted_ok=True):
return False, "username is already taken"
if email and await get_user_by_email(email, deleted_ok=True, restricted_ok=True):
return False, "email is already registered"
if bio and len(bio) > 4096:
return False, "bio is too long"
if bio:
try:
bio.encode("utf-8")
except UnicodeDecodeError:
return False, "bio contains invalid characters"
return True, ""
@router.delete(
"/user",
tags=["Users"],
status_code=200,
responses={200: {"model": APIResponse}, 422: {"model": UnprocessableEntity}},
)
@LIMITER.limit("1/minute")
async def delete(request: Request, response: Response, user: UserModel = Depends(UNVERIFIED_MANAGER)):
"""
Sets the user's deleted flag in the database,
without actually deleting the associated
data
"""
await User.update({User.deleted: True}).where(User.public_id == user.public_id)
response.delete_cookie(
secure=SECURE_COOKIE,
key=SESSION_COOKIE_NAME,
httponly=COOKIE_HTTPONLY,
samesite=COOKIE_SAMESITE_POLICY,
domain=COOKIE_DOMAIN or None,
path=COOKIE_PATH or "/",
)
return APIResponse(status_code=200, msg="Success")
@router.get(
"/user/verifyEmail/{verification_id}",
tags=["Users"],
status_code=200,
responses={
200: {"model": APIResponse},
404: {"model": NotFound},
422: {"model": UnprocessableEntity},
},
)
@LIMITER.limit("3/second")
async def verify_email(
request: Request,
verification_id: str,
user: UserModel = Depends(UNVERIFIED_MANAGER),
):
"""
Verifies a user's email address
"""
if not (
verification := await EmailVerification.select(*EmailVerification.all_columns())
.where(EmailVerification.id == verification_id)
.first()
):
raise HTTPException(status_code=404, detail="Verification ID is invalid")
elif not verification["pending"]:
raise HTTPException(status_code=400, detail="Email is already verified")
elif datetime.now().astimezone(timezone.utc) - verification["creation_date"].astimezone(timezone.utc) > timedelta(
seconds=EMAIL_VERIFICATION_EXPIRATION
):
raise HTTPException(
status_code=400,
detail="Verification window has expired. Try again",
)
else:
await EmailVerification.update({EmailVerification.pending: False}).where(
EmailVerification.user == user.public_id
)
await User.update({User.email_verified: True}).where(User.public_id == user.public_id)
return APIResponse(status_code=200, msg="Verification successful")
@router.get(
"/user/resetPassword/{verification_id}",
tags=["Users"],
status_code=200,
responses={
200: {"model": APIResponse},
400: {"model": BadRequest},
422: {"model": UnprocessableEntity},
404: {"model": NotFound},
},
)
@LIMITER.limit("3/second")
async def reset_password(
request: Request,
verification_id: str,
user: UserModel = Depends(UNVERIFIED_MANAGER),
):
"""
Modifies a user's password
"""
if not (
verification := await EmailVerification.select(*EmailVerification.all_columns())
.where(EmailVerification.id == verification_id)
.first()
):
raise HTTPException(status_code=404, detail="Request ID is invalid")
elif not verification["pending"]:
raise HTTPException(status_code=400, detail="This link has already been used")
elif datetime.now().astimezone(timezone.utc) - verification["creation_date"].astimezone(timezone.utc) > timedelta(
seconds=EMAIL_VERIFICATION_EXPIRATION
):
raise HTTPException(
status_code=400,
detail="Verification window has expired. Try again",
)
else:
# Note how we don't update based on the verification ID:
# this way, multiple pending email verification requests
# are all cleared at once
await EmailVerification.update({EmailVerification.pending: False}).where(
EmailVerification.user == user.public_id and EmailVerification.kind == EmailVerificationType.PASSWORD_RESET
)
await User.update({User.password_hash: verification["data"]}).where(User.public_id == user.public_id)
return APIResponse(status_code=200, msg="Password updated")
@router.get(
"/user/changeEmail/{verification_id}",
tags=["Users"],
status_code=200,
responses={
200: {"model": APIResponse},
400: {"model": BadRequest},
422: {"model": UnprocessableEntity},
404: {"model": NotFound},
},
)
@LIMITER.limit("3/second")
async def change_email(
request: Request,
verification_id: str,
user: UserModel = Depends(UNVERIFIED_MANAGER),
):
"""
Modifies a user's email
"""
if not (
verification := await EmailVerification.select(*EmailVerification.all_columns())
.where(EmailVerification.id == verification_id)
.first()
):
raise HTTPException(status_code=404, detail="Request ID is invalid")
elif not verification["pending"]:
raise HTTPException(status_code=400, detail="This link has already been used")
elif datetime.now().astimezone(timezone.utc) - verification["creation_date"].astimezone(timezone.utc) > timedelta(
seconds=EMAIL_VERIFICATION_EXPIRATION
):
raise HTTPException(
status_code=400,
detail="Verification window has expired. Try again",
)
else:
# Note how we don't update based on the verification ID:
# this way, multiple pending email verification requests
# are all cleared at once
await EmailVerification.update({EmailVerification.pending: False}).where(
EmailVerification.user == user.public_id and EmailVerification.kind == EmailVerificationType.CHANGE_EMAIL
)
await User.update(
{
User.email_address: verification["data"].decode(),
User.email_verified: False,
}
).where(User.public_id == user.public_id)
return APIResponse(status_code=200, msg="Email updated")
@router.put(
"user/resendMail",
tags=["Users"],
status_code=200,
responses={
200: {"model": APIResponse},
400: {"model": BadRequest},
422: {"model": UnprocessableEntity},
500: {"model": InternalServerError},
},
)
@LIMITER.limit("6/minute")
async def resend_email(request: Request, user: UserModel = Depends(UNVERIFIED_MANAGER)):
"""
Resends the verification email to the user if the previous has expired
"""
if user.email_verified:
raise HTTPException(status_code=400, detail="Email is already verified")
email_template = SMTP_TEMPLATES_DIRECTORY / f"{user.locale}.json"
try:
email_template.resolve(strict=True)
except FileNotFoundError:
email_template = SMTP_TEMPLATES_DIRECTORY / "en_US.json"
with email_template.open() as f:
email_message = json.load(f)["signup"]
verification_id = uuid.uuid4()
if await send_email(
SMTP_HOST,
SMTP_PORT,
email_message["content"].format(
first_name=user.first_name,
last_name=user.last_name,
username=user.username,
email=user.email_address,
link=f"http{'s' if HAS_HTTPS else ''}://{HOST}"
f"{'' if PORT == 443 and HAS_HTTPS or PORT == 80 else f':{PORT}'}/user/verifyEmail/{verification_id}",
platformName=PLATFORM_NAME,
),
SMTP_TIMEOUT,
SMTP_FROM_USER,
user.email_address,
email_message["subject"].format(
first_name=user.first_name,
last_name=user.last_name,
username=user.username,
email=user.email_address,
platformName=PLATFORM_NAME,
),
SMTP_USER,
SMTP_PASSWORD,
use_tls=SMTP_USE_TLS,
):
await EmailVerification.update(
{
EmailVerification.id: verification_id,
EmailVerification.creation_date: datetime.now(),
}
)
return APIResponse(status_code=200, msg="Success")
else:
raise HTTPException(
status_code=500,
detail="An error occurred while trying to resend the email," " please try again later",
)
@router.put(
"/user",
tags=["Users"],
status_code=200,
responses={
200: {"model": APIResponse},
400: {"model": BadRequest},
422: {"model": UnprocessableEntity},
500: {"model": InternalServerError},
413: {"model": PayloadTooLarge},
},
)
@LIMITER.limit("2/minute")
async def signup(
request: Request,
first_name: str,
last_name: str,
username: str,
email: str,
password: str,
bio: str | None = None,
locale: str = "en_US",
):
"""
Endpoint used to create new users
"""
if request.cookies.get(SESSION_COOKIE_NAME):
raise HTTPException(status_code=400, detail="Please logout first")
# We don't use FastAPI's validation because we want custom error
# messages
result, msg = await validate_user(first_name, last_name, username, email, password, bio)
if not result:
return APIResponse(status_code=413, msg=f"Signup failed: {msg}")
else:
salt = bcrypt.gensalt(BCRYPT_ROUNDS)
user = User(
first_name=first_name,
last_name=last_name,
username=username,
email_address=email,
password_hash=bcrypt.hashpw(password.encode(), salt),
bio=bio,
)
email_template = SMTP_TEMPLATES_DIRECTORY / f"{locale}.json"
try:
email_template.resolve(strict=True)
except FileNotFoundError:
email_template = SMTP_TEMPLATES_DIRECTORY / "en_US.json"
with email_template.open() as f:
email_message = json.load(f)["signup"]
verification_id = uuid.uuid4()
if await send_email(
SMTP_HOST,
SMTP_PORT,
email_message["content"].format(
first_name=first_name,
last_name=last_name,
username=username,
email=email,
link=f"http{'s' if HAS_HTTPS else ''}://{HOST}"
f"{'' if PORT == 443 and HAS_HTTPS or PORT == 80 else f':{PORT}'}/user/verifyEmail/{verification_id}",
platformName=PLATFORM_NAME,
),
SMTP_TIMEOUT,
SMTP_FROM_USER,
email,
email_message["subject"].format(
first_name=first_name,
last_name=last_name,
username=username,
email=email,
platformName=PLATFORM_NAME,
),
SMTP_USER,
SMTP_PASSWORD,
use_tls=SMTP_USE_TLS,
):
await User.insert(user)
await EmailVerification.insert(
EmailVerification(
{
EmailVerification.id: verification_id,
EmailVerification.user: user,
}
)
)
return APIResponse(status_code=200, msg="Success")
else:
raise HTTPException(
status_code=500,
detail="An error occurred while sending verification email, please" " try again later",
)
async def validate_profile_picture(
file: UploadFile,
) -> tuple[bool | None, str, bytes, str]:
"""
Validates a profile picture's size and content to see if it fits
our criteria and returns a tuple result, ext, data where result is a
boolean or none (True = check was passed, False = size too large,
None = check was failed for other reasons) indicating if the check was successful,
ext is the file's type and extension, data is a compressed stream of bytes
representing the original media and hash is the file's SHA256 hash encoded in
hexadecimal, before the compression. This function never raises an exception
"""
async with file:
try:
content = await file.read()
if len(content) > MAX_MEDIA_SIZE:
return False, "", b"", ""
if not (ext := imghdr.what(content.decode())) in ALLOWED_MEDIA_TYPES:
return None, "", b"", ""
return (
True,
ext,
zlib.compress(content, ZLIB_COMPRESSION_LEVEL),
hashlib.sha256(content).hexdigest(),
)
except (UnicodeDecodeError, zlib.error):
return None, "", b"", ""
@router.patch(
"/user",
tags=["Users"],
status_code=200,
responses={
200: {"model": APIResponse},
400: {"model": BadRequest},
422: {"model": UnprocessableEntity},
500: {"model": InternalServerError},
413: {"model": PayloadTooLarge},
415: {"model": MediaTypeNotAcceptable},
},
)
async def update_user(
request: Request,
user: UserModel = Depends(UNVERIFIED_MANAGER),
first_name: str | None = None,
last_name: str | None = None,
username: str | None = None,
profile_picture: UploadFile | None = None,
email_address: str | None = None,
password: str | None = None,
bio: str | None = None,
delete: bool = False,
):
"""
Updates a user's profile information. Parameters that are not specified are left unchanged unless
the delete option is set to True, in which case a value of null for a parameter indicates that it
is to be reset or removed. If delete equals False, the default, at least one parameter has to be non-null.
Setting a new email address is only allowed if the old one is verified and will require the user to click a
link sent to the current email address to authorize the operation, after which the address is modified.
A similar procedure is required for resetting the password, requiring an email confirmation before said
change is registered. Please also note that changing the email address undoes the account's email verification,
which needs to be carried out again. When delete equals True, only the bio and profile_picture fields are considered
since they're the only ones that can be set to a null value
"""
if not delete and not any((first_name, last_name, username, profile_picture, email_address, bio, password)):
raise HTTPException(status_code=400, detail="At least one value has to be specified")
result, msg = await validate_user(first_name, last_name, username, email_address, password, bio)
if not result:
raise HTTPException(status_code=413, detail=f"Update failed: {msg}")
orig_user = user.copy()
if not delete:
if first_name:
user.first_name = first_name
if last_name:
user.last_name = last_name
if username:
user.username = username
if bio:
user.bio = bio
if profile_picture:
result, ext, media, digest = validate_profile_picture(profile_picture)
if result is False:
raise HTTPException(status_code=415, detail="The file type is unsupported")
elif result is None:
raise HTTPException(status_code=413, detail="The file is too large")
elif (m := await Media.select(Media.media_id).where(Media.media_id == digest).first()) is None:
# This media hasn't been already uploaded (either by this user or by someone
# else), so we save it now. If it has been already uploaded, there's no need
# to do it again (that's what the hash is for)
match STORAGE_ENGINE:
case "database":
m = Media(
media_id=digest,
media_type=MediaType.BLOB,
content_type=ext,
content=base64.b64encode(media),
)
await Media.insert(m)
user.profile_picture = m
case "local":
file = Path(STORAGE_FOLDER).resolve(strict=True) / str(digest)
file.touch(mode=0o644)
with file.open("wb") as f:
f.write(media)
m = Media(
media_id=digest,
media_type=MediaType.FILE,
content_type=ext,
content=file.as_posix(),
)
await Media.insert(m)
user.profile_picture = m
case "url":
pass # TODO: Use/implement CDN uploading
else:
user.media = m
if password and not bcrypt.checkpw(password.encode(), user.password_hash):
email_template = SMTP_TEMPLATES_DIRECTORY / f"{user.locale}.json"
try:
email_template.resolve(strict=True)
except FileNotFoundError:
email_template = SMTP_TEMPLATES_DIRECTORY / "en_US.json"
with email_template.open() as f:
email_message = json.load(f)["password_change"]
verification_id = uuid.uuid4()
if not await send_email(
SMTP_HOST,
SMTP_PORT,
email_message["content"].format(
first_name=user.first_name,
last_name=user.last_name,
username=user.username,
email=user.email_address,
link=f"http{'s' if HAS_HTTPS else ''}://{HOST}"
f"{'' if PORT == 443 and HAS_HTTPS or PORT == 80 else f':{PORT}'}/user/resetPassword/{verification_id}",
platformName=PLATFORM_NAME,
),
SMTP_TIMEOUT,
SMTP_FROM_USER,
user.email_address,
email_message["subject"].format(
first_name=user.first_name,
last_name=user.last_name,
username=user.username,
email=user.email_address,
platformName=PLATFORM_NAME,
),
SMTP_USER,
SMTP_PASSWORD,
use_tls=SMTP_USE_TLS,
):
raise HTTPException(
500,
detail="An error occurred while trying to send mail, please try again later",
)
else:
await EmailVerification.insert(
EmailVerification(
{
EmailVerification.id: verification_id,
EmailVerification.user: User(public_id=user.public_id),
EmailVerification.data: bcrypt.hashpw(password.encode(), user.password_hash[:29]),
}
)
)
if email_address and user.email_address != email_address:
if not user.email_verified:
raise HTTPException(
status_code=403,
detail="The email address needs to be verified first",
)
email_template = SMTP_TEMPLATES_DIRECTORY / f"{user.locale}.json"
try:
email_template.resolve(strict=True)
except FileNotFoundError:
email_template = SMTP_TEMPLATES_DIRECTORY / "en_US.json"
with email_template.open() as f:
email_message = json.load(f)["email_change"]
verification_id = uuid.uuid4()
if not await send_email(
SMTP_HOST,
SMTP_PORT,
email_message["content"].format(
first_name=user.first_name,
last_name=user.last_name,
username=user.username,
email=user.email_address,
platformName=PLATFORM_NAME,
link=f"http{'s' if HAS_HTTPS else ''}://{HOST}"
f"{'' if PORT == 443 and HAS_HTTPS or PORT == 80 else f':{PORT}'}/user/changeEmail/{verification_id}",
newMail=email_address,
),
SMTP_TIMEOUT,
SMTP_FROM_USER,
user.email_address,
email_message["subject"].format(
first_name=user.first_name,
last_name=user.last_name,
username=user.username,
email=user.email_address,
platformName=PLATFORM_NAME,
),
SMTP_USER,
SMTP_PASSWORD,
use_tls=SMTP_USE_TLS,
):
raise HTTPException(
500,
detail="An error occurred while trying to send mail, please try again later",
)
else:
await EmailVerification.insert(
EmailVerification(
{
EmailVerification.id: verification_id,
EmailVerification.user: User(public_id=user.public_id),
EmailVerification.data: email_address.encode(),
}
)
)
else:
if not bio:
user.bio = None
if not profile_picture:
user.profile_picture = None
fields = []
for field in user:
if field not in ["email_address", "password"] and getattr(orig_user, field) != getattr(user, field):
fields.append((field, getattr(user, field)))
if fields:
# If anything has changed, we update our info
await User.update({field: value for field, value in fields}).where(User.public_id == user.public_id)
return APIResponse(status_code=200, msg="Changes saved successfully")