|
|
|
@ -1,5 +1,7 @@
|
|
|
|
|
import base64
|
|
|
|
|
from datetime import timezone, datetime
|
|
|
|
|
import hashlib
|
|
|
|
|
import json
|
|
|
|
|
import re
|
|
|
|
|
import imghdr
|
|
|
|
|
import uuid
|
|
|
|
@ -33,17 +35,37 @@ from config import (
|
|
|
|
|
ZLIB_COMPRESSION_LEVEL,
|
|
|
|
|
MAX_MEDIA_SIZE,
|
|
|
|
|
STORAGE_ENGINE,
|
|
|
|
|
STORAGE_FOLDER
|
|
|
|
|
STORAGE_FOLDER,
|
|
|
|
|
SMTP_HOST,
|
|
|
|
|
SMTP_PORT,
|
|
|
|
|
SMTP_USER,
|
|
|
|
|
SMTP_PASSWORD,
|
|
|
|
|
SMTP_USE_TLS,
|
|
|
|
|
SMTP_TIMEOUT,
|
|
|
|
|
SMTP_FROM_USER,
|
|
|
|
|
SMTP_TEMPLATES_DIRECTORY,
|
|
|
|
|
PLATFORM_NAME,
|
|
|
|
|
HAS_HTTPS,
|
|
|
|
|
HOST,
|
|
|
|
|
PORT,
|
|
|
|
|
EMAIL_VERIFICATION_EXPIRATION,
|
|
|
|
|
FORCE_EMAIL_VERIFICATION,
|
|
|
|
|
UNVERIFIED_MANAGER,
|
|
|
|
|
)
|
|
|
|
|
from orm.users import UserModel, User
|
|
|
|
|
from orm.media import Media, MediaType
|
|
|
|
|
from orm.email_verification import EmailVerification
|
|
|
|
|
from util.email import send_email
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
router = FastAPI()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def get_user_by_id(
|
|
|
|
|
public_id: UUID, include_secrets: bool = False, restricted_ok: bool = False,
|
|
|
|
|
deleted_ok: bool = False
|
|
|
|
|
public_id: UUID,
|
|
|
|
|
include_secrets: bool = False,
|
|
|
|
|
restricted_ok: bool = False,
|
|
|
|
|
deleted_ok: bool = False,
|
|
|
|
|
) -> dict | None:
|
|
|
|
|
"""
|
|
|
|
|
Retrieves a user by its public ID
|
|
|
|
@ -61,20 +83,32 @@ async def get_user_by_id(
|
|
|
|
|
if user:
|
|
|
|
|
# Performs validation
|
|
|
|
|
UserModel(**user)
|
|
|
|
|
if (user["deleted"] and not deleted_ok) or (user["restricted"] and not restricted_ok):
|
|
|
|
|
if (user["deleted"] and not deleted_ok) or (
|
|
|
|
|
user["restricted"] and not restricted_ok
|
|
|
|
|
):
|
|
|
|
|
return
|
|
|
|
|
return user
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@MANAGER.user_loader()
|
|
|
|
|
async def get_self_by_id(public_id: UUID) -> dict:
|
|
|
|
|
return await get_user_by_id(public_id, include_secrets=True, restricted_ok=True)
|
|
|
|
|
async def get_self_by_id(public_id: UUID, requires_verified: bool = True) -> dict:
|
|
|
|
|
user = await get_user_by_id(public_id, include_secrets=True, restricted_ok=True)
|
|
|
|
|
if FORCE_EMAIL_VERIFICATION and requires_verified and not user["email_verified"]:
|
|
|
|
|
raise HTTPException(status_code=401, detail="Email verification is required")
|
|
|
|
|
return user
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@UNVERIFIED_MANAGER.user_loader()
|
|
|
|
|
async def get_self_by_id_unverified(public_id: UUID):
|
|
|
|
|
return await get_self_by_id(public_id, False)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def get_user_by_username(
|
|
|
|
|
username: str, include_secrets: bool = False, restricted_ok: bool = False,
|
|
|
|
|
deleted_ok: bool = False
|
|
|
|
|
username: str,
|
|
|
|
|
include_secrets: bool = False,
|
|
|
|
|
restricted_ok: bool = False,
|
|
|
|
|
deleted_ok: bool = False,
|
|
|
|
|
) -> dict | None:
|
|
|
|
|
"""
|
|
|
|
|
Retrieves a user by its public username
|
|
|
|
@ -92,15 +126,19 @@ async def get_user_by_username(
|
|
|
|
|
if user:
|
|
|
|
|
# Performs validation
|
|
|
|
|
UserModel(**user)
|
|
|
|
|
if (user["deleted"] and not deleted_ok) or (user["restricted"] and not restricted_ok):
|
|
|
|
|
if (user["deleted"] and not deleted_ok) or (
|
|
|
|
|
user["restricted"] and not restricted_ok
|
|
|
|
|
):
|
|
|
|
|
return
|
|
|
|
|
return user
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def get_user_by_email(
|
|
|
|
|
email: str, include_secrets: bool = False, restricted_ok: bool = False,
|
|
|
|
|
deleted_ok: bool = False
|
|
|
|
|
email: str,
|
|
|
|
|
include_secrets: bool = False,
|
|
|
|
|
restricted_ok: bool = False,
|
|
|
|
|
deleted_ok: bool = False,
|
|
|
|
|
) -> dict | None:
|
|
|
|
|
"""
|
|
|
|
|
Retrieves a user by its email address (meant to
|
|
|
|
@ -119,7 +157,9 @@ async def get_user_by_email(
|
|
|
|
|
if user:
|
|
|
|
|
# Performs validation
|
|
|
|
|
UserModel(**user)
|
|
|
|
|
if (user["deleted"] and not deleted_ok) or (user["restricted"] and not restricted_ok):
|
|
|
|
|
if (user["deleted"] and not deleted_ok) or (
|
|
|
|
|
user["restricted"] and not restricted_ok
|
|
|
|
|
):
|
|
|
|
|
return
|
|
|
|
|
return user
|
|
|
|
|
return
|
|
|
|
@ -184,7 +224,7 @@ async def login(
|
|
|
|
|
@router.get("/user/logout")
|
|
|
|
|
@LIMITER.limit("5/minute")
|
|
|
|
|
async def logout(
|
|
|
|
|
request: Request, response: Response, user: dict = Depends(MANAGER)
|
|
|
|
|
request: Request, response: Response, _user: dict = Depends(UNVERIFIED_MANAGER)
|
|
|
|
|
) -> dict:
|
|
|
|
|
"""
|
|
|
|
|
Deletes a user's session cookie, logging them
|
|
|
|
@ -204,7 +244,7 @@ async def logout(
|
|
|
|
|
|
|
|
|
|
@router.get("/user/me")
|
|
|
|
|
@LIMITER.limit("2/second")
|
|
|
|
|
async def get_self(request: Request, user: dict = Depends(MANAGER)) -> dict:
|
|
|
|
|
async def get_self(request: Request, user: dict = Depends(UNVERIFIED_MANAGER)) -> dict:
|
|
|
|
|
"""
|
|
|
|
|
Fetches a user's own info. This returns some
|
|
|
|
|
extra data such as email address, account
|
|
|
|
@ -256,7 +296,12 @@ async def get_user_by_public_id(
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def validate_user(
|
|
|
|
|
first_name: str, last_name: str, username: str, email: str, password: str | None
|
|
|
|
|
first_name: str,
|
|
|
|
|
last_name: str,
|
|
|
|
|
username: str,
|
|
|
|
|
email: str,
|
|
|
|
|
password: str | None,
|
|
|
|
|
bio: str | None,
|
|
|
|
|
) -> tuple[bool, str]:
|
|
|
|
|
"""
|
|
|
|
|
Performs some validation upon user creation. Returns
|
|
|
|
@ -276,24 +321,43 @@ async def validate_user(
|
|
|
|
|
return False, "username is too short"
|
|
|
|
|
if username and len(username) > 32:
|
|
|
|
|
return False, "username is too long"
|
|
|
|
|
if username and VALIDATE_USERNAME_REGEX and not re.match(VALIDATE_USERNAME_REGEX, username):
|
|
|
|
|
if (
|
|
|
|
|
username
|
|
|
|
|
and VALIDATE_USERNAME_REGEX
|
|
|
|
|
and not re.match(VALIDATE_USERNAME_REGEX, username)
|
|
|
|
|
):
|
|
|
|
|
return False, "username is invalid"
|
|
|
|
|
if email and not validators.email(email):
|
|
|
|
|
return False, "email is not valid"
|
|
|
|
|
if password and len(password) > 72:
|
|
|
|
|
return False, "password is too long"
|
|
|
|
|
if password and VALIDATE_PASSWORD_REGEX and not re.match(VALIDATE_PASSWORD_REGEX, password):
|
|
|
|
|
if (
|
|
|
|
|
password
|
|
|
|
|
and VALIDATE_PASSWORD_REGEX
|
|
|
|
|
and not re.match(VALIDATE_PASSWORD_REGEX, password)
|
|
|
|
|
):
|
|
|
|
|
return False, "password is too weak"
|
|
|
|
|
if username and await get_user_by_username(username, deleted_ok=True, restricted_ok=True):
|
|
|
|
|
if username and await get_user_by_username(
|
|
|
|
|
username, deleted_ok=True, restricted_ok=True
|
|
|
|
|
):
|
|
|
|
|
return False, "username is already taken"
|
|
|
|
|
if email and await get_user_by_email(email, deleted_ok=True, restricted_ok=True):
|
|
|
|
|
return False, "email is already registered"
|
|
|
|
|
if bio and len(bio) > 4096:
|
|
|
|
|
return False, "bio is too long"
|
|
|
|
|
if bio:
|
|
|
|
|
try:
|
|
|
|
|
bio.encode("utf-8")
|
|
|
|
|
except UnicodeDecodeError:
|
|
|
|
|
return False, "bio contains invalid characters"
|
|
|
|
|
return True, ""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.delete("/user")
|
|
|
|
|
@LIMITER.limit("1/minute")
|
|
|
|
|
async def delete(request: Request, response: Response, user: dict = Depends(MANAGER)) -> dict:
|
|
|
|
|
async def delete(
|
|
|
|
|
request: Request, response: Response, user: dict = Depends(UNVERIFIED_MANAGER)
|
|
|
|
|
) -> dict:
|
|
|
|
|
"""
|
|
|
|
|
Sets the user's deleted flag in the database,
|
|
|
|
|
without actually deleting the associated
|
|
|
|
@ -307,10 +371,166 @@ async def delete(request: Request, response: Response, user: dict = Depends(MANA
|
|
|
|
|
httponly=COOKIE_HTTPONLY,
|
|
|
|
|
samesite=COOKIE_SAMESITE_POLICY,
|
|
|
|
|
domain=COOKIE_DOMAIN or None,
|
|
|
|
|
path=COOKIE_PATH or "/",)
|
|
|
|
|
path=COOKIE_PATH or "/",
|
|
|
|
|
)
|
|
|
|
|
return {"status_code": 200, "msg": "Success"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/user/verifyEmail/{verification_id}")
|
|
|
|
|
@LIMITER.limit("3/second")
|
|
|
|
|
async def verify_email(
|
|
|
|
|
request: Request, verification_id: str, user: dict = Depends(UNVERIFIED_MANAGER)
|
|
|
|
|
) -> dict:
|
|
|
|
|
"""
|
|
|
|
|
Verifies a user's email address
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
if not (
|
|
|
|
|
verification := await EmailVerification.select(*EmailVerification.all_columns())
|
|
|
|
|
.where(EmailVerification.id == verification_id)
|
|
|
|
|
.first()
|
|
|
|
|
):
|
|
|
|
|
raise HTTPException(status_code=404, detail="Verification ID is invalid")
|
|
|
|
|
elif not verification["pending"]:
|
|
|
|
|
raise HTTPException(status_code=400, detail="Email is already verified")
|
|
|
|
|
elif datetime.now().astimezone(timezone.utc) - verification[
|
|
|
|
|
"creation_date"
|
|
|
|
|
].astimezone(timezone.utc) > timedelta(seconds=EMAIL_VERIFICATION_EXPIRATION):
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
status_code=400,
|
|
|
|
|
detail="Verification window has expired. Try again",
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
await EmailVerification.update({EmailVerification.pending: False}).where(
|
|
|
|
|
EmailVerification.user == user["id"]
|
|
|
|
|
)
|
|
|
|
|
await User.update({User.email_verified: True}).where(
|
|
|
|
|
User.public_id == user["id"]
|
|
|
|
|
)
|
|
|
|
|
return {"status_code": 200, "msg": "Verification successful"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/user/resetPassword/{verification_id}")
|
|
|
|
|
@LIMITER.limit("3/second")
|
|
|
|
|
async def reset_password(
|
|
|
|
|
request: Request, verification_id: str, user: dict = Depends(UNVERIFIED_MANAGER)
|
|
|
|
|
) -> dict:
|
|
|
|
|
"""
|
|
|
|
|
Modifies a user's password
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
if not (
|
|
|
|
|
verification := await EmailVerification.select(*EmailVerification.all_columns())
|
|
|
|
|
.where(EmailVerification.id == verification_id)
|
|
|
|
|
.first()
|
|
|
|
|
):
|
|
|
|
|
raise HTTPException(status_code=404, detail="Request ID is invalid")
|
|
|
|
|
elif not verification["pending"]:
|
|
|
|
|
raise HTTPException(status_code=400, detail="This link has already been used")
|
|
|
|
|
elif datetime.now().astimezone(timezone.utc) - verification[
|
|
|
|
|
"creation_date"
|
|
|
|
|
].astimezone(timezone.utc) > timedelta(seconds=EMAIL_VERIFICATION_EXPIRATION):
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
status_code=400,
|
|
|
|
|
detail="Verification window has expired. Try again",
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
await EmailVerification.update({EmailVerification.pending: False}).where(
|
|
|
|
|
EmailVerification.user == user["id"]
|
|
|
|
|
)
|
|
|
|
|
await User.update({User.password_hash: verification["data"]}).where(
|
|
|
|
|
User.public_id == user["id"]
|
|
|
|
|
)
|
|
|
|
|
return {"status_code": 200, "msg": "Password updated"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/user/changeEmail/{verification_id}")
|
|
|
|
|
@LIMITER.limit("3/second")
|
|
|
|
|
async def change_email(
|
|
|
|
|
request: Request, verification_id: str, user: dict = Depends(UNVERIFIED_MANAGER)
|
|
|
|
|
) -> dict:
|
|
|
|
|
"""
|
|
|
|
|
Modifies a user's email
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
if not (
|
|
|
|
|
verification := await EmailVerification.select(*EmailVerification.all_columns())
|
|
|
|
|
.where(EmailVerification.id == verification_id)
|
|
|
|
|
.first()
|
|
|
|
|
):
|
|
|
|
|
raise HTTPException(status_code=404, detail="Request ID is invalid")
|
|
|
|
|
elif not verification["pending"]:
|
|
|
|
|
raise HTTPException(status_code=400, detail="This link has already been used")
|
|
|
|
|
elif datetime.now().astimezone(timezone.utc) - verification[
|
|
|
|
|
"creation_date"
|
|
|
|
|
].astimezone(timezone.utc) > timedelta(seconds=EMAIL_VERIFICATION_EXPIRATION):
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
status_code=400,
|
|
|
|
|
detail="Verification window has expired. Try again",
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
await EmailVerification.update({EmailVerification.pending: False}).where(
|
|
|
|
|
EmailVerification.user == user["id"]
|
|
|
|
|
)
|
|
|
|
|
await User.update({User.email_address: verification["data"].decode(),
|
|
|
|
|
User.email_verified: False}).where(
|
|
|
|
|
User.public_id == user["id"]
|
|
|
|
|
)
|
|
|
|
|
return {"status_code": 200, "msg": "Email updated"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.put("user/resendMail")
|
|
|
|
|
@LIMITER.limit("6/minute")
|
|
|
|
|
async def resend_email(request: Request, user: dict = Depends(UNVERIFIED_MANAGER)) -> dict:
|
|
|
|
|
"""
|
|
|
|
|
Resends the verification email to the user if the previous has expired
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
if user["email_verified"]:
|
|
|
|
|
raise HTTPException(status_code=400, detail="Email is already verified")
|
|
|
|
|
email_template = SMTP_TEMPLATES_DIRECTORY / f"{user['locale']}.json"
|
|
|
|
|
try:
|
|
|
|
|
email_template.resolve(strict=True)
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
email_template = SMTP_TEMPLATES_DIRECTORY / "en_US.json"
|
|
|
|
|
with email_template.open() as f:
|
|
|
|
|
email_message = json.load(f)["signup"]
|
|
|
|
|
verification_id = uuid.uuid4()
|
|
|
|
|
if await send_email(
|
|
|
|
|
SMTP_HOST,
|
|
|
|
|
SMTP_PORT,
|
|
|
|
|
email_message["content"].format(
|
|
|
|
|
first_name=user["first_name"],
|
|
|
|
|
last_name=user["last_name"],
|
|
|
|
|
username=user["username"],
|
|
|
|
|
email=user["email_address"],
|
|
|
|
|
link=f"http{'s' if HAS_HTTPS else ''}://{HOST}"
|
|
|
|
|
f"{'' if PORT == 443 and HAS_HTTPS or PORT == 80 else f':{PORT}'}/user/verifyEmail/{verification_id}",
|
|
|
|
|
platformName=PLATFORM_NAME,
|
|
|
|
|
),
|
|
|
|
|
SMTP_TIMEOUT,
|
|
|
|
|
SMTP_FROM_USER,
|
|
|
|
|
user["email_address"],
|
|
|
|
|
email_message["subject"].format(
|
|
|
|
|
first_name=user["first_name"],
|
|
|
|
|
last_name=user["last_name"],
|
|
|
|
|
username=user["username"],
|
|
|
|
|
email=user["email_address"],
|
|
|
|
|
platformName=PLATFORM_NAME,
|
|
|
|
|
),
|
|
|
|
|
SMTP_USER,
|
|
|
|
|
SMTP_PASSWORD,
|
|
|
|
|
use_tls=SMTP_USE_TLS,
|
|
|
|
|
):
|
|
|
|
|
await EmailVerification.update(
|
|
|
|
|
{
|
|
|
|
|
EmailVerification.id: verification_id,
|
|
|
|
|
EmailVerification.creation_date: datetime.now()
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
return {"status_code": 200, "msg": "Success"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.put("/user")
|
|
|
|
|
@LIMITER.limit("2/minute")
|
|
|
|
|
async def signup(
|
|
|
|
@ -320,6 +540,8 @@ async def signup(
|
|
|
|
|
username: str,
|
|
|
|
|
email: str,
|
|
|
|
|
password: str,
|
|
|
|
|
bio: str | None = None,
|
|
|
|
|
locale: str = "en_US",
|
|
|
|
|
) -> dict:
|
|
|
|
|
"""
|
|
|
|
|
Endpoint used to create new users
|
|
|
|
@ -329,25 +551,78 @@ async def signup(
|
|
|
|
|
raise HTTPException(status_code=400, detail="Please logout first")
|
|
|
|
|
# We don't use FastAPI's validation because we want custom error
|
|
|
|
|
# messages
|
|
|
|
|
result, msg = await validate_user(first_name, last_name, username, email, password)
|
|
|
|
|
result, msg = await validate_user(
|
|
|
|
|
first_name, last_name, username, email, password, bio
|
|
|
|
|
)
|
|
|
|
|
if not result:
|
|
|
|
|
return {"status_code": 413, "msg": f"Signup failed: {msg}"}
|
|
|
|
|
else:
|
|
|
|
|
await User.insert(
|
|
|
|
|
User(
|
|
|
|
|
salt = bcrypt.gensalt(BCRYPT_ROUNDS)
|
|
|
|
|
user = User(
|
|
|
|
|
first_name=first_name,
|
|
|
|
|
last_name=last_name,
|
|
|
|
|
username=username,
|
|
|
|
|
email_address=email,
|
|
|
|
|
password_hash=bcrypt.hashpw(
|
|
|
|
|
password.encode(), salt
|
|
|
|
|
),
|
|
|
|
|
bio=bio,
|
|
|
|
|
)
|
|
|
|
|
email_template = SMTP_TEMPLATES_DIRECTORY / f"{locale}.json"
|
|
|
|
|
try:
|
|
|
|
|
email_template.resolve(strict=True)
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
email_template = SMTP_TEMPLATES_DIRECTORY / "en_US.json"
|
|
|
|
|
with email_template.open() as f:
|
|
|
|
|
email_message = json.load(f)["signup"]
|
|
|
|
|
verification_id = uuid.uuid4()
|
|
|
|
|
if await send_email(
|
|
|
|
|
SMTP_HOST,
|
|
|
|
|
SMTP_PORT,
|
|
|
|
|
email_message["content"].format(
|
|
|
|
|
first_name=first_name,
|
|
|
|
|
last_name=last_name,
|
|
|
|
|
username=username,
|
|
|
|
|
email_address=email,
|
|
|
|
|
password_hash=bcrypt.hashpw(
|
|
|
|
|
password.encode(), bcrypt.gensalt(BCRYPT_ROUNDS)
|
|
|
|
|
),
|
|
|
|
|
email=email,
|
|
|
|
|
link=f"http{'s' if HAS_HTTPS else ''}://{HOST}"
|
|
|
|
|
f"{'' if PORT == 443 and HAS_HTTPS or PORT == 80 else f':{PORT}'}/user/verifyEmail/{verification_id}",
|
|
|
|
|
platformName=PLATFORM_NAME,
|
|
|
|
|
),
|
|
|
|
|
SMTP_TIMEOUT,
|
|
|
|
|
SMTP_FROM_USER,
|
|
|
|
|
email,
|
|
|
|
|
email_message["subject"].format(
|
|
|
|
|
first_name=first_name,
|
|
|
|
|
last_name=last_name,
|
|
|
|
|
username=username,
|
|
|
|
|
email=email,
|
|
|
|
|
platformName=PLATFORM_NAME,
|
|
|
|
|
),
|
|
|
|
|
SMTP_USER,
|
|
|
|
|
SMTP_PASSWORD,
|
|
|
|
|
use_tls=SMTP_USE_TLS,
|
|
|
|
|
):
|
|
|
|
|
await User.insert(user)
|
|
|
|
|
await EmailVerification.insert(
|
|
|
|
|
EmailVerification(
|
|
|
|
|
{
|
|
|
|
|
EmailVerification.id: verification_id,
|
|
|
|
|
EmailVerification.user: user,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
return {"status_code": 200, "msg": "Success"}
|
|
|
|
|
else:
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
status_code=500,
|
|
|
|
|
detail="An error occurred while sending verification email, please"
|
|
|
|
|
" try again later",
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
return {"status_code": 200, "msg": "Success"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def validate_profile_picture(file: UploadFile) -> tuple[bool | None, str, bytes, str]:
|
|
|
|
|
async def validate_profile_picture(
|
|
|
|
|
file: UploadFile,
|
|
|
|
|
) -> tuple[bool | None, str, bytes, str]:
|
|
|
|
|
"""
|
|
|
|
|
Validates a profile picture's size and content to see if it fits
|
|
|
|
|
our criteria and returns a tuple result, ext, data where result is a
|
|
|
|
@ -365,69 +640,216 @@ async def validate_profile_picture(file: UploadFile) -> tuple[bool | None, str,
|
|
|
|
|
return False, "", b"", ""
|
|
|
|
|
if not (ext := imghdr.what(content.decode())) in ALLOWED_MEDIA_TYPES:
|
|
|
|
|
return None, "", b"", ""
|
|
|
|
|
return True, ext, zlib.compress(content, ZLIB_COMPRESSION_LEVEL), hashlib.sha256(content).hexdigest()
|
|
|
|
|
return (
|
|
|
|
|
True,
|
|
|
|
|
ext,
|
|
|
|
|
zlib.compress(content, ZLIB_COMPRESSION_LEVEL),
|
|
|
|
|
hashlib.sha256(content).hexdigest(),
|
|
|
|
|
)
|
|
|
|
|
except (UnicodeDecodeError, zlib.error):
|
|
|
|
|
return None, "", b"", ""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.patch("/user")
|
|
|
|
|
async def update(request: Request, user: dict = Depends(MANAGER),
|
|
|
|
|
first_name: str | None = None, last_name: str | None = None,
|
|
|
|
|
username: str | None = None, profile_picture: UploadFile | None = None,
|
|
|
|
|
email_address: str | None = None, bio: str | None = None):
|
|
|
|
|
async def update(
|
|
|
|
|
request: Request,
|
|
|
|
|
user: dict = Depends(UNVERIFIED_MANAGER),
|
|
|
|
|
first_name: str | None = None,
|
|
|
|
|
last_name: str | None = None,
|
|
|
|
|
username: str | None = None,
|
|
|
|
|
profile_picture: UploadFile | None = None,
|
|
|
|
|
email_address: str | None = None,
|
|
|
|
|
password: str | None = None,
|
|
|
|
|
bio: str | None = None,
|
|
|
|
|
delete: bool = False,
|
|
|
|
|
):
|
|
|
|
|
"""
|
|
|
|
|
Updates a user's profile information. Parameters that are not specified are left unchanged.
|
|
|
|
|
At least one parameter has to be non-null. Setting a new email address is only allowed if the
|
|
|
|
|
old one is verified and will require the user to click a link sent to the current email address
|
|
|
|
|
to authorize the operation, after which the address is modified.
|
|
|
|
|
Updates a user's profile information. Parameters that are not specified are left unchanged unless
|
|
|
|
|
the delete option is set to True, in which case a value of null for a parameter indicates that it
|
|
|
|
|
is to be reset or removed. If delete equals False, the default, at least one parameter has to be non-null.
|
|
|
|
|
Setting a new email address is only allowed if the old one is verified and will require the user to click a
|
|
|
|
|
link sent to the current email address to authorize the operation, after which the address is modified.
|
|
|
|
|
A similar procedure is required for resetting the password, requiring an email confirmation before said
|
|
|
|
|
change is registered. Please also note that changing the email address undoes the account's email verification,
|
|
|
|
|
which needs to be carried out again. When delete equals True, only the bio and profile_picture fields are considered
|
|
|
|
|
since they're the only ones that can be set to a null value
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
if not any((first_name, last_name, username, profile_picture, email_address, bio)):
|
|
|
|
|
raise HTTPException(status_code=400, detail="At least one value has to be specified")
|
|
|
|
|
result, msg = await validate_user(first_name, last_name, username, email_address, None)
|
|
|
|
|
if not delete and not any(
|
|
|
|
|
(first_name, last_name, username, profile_picture, email_address, bio, password)
|
|
|
|
|
):
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
status_code=400, detail="At least one value has to be specified"
|
|
|
|
|
)
|
|
|
|
|
result, msg = (
|
|
|
|
|
await validate_user(
|
|
|
|
|
first_name, last_name, username, email_address, password, bio
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
if not result:
|
|
|
|
|
raise HTTPException(status_code=413, detail=f"Update failed: {msg}")
|
|
|
|
|
orig_user = user.copy()
|
|
|
|
|
if first_name:
|
|
|
|
|
user["first_name"] = first_name
|
|
|
|
|
if last_name:
|
|
|
|
|
user["last_name"] = last_name
|
|
|
|
|
if username:
|
|
|
|
|
user["username"] = username
|
|
|
|
|
if profile_picture:
|
|
|
|
|
result, ext, media, digest = validate_profile_picture(profile_picture)
|
|
|
|
|
if result is False:
|
|
|
|
|
raise HTTPException(status_code=415, detail="The file type is unsupported")
|
|
|
|
|
elif result is None:
|
|
|
|
|
raise HTTPException(status_code=413, detail="The file is too large")
|
|
|
|
|
elif await (old_media := Media.select(Media.media_id).where(Media.media_id == digest).first()) is None:
|
|
|
|
|
# This media hasn't been already uploaded (either by this user or by someone
|
|
|
|
|
# else), so we save it now. If it has been already uploaded, there's no need
|
|
|
|
|
# to do it again (that's what the hash is for)
|
|
|
|
|
match STORAGE_ENGINE:
|
|
|
|
|
case "database":
|
|
|
|
|
await Media.insert(Media(media_id=digest, media_type=MediaType.BLOB,
|
|
|
|
|
content_type=ext, content=base64.b64encode(media)))
|
|
|
|
|
case "local":
|
|
|
|
|
file = Path(STORAGE_FOLDER).resolve(strict=True) / str(digest)
|
|
|
|
|
file.touch(mode=0o644)
|
|
|
|
|
with file.open("wb") as f:
|
|
|
|
|
f.write(media)
|
|
|
|
|
await Media.insert(Media(media_id=digest, media_type=MediaType.FILE,
|
|
|
|
|
content_type=ext, content=file.as_posix()))
|
|
|
|
|
case "url":
|
|
|
|
|
pass # TODO: Use/implement CDN uploading
|
|
|
|
|
else:
|
|
|
|
|
user["media"] = old_media
|
|
|
|
|
if email_address:
|
|
|
|
|
if not user["email_verified"]:
|
|
|
|
|
raise HTTPException(status_code=403, detail="The email address needs to be verified first")
|
|
|
|
|
pass # TODO: Requires email verification
|
|
|
|
|
if not delete:
|
|
|
|
|
if first_name:
|
|
|
|
|
user["first_name"] = first_name
|
|
|
|
|
if last_name:
|
|
|
|
|
user["last_name"] = last_name
|
|
|
|
|
if username:
|
|
|
|
|
user["username"] = username
|
|
|
|
|
if bio:
|
|
|
|
|
user["bio"] = bio
|
|
|
|
|
if profile_picture:
|
|
|
|
|
result, ext, media, digest = validate_profile_picture(profile_picture)
|
|
|
|
|
if result is False:
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
status_code=415, detail="The file type is unsupported"
|
|
|
|
|
)
|
|
|
|
|
elif result is None:
|
|
|
|
|
raise HTTPException(status_code=413, detail="The file is too large")
|
|
|
|
|
elif (
|
|
|
|
|
await (
|
|
|
|
|
old_media := Media.select(Media.media_id)
|
|
|
|
|
.where(Media.media_id == digest)
|
|
|
|
|
.first()
|
|
|
|
|
)
|
|
|
|
|
is None
|
|
|
|
|
):
|
|
|
|
|
# This media hasn't been already uploaded (either by this user or by someone
|
|
|
|
|
# else), so we save it now. If it has been already uploaded, there's no need
|
|
|
|
|
# to do it again (that's what the hash is for)
|
|
|
|
|
match STORAGE_ENGINE:
|
|
|
|
|
case "database":
|
|
|
|
|
await Media.insert(
|
|
|
|
|
Media(
|
|
|
|
|
media_id=digest,
|
|
|
|
|
media_type=MediaType.BLOB,
|
|
|
|
|
content_type=ext,
|
|
|
|
|
content=base64.b64encode(media),
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
case "local":
|
|
|
|
|
file = Path(STORAGE_FOLDER).resolve(strict=True) / str(digest)
|
|
|
|
|
file.touch(mode=0o644)
|
|
|
|
|
with file.open("wb") as f:
|
|
|
|
|
f.write(media)
|
|
|
|
|
await Media.insert(
|
|
|
|
|
Media(
|
|
|
|
|
media_id=digest,
|
|
|
|
|
media_type=MediaType.FILE,
|
|
|
|
|
content_type=ext,
|
|
|
|
|
content=file.as_posix(),
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
case "url":
|
|
|
|
|
pass # TODO: Use/implement CDN uploading
|
|
|
|
|
else:
|
|
|
|
|
user["media"] = old_media
|
|
|
|
|
if password and not bcrypt.checkpw(password.encode(), user["password_hash"]):
|
|
|
|
|
email_template = SMTP_TEMPLATES_DIRECTORY / f"{user['locale']}.json"
|
|
|
|
|
try:
|
|
|
|
|
email_template.resolve(strict=True)
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
email_template = SMTP_TEMPLATES_DIRECTORY / "en_US.json"
|
|
|
|
|
with email_template.open() as f:
|
|
|
|
|
email_message = json.load(f)["password_change"]
|
|
|
|
|
verification_id = uuid.uuid4()
|
|
|
|
|
if not await send_email(
|
|
|
|
|
SMTP_HOST,
|
|
|
|
|
SMTP_PORT,
|
|
|
|
|
email_message["content"].format(
|
|
|
|
|
first_name=user["first_name"],
|
|
|
|
|
last_name=user["last_name"],
|
|
|
|
|
username=user["username"],
|
|
|
|
|
email=user["email_address"],
|
|
|
|
|
link=f"http{'s' if HAS_HTTPS else ''}://{HOST}"
|
|
|
|
|
f"{'' if PORT == 443 and HAS_HTTPS or PORT == 80 else f':{PORT}'}/user/resetPassword/{verification_id}",
|
|
|
|
|
platformName=PLATFORM_NAME,
|
|
|
|
|
),
|
|
|
|
|
SMTP_TIMEOUT,
|
|
|
|
|
SMTP_FROM_USER,
|
|
|
|
|
user["email_address"],
|
|
|
|
|
email_message["subject"].format(
|
|
|
|
|
first_name=user["first_name"],
|
|
|
|
|
last_name=user["last_name"],
|
|
|
|
|
username=user["username"],
|
|
|
|
|
email=user["email_address"],
|
|
|
|
|
platformName=PLATFORM_NAME,
|
|
|
|
|
),
|
|
|
|
|
SMTP_USER,
|
|
|
|
|
SMTP_PASSWORD,
|
|
|
|
|
use_tls=SMTP_USE_TLS,
|
|
|
|
|
):
|
|
|
|
|
raise HTTPException(500, detail="An error occurred while trying to send mail, please try again later")
|
|
|
|
|
else:
|
|
|
|
|
await EmailVerification.insert(
|
|
|
|
|
EmailVerification(
|
|
|
|
|
{
|
|
|
|
|
EmailVerification.id: verification_id,
|
|
|
|
|
EmailVerification.user: User(public_id=user["id"]),
|
|
|
|
|
EmailVerification.data: bcrypt.hashpw(password.encode(), user["password_hash"][:29])
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
if email_address and user["email_address"] != email_address:
|
|
|
|
|
if not user["email_verified"]:
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
status_code=403,
|
|
|
|
|
detail="The email address needs to be verified first",
|
|
|
|
|
)
|
|
|
|
|
email_template = SMTP_TEMPLATES_DIRECTORY / f"{user['locale']}.json"
|
|
|
|
|
try:
|
|
|
|
|
email_template.resolve(strict=True)
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
email_template = SMTP_TEMPLATES_DIRECTORY / "en_US.json"
|
|
|
|
|
with email_template.open() as f:
|
|
|
|
|
email_message = json.load(f)["email_change"]
|
|
|
|
|
verification_id = uuid.uuid4()
|
|
|
|
|
if not await send_email(
|
|
|
|
|
SMTP_HOST,
|
|
|
|
|
SMTP_PORT,
|
|
|
|
|
email_message["content"].format(
|
|
|
|
|
first_name=user["first_name"],
|
|
|
|
|
last_name=user["last_name"],
|
|
|
|
|
username=user["username"],
|
|
|
|
|
email=user["email_address"],
|
|
|
|
|
platformName=PLATFORM_NAME,
|
|
|
|
|
link=f"http{'s' if HAS_HTTPS else ''}://{HOST}"
|
|
|
|
|
f"{'' if PORT == 443 and HAS_HTTPS or PORT == 80 else f':{PORT}'}/user/changeEmail/{verification_id}",
|
|
|
|
|
newMail=email_address,
|
|
|
|
|
),
|
|
|
|
|
SMTP_TIMEOUT,
|
|
|
|
|
SMTP_FROM_USER,
|
|
|
|
|
user["email_address"],
|
|
|
|
|
email_message["subject"].format(
|
|
|
|
|
first_name=user["first_name"],
|
|
|
|
|
last_name=user["last_name"],
|
|
|
|
|
username=user["username"],
|
|
|
|
|
email=user["email_address"],
|
|
|
|
|
platformName=PLATFORM_NAME,
|
|
|
|
|
),
|
|
|
|
|
SMTP_USER,
|
|
|
|
|
SMTP_PASSWORD,
|
|
|
|
|
use_tls=SMTP_USE_TLS,
|
|
|
|
|
):
|
|
|
|
|
raise HTTPException(500, detail="An error occurred while trying to send mail, please try again later")
|
|
|
|
|
else:
|
|
|
|
|
await EmailVerification.insert(EmailVerification({
|
|
|
|
|
EmailVerification.id: verification_id,
|
|
|
|
|
EmailVerification.user: User(public_id=user["id"]),
|
|
|
|
|
EmailVerification.data: email_address.encode()
|
|
|
|
|
}))
|
|
|
|
|
else:
|
|
|
|
|
if not bio:
|
|
|
|
|
user["bio"] = None
|
|
|
|
|
if not profile_picture:
|
|
|
|
|
user["profile_picture"] = None
|
|
|
|
|
fields = []
|
|
|
|
|
for field in user:
|
|
|
|
|
if field != "email_address" and orig_user[field] != user[field]:
|
|
|
|
|
if field not in ["email_address", "password"] and orig_user[field] != user[field]:
|
|
|
|
|
fields.append((field, user[field]))
|
|
|
|
|
if fields:
|
|
|
|
|
# If anything has changed, we update our info
|
|
|
|
|
await User.update({field: value for field, value in fields}).where(User.public_id == user["id"])
|
|
|
|
|
await User.update({field: value for field, value in fields}).where(
|
|
|
|
|
User.public_id == user["id"]
|
|
|
|
|
)
|
|
|
|
|
return {"status_code": 200, "msg": "Changes saved successfully"}
|
|
|
|
|