MailMaker/src/mailMaker.py

1022 lines
56 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Copyright (C) 2021 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import re
import os
import sys
import trio
import json
import httpx
import random
import socket
import string
import getpass
import argparse
import validators
from typing import Union, Any
from console import fg
try:
    from console.utils import pause
except ImportError:
    # For some reason the lib does not export utils by default ¯\_(ツ)_/¯
    # This is a fatal startup error, so it goes to stderr and the process
    # exits with a non-zero status instead of pretending to succeed
    print(
        "Please edit the console library's __init__.py file at line 58 and add the line 'from . import utils'",
        file=sys.stderr,
    )
    sys.exit(1)
async def send_authenticated_request(
    method: str, user: str, password: str, *args, **kwargs
) -> Union[httpx.Response, httpx.RequestError]:
    """
    Performs one HTTP request through a short-lived httpx.AsyncClient,
    authenticating with HTTP basic auth

    :param method: Name of the client method to call ("get", "post", ...)
    :param user: Username for basic authentication
    :param password: Password for basic authentication
    :param args: Positional arguments forwarded to the client method
    :param kwargs: Keyword arguments forwarded to the client method
    :return: The response on success. Transport-level failures
        (httpx.RequestError, socket.gaierror, OSError) are not raised:
        the exception object is returned instead
    """
    credentials = (user, password)
    try:
        async with httpx.AsyncClient() as session:
            verb = getattr(session, method)
            return await verb(*args, **kwargs, auth=credentials)
    except (httpx.RequestError, socket.gaierror, OSError) as failure:
        return failure
async def handle_errors(*args, **kwargs) -> Union[httpx.Response, httpx.RequestError]:
    """
    Thin layer over send_authenticated_request() that prints a
    diagnostic for the status codes callers do not deal with themselves

    All arguments are forwarded untouched. Whatever the request
    produced (a response or an exception object) is handed back to the
    caller in every case: this function only adds the printing
    """
    outcome = await send_authenticated_request(*args, **kwargs)
    if not isinstance(outcome, httpx.Response):
        # Transport failure: reporting it is up to the caller
        return outcome
    status = outcome.status_code
    if status in (200, 201, 204, 404, 400):
        # Callers interpret these codes on their own
        return outcome
    if status == 401:
        print(fg.red("Authentication failed: Unauthorized (wrong credentials)"))
    elif status == 403:
        print(fg.red("Authentication failed: Forbidden"))
    elif status >= 500:
        print(fg.red(f"The server raised a {outcome.status_code} error, maybe try again later?"))
    else:
        print(fg.yellow(f"The server returned unexpected status code {outcome.status_code}, exiting"))
    return outcome
def decode_json(response: httpx.Response) -> Any:
    """
    Decodes the JSON body of a response without letting a
    malformed payload raise

    :param response: The response whose body should be decoded
    :return: The decoded payload, or an empty string (after printing
        an error) when the body is not valid JSON
    """
    try:
        payload = response.json()
    except json.decoder.JSONDecodeError as problem:
        kind = type(problem).__name__
        print(fg.red(f"A JSON error occurred while attempting to decode some data -> {kind}: {problem}"))
        return ""
    return payload
_last_color = None


def colored(s: str) -> str:
    """
    Wraps s in a randomly picked foreground color, never
    reusing the color chosen by the previous call
    """
    global _last_color  # Module-level memory of the previous pick
    palette = ["blue", "magenta", "red", "cyan", "blue", "purple", "yellow", "green"]
    while True:
        candidate = random.choice(palette)
        if candidate != _last_color:
            break
    _last_color = candidate
    paint = getattr(fg, candidate)
    return paint(s)
async def loop(server: str, user: str, password: str) -> int:
"""
Program main loop
"""
prompt_pattern = f"""{fg.green}Welcome to MailMaker! Choose what to do:{fg.default}
{fg.blue}[{fg.green}1{fg.blue}] {colored("List all existing mailboxes")}
{fg.blue}[{fg.green}2{fg.blue}] {colored("Get info about an existing mailbox")}
{fg.blue}[{fg.green}3{fg.blue}] {colored("Create a new mailbox")}
{fg.blue}[{fg.green}4{fg.blue}] {colored("Delete an existing mailbox")}
{fg.blue}[{fg.green}5{fg.blue}] {colored("Update an existing mailbox's information")}
{fg.blue}[{fg.green}6{fg.blue}] {colored("Get in/out statistics for an existing mailbox")}
{fg.blue}[{fg.green}7{fg.blue}] {colored("Get quota limits for an existing mailbox")}
{fg.blue}[{fg.green}8{fg.blue}] {colored("Update quota limits for an existing mailbox")}
{fg.blue}[{fg.green}9{fg.blue}] {colored("List all existing virtual domains")}
{fg.blue}[{fg.green}10{fg.blue}] {colored("Get info about an existing virtual domain")}
{fg.blue}[{fg.green}11{fg.blue}] {colored("Generate a DKIM key pair for an existing virtual domain")}
{fg.blue}[{fg.green}12{fg.blue}] {colored("Get the DKIM key pair for an existing virtual domain")}
{fg.blue}[{fg.green}13{fg.blue}] {colored("Delete the DKIM key pair for an existing virtual domain")}
{fg.blue}[{fg.green}14{fg.blue}] {colored("Get in/out statistics for an existing virtual domain")}
{fg.blue}[{fg.green}15{fg.blue}] {colored("Update an existing virtual domain's information")}
{fg.blue}[{fg.green}16{fg.blue}] {colored("Search a mailbox by address")}"""
skip = False
prompted = False
while True:
try:
# Some unicorn puke :D
prompt = prompt_pattern.format(
*(
(
getattr(
fg, random.choice(["blue", "magenta", "red", "cyan", "blue", "purple", "yellow", "green"])
)
)
for _ in range(len(prompt_pattern.splitlines()) - 1)
)
)
print(prompt)
skip = False
prompted = False
choice = input(fg.purple("Make your choice: "))
prompted = True
if not choice.strip() or not choice.isnumeric():
skip = True
print(fg.yellow("Invalid choice (must be a number)"))
await trio.sleep(2)
else:
# We clear each time because we don't want
# to clear the screen in the default case
match int(choice):
case 1:
os.system("cls||clear")
print(colored("Listing all mailboxes"))
if isinstance(
resp := await handle_errors(
"get",
user,
password,
f"{server}/admin/api/v1/boxes",
params={"page": 1, "paging": 999999},
follow_redirects=True,
),
httpx.Response,
):
if data := decode_json(resp):
print(fg.green(f"Fetched {data['results_count']} addresses"))
for mailbox in data["results"]:
print(f"\t{fg.yellow}- {colored(mailbox['address'])}")
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(2)
os.system("cls||clear")
continue
case 2:
os.system("cls||clear")
address = input(colored("Choose an inbox to get info for: "))
if isinstance(
resp := await handle_errors(
"get", user, password, f"{server}/admin/api/v1/boxes/{address}", follow_redirects=True
),
httpx.Response,
):
if resp.status_code == 404:
print(fg.red("The provided mailbox does not exist"))
elif data := decode_json(resp):
if not data["redirect_to"]:
redirect_list = "None"
else:
redirect_list = ""
for redirect in data["redirect_to"]:
redirect_list += f"\n\t- {redirect}\n"
print(
f"""{colored("Mailbox information for")} {colored(address)}\n\n"""
f"""{colored(f"User: {data['user']}")}\n"""
f"""{colored(f"Name: {data['name'] or 'None'}")}\n"""
f"""{colored(f"System Admin: {'Yes' if data['super_admin'] else 'No'}")}\n"""
f"""{colored(f"Domain Admin: {'Yes' if data['domain_admin'] else 'No'}")}\n"""
f"""{colored(f"Home Dir: {data['home']}")}\n"""
f"""{colored(f"Strict 'From' Header: {'Not Enforced' if data['strict_from_disabled'] else 'Enforced'}")}\n"""
f"""{colored(f"Creation Date: {data['created']}")}\n"""
f"""{colored(f"Last Update Date: {data['updated']}")}\n"""
f"""{colored(f"Redirect Only: {'Yes' if data['redirect_only'] else 'No'}")}\n"""
f"""{colored(f"Redirects to: {redirect_list}")}\n"""
f"""{colored(f"Discards Incoming Mail: {'Yes' if data['discard'] else 'No'}")}\n"""
f"""{colored(f"Disabled: {'Yes' if data['disabled'] else 'No'}")}\n"""
)
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(2)
os.system("cls||clear")
continue
case 3:
os.system("cls||clear")
print(colored("Creating new mailbox"))
name = input(colored("Choose a name for the mailbox: "))
while True:
email = input(colored("Choose an email address for the mailbox: "))
if not validators.email(email):
print(fg.red("Invalid email address"))
continue
break
while True:
mail_password = getpass.getpass(colored("Choose a password for the mailbox (empty for auto-generated password): "))
if not mail_password.strip():
mail_password = "".join((random.choice(string.ascii_letters + string.digits + string.punctuation) for _ in range(random.randint(10, 16))))
print(colored(f"Generated password is {mail_password}"))
break
confirm_password = getpass.getpass(colored("Confirm the password: "))
if mail_password != confirm_password:
print(fg.red("Passwords do not match"))
continue
break
while True:
destinations = input(colored("Is this mailbox a redirect?\nIf so, provide a comma-separated list of destinations here: "))
if destinations and not all((validators.email(mail) for mail in destinations.strip().split(","))):
print(fg.red("One or more destinations are not valid"))
continue
break
confirmation = input(colored("System Admin? [y/N] "))
if confirmation.strip().lower() in ("y", "yes"):
admin = True
else:
admin = False
if (t := destinations.strip().split(",")) == ['']:
# Split has some weird behavior
destinations = []
else:
destinations = t
data = {"name": name,
"email": email,
"passwordPlaintext": mail_password,
"disabled": False,
"referenceId": "", # TODO: Set this?
"superAdmin": admin,
"redirectTo": destinations
}
confirmation = input(colored("Confirm? [y/N] "))
if confirmation.strip().lower() in ("y", "yes"):
if isinstance(
resp := await handle_errors(
"post",
user,
password,
f"{server}/admin/api/v1/boxes",
follow_redirects=True,
data=data
),
httpx.Response,
):
if resp.status_code == 201:
print(colored(f"{email} successfully created!"))
elif resp.status_code == 400:
print(fg.red(f"An error occurred: {decode_json(resp) or resp.content.decode()}"))
elif resp.status_code == 404:
print(fg.red("Cannot create mailbox: the chosen domain is not registered"))
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(2)
os.system("cls||clear")
continue
else:
print(colored("Aborting"))
skip = True
await trio.sleep(2)
case 4:
os.system("cls||clear")
print(colored("Deleting an existing mailbox"))
address = input(colored("Choose an inbox to delete: "))
confirmation = input(colored("Confirm? [y/N] "))
if confirmation.strip().lower() in ("y", "yes"):
if isinstance(
resp := await handle_errors(
"delete",
user,
password,
f"{server}/admin/api/v1/boxes/{address}",
follow_redirects=True,
),
httpx.Response,
):
if resp.status_code == 404:
print(fg.red("The provided mailbox does not exist"))
elif resp.status_code == 204:
print(colored(f"{address} successfully deleted!"))
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(2)
os.system("cls||clear")
continue
else:
print(colored("Aborting"))
skip = True
await trio.sleep(2)
case 5:
data = {
"passwordPlaintext": "",
"name": "",
"disabled": False,
"superAdmin": False,
}
os.system("cls||clear")
print(colored("Updating an existing mailbox"))
email = input(colored("Select a mailbox to update: "))
if isinstance(
resp := await handle_errors(
"get", user, password, f"{server}/admin/api/v1/boxes/{email}", follow_redirects=True
),
httpx.Response,
):
if resp.status_code == 404:
print(fg.red("The provided mailbox does not exist"))
await trio.sleep(2)
os.system("cls||clear")
continue
elif account_info := decode_json(resp):
data["disabled"] = account_info["disabled"]
data["superAdmin"] = account_info["super_admin"]
data["name"] = account_info["name"]
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(2)
os.system("cls||clear")
continue
print(
f"""{colored("Choose what to update:")}\n"""
f"""{fg.blue}[{fg.green}1{fg.blue}] Update Name\n"""
f"""{fg.blue}[{fg.green}2{fg.blue}] Update Password\n"""
f"""{fg.blue}[{fg.green}3{fg.blue}] Promote to System Admin\n"""
f"""{fg.blue}[{fg.green}4{fg.blue}] Demote from System Admin\n"""
f"""{fg.blue}[{fg.green}5{fg.blue}] Disable account\n"""
f"""{fg.blue}[{fg.green}6{fg.blue}] Enable account\n"""
)
keep = False
while True:
what = input(colored("What information do you want to update? "))
if not what or not what.isnumeric():
print(fg.yellow("Invalid choice (must be a number)"))
await trio.sleep(2)
else:
match int(what):
case 1:
new_name = input(colored(f"New Name for {email} (empty to abort): "))
if data["name"] == new_name:
print(colored(f"The desired name is already set for {email}"))
await trio.sleep(2)
continue
elif new_name.strip():
data["name"] = new_name
keep = True
break
case 2:
new_password = input(colored(f"New Password for {email} (empty to abort): "))
if new_password.strip():
data["passwordPlaintext"] = new_password
keep = True
break
case 3:
if data["superAdmin"]:
print(colored(f"{email} is already a System Admin"))
await trio.sleep(2)
continue
else:
print(colored(f"Promoting {email} to System Admin"))
data["superAdmin"] = True
keep = True
break
case 4:
if not data["superAdmin"]:
print(colored(f"{email} is not a System Admin"))
await trio.sleep(2)
continue
else:
print(colored(f"Demoting {email} from System Admin"))
data["superAdmin"] = False
keep = True
break
case 5:
if data["disabled"]:
print(colored(f"{email} is already disabled"))
await trio.sleep(2)
continue
else:
print(colored(f"Disabling mailbox {email} "))
keep = True
data["disabled"] = True
break
case 6:
if not data["disabled"]:
print(colored(f"{email} is not disabled"))
await trio.sleep(2)
continue
else:
print(colored(f"Enabling mailbox {email} "))
keep = True
data["disabled"] = False
break
case _:
print(fg.yellow("Invalid choice (value out of range)"))
await trio.sleep(2)
if keep:
confirmation = input(colored("Confirm? [y/N] "))
if confirmation.strip().lower() in ("y", "yes"):
if isinstance(
resp := await handle_errors(
"patch",
user,
password,
f"{server}/admin/api/v1/boxes/{email}",
follow_redirects=True,
data=data
),
httpx.Response,
):
if resp.status_code == 404:
print(
fg.red(
"The provided mailbox no longer exists: has it been deleted in the meantime?"
)
)
await trio.sleep(2)
elif resp.status_code == 204:
print(colored("Information Updated"))
else:
print(colored("Aborting"))
skip = True
await trio.sleep(2)
case 6:
os.system("cls||clear")
print(colored("Getting in/out statistics for existing mailbox"))
email = input(colored("Select a mailbox to fetch stats for: "))
if isinstance(
resp := await handle_errors(
"get", user, password, f"{server}/admin/api/v1/boxes/{email}/stats", follow_redirects=True
),
httpx.Response,
):
if resp.status_code == 404:
print(fg.red("The provided mailbox does not exist"))
await trio.sleep(2)
os.system("cls||clear")
continue
elif stats := decode_json(resp):
for day in stats:
print(colored(day.replace("-", "/")))
print(colored(f"\t- In: {stats[day]['in']}\n\t- Out: {stats[day]['out']}"))
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(2)
os.system("cls||clear")
continue
case 7:
os.system("cls||clear")
print(colored("Getting quota limits for an existing mailbox"))
mail = input(colored("Choose a mailbox to get quota info for: "))
if isinstance(
resp := await handle_errors(
"get", user, password, f"{server}/admin/api/v1/boxes/{mail}/quota", follow_redirects=True
),
httpx.Response,
):
if resp.status_code == 404:
print(fg.red("The provided mailbox does not exist"))
elif data := decode_json(resp):
print(
f"""{colored("Quota information for")} {colored(mail)}\n\n"""
f"""{colored(f"Storage Limit: {data['storage_limit']}")}\n"""
f"""{colored(f"Storage Usage: {data['storage_usage']}")}\n"""
f"""{colored(f"Count Limit: {data['count_limit']}")}\n"""
f"""{colored(f"Count Usage: {data['count_usage']}")}\n"""
)
case 8:
os.system("cls||clear")
print(colored("Updating quota limits for an existing mailbox"))
data = {
"storageLimit": 0,
"countLimit": 0,
}
email = input(colored("Select a mailbox to update quotas for: "))
if isinstance(
resp := await handle_errors(
"get", user, password, f"{server}/admin/api/v1/boxes/{email}/quota", follow_redirects=True
),
httpx.Response,
):
if resp.status_code == 404:
print(fg.red("The provided mailbox does not exist"))
await trio.sleep(2)
os.system("cls||clear")
continue
elif account_info := decode_json(resp):
data["storageLimit"] = account_info["storage_limit"]
data["countLimit"] = account_info["count_limit"]
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(2)
os.system("cls||clear")
continue
print(
f"""{colored("Choose what to update:")}\n"""
f"""{fg.blue}[{fg.green}1{fg.blue}] Update Storage Limit\n"""
f"""{fg.blue}[{fg.green}2{fg.blue}] Update Count Limit\n"""
)
keep = False
while True:
what = input(colored("What information do you want to update? "))
if not what or not what.isnumeric():
print(fg.yellow("Invalid choice (must be a number)"))
await trio.sleep(2)
else:
match int(what):
case 1:
new_storage = int(input(colored(f"New storage limit for {email} (current is {data['storageLimit']}, empty to abort): ")))
if data["storageLimit"] == new_storage:
print(colored(f"The desired storage limit is already set to {data['storageLimit']} for {email}"))
await trio.sleep(2)
continue
elif new_storage.strip():
if not new_storage.isnumeric():
print(fg.yellow("Invalid choice (must be a number)"))
await trio.sleep(2)
continue
data["storageLimit"] = new_name
keep = True
break
case 2:
new_count = int(input(colored(f"New count limit for {email} (current is {data['countLimit']}, empty to abort): ")))
if data["countLimit"] == new_count:
print(colored(f"The desired count limit is already set to {data['countLimit']} for {email}"))
await trio.sleep(2)
continue
else:
data["countLimit"] = new_count
keep = True
break
case _:
print(fg.yellow("Invalid choice (value out of range)"))
await trio.sleep(2)
if keep:
confirmation = input(colored("Confirm? [y/N] "))
if confirmation.strip().lower() in ("y", "yes"):
if isinstance(
resp := await handle_errors(
"patch",
user,
password,
f"{server}/admin/api/v1/boxes/{email}/quota",
follow_redirects=True,
data=data
),
httpx.Response,
):
if resp.status_code == 404:
print(
fg.red(
"The provided mailbox no longer exists: has it been deleted in the meantime?"
)
)
await trio.sleep(2)
elif resp.status_code == 204:
print(colored("Quota Updated"))
elif resp.status_code == 500:
print(fg.yellow("Are you trying to update limits for a System Administrator which is now allowed?"))
else:
print(colored("Aborting"))
skip = True
await trio.sleep(2)
case 9:
os.system("cls||clear")
print(colored("Listing all existing virtual domains"))
if isinstance(
resp := await handle_errors(
"get",
user,
password,
f"{server}/admin/api/v1/domains",
params={"page": 1, "paging": 999999},
follow_redirects=True,
),
httpx.Response,
):
if data := decode_json(resp):
print(fg.green(f"Fetched {data['results_count']} domains"))
for domain in data["results"]:
print(f"\t{fg.yellow}- {colored(domain['name'])}")
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(2)
os.system("cls||clear")
continue
case 10:
os.system("cls||clear")
print(colored("Getting info about an existing virtual domain"))
domain = input(colored("Choose a virtual domain to get info for: "))
if isinstance(
resp := await handle_errors(
"get", user, password, f"{server}/admin/api/v1/domains/{domain}", follow_redirects=True
),
httpx.Response,
):
if resp.status_code == 404:
print(fg.red("The provided virtual domain does not exist"))
elif data := decode_json(resp):
print(
f"""{colored("Virtual domain information for")} {colored(domain)}\n\n"""
f"""{colored(f"Home Dir: {data['home']}")}\n"""
f"""{colored(f"Forward: {data['forward_domain'] if data['forward'] else 'No'}")}\n"""
f"""{colored(f"Creation Date: {data['created']}")}\n"""
f"""{colored(f"Last Update Date: {data['updated']}")}\n"""
f"""{colored(f"Domain bin (catch-all): {data['domain_bin_address'] if data['domain_bin'] else 'No'}")}\n"""
f"""{colored(f"Force Route: {data['force_route_host'] if data['force_route'] else 'No'}")}\n"""
)
case 11:
os.system("cls||clear")
print(colored("Generating a DKIM key pair for an existing virtual domain"))
case 12:
os.system("cls||clear")
print(colored("Getting the DKIM key pair for an existing virtual domain"))
case 13:
os.system("cls||clear")
print(colored("Deleting the DKIM key pair for an existing virtual domain"))
case 14:
os.system("cls||clear")
print(colored("Getting in/out statistics for an existing virtual domain"))
domain = input(colored("Select a virtual domain to fetch stats for: "))
if isinstance(
resp := await handle_errors(
"get", user, password, f"{server}/admin/api/v1/domains/{domain}/stats",
follow_redirects=True
),
httpx.Response,
):
if resp.status_code == 404:
print(fg.red("The provided virtual domain does not exist"))
await trio.sleep(2)
os.system("cls||clear")
continue
elif stats := decode_json(resp):
for day in stats:
print(colored(day.replace("-", "/")))
print(colored(f"\t- In: {stats[day]['in']}\n\t- Out: {stats[day]['out']}"))
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(2)
os.system("cls||clear")
continue
case 15:
os.system("cls||clear")
print(colored("Updating an existing virtual domain"))
data = {"forward": False,
"forwardDomain": "",
"domainBin": False,
"domainBinAddress": "",
"forceRoute": False,
"forceRouteHost": "",
"referenceId": ""
}
domain = input(colored("Select a virtual domain to update: "))
if isinstance(
resp := await handle_errors(
"get", user, password, f"{server}/admin/api/v1/domains/{domain}", follow_redirects=True
),
httpx.Response,
):
if resp.status_code == 404:
print(fg.red("The provided virtual domain does not exist"))
await trio.sleep(2)
os.system("cls||clear")
continue
elif domain_info := decode_json(resp):
data["forward"] = domain_info["forward"]
data["domainBin"] = domain_info["domain_bin"]
data["forceRoute"] = domain_info["force_route"]
if domain_info["forward"]:
data["forwardDomain"] = domain_info["forward_domain"]
if domain_info["force_route"]:
data["forceRouteHost"] = domain_info["force_route_host"]
if domain_info["domain_bin"]:
data["domainBinAddress"] = domain_info["domain_bin_address"]
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(2)
os.system("cls||clear")
continue
print(
f"""{colored("Choose what to update:")}\n"""
f"""{fg.blue}[{fg.green}1{fg.blue}] Set/Update domain bin (catch-all)\n"""
f"""{fg.blue}[{fg.green}2{fg.blue}] Set/Update forced routing\n"""
f"""{fg.blue}[{fg.green}3{fg.blue}] Set/Update forward domain\n"""
f"""{fg.blue}[{fg.green}4{fg.blue}] Unset domain bin (catch-all)\n"""
f"""{fg.blue}[{fg.green}5{fg.blue}] Unset forced routing\n"""
f"""{fg.blue}[{fg.green}6{fg.blue}] Unset forward domain\n"""
)
keep = False
while True:
what = input(colored("What information do you want to update? "))
if not what or not what.isnumeric():
print(fg.yellow("Invalid choice (must be a number)"))
await trio.sleep(2)
else:
match int(what):
case 1:
if data["domainBin"]:
print(colored(f"Note: current catch-all for {domain} is set to {data['domainBinAddress']}"))
new_bin = input(colored(f"New domain catch-all for {domain} (empty to abort): "))
if data["domainBinAddress"] and data["domainBinAddress"] == new_bin:
print(colored(f"The desired catch-all is already set to {new_bin} for {domain}"))
elif new_bin.strip():
data["domainBinAddress"] = new_bin
data["domainBin"] = True
keep = True
break
case 2:
if data["forceRoute"]:
print(colored(f"Note: {domain} is currently routed trough {data['forceRouteHost']}"))
while True:
new_routing = input(colored(f"Set forced routing for {domain} (empty to abort): "))
if not new_routing.strip():
break
# I could've one-lined all these nested ifs but it'd look awful so shut up
if not validators.domain(new_routing):
# Not a domain
if not validators.ipv4(new_routing) or validators.ipv6(new_routing):
# Not a plain IPV4/IPV6
if ':' in new_routing:
# Is it an IP:port pair?
split = new_routing.split(":", maxsplit=1)
if not split[1].isnumeric() or not (validators.ipv4(split[0]) or validators.ipv6(split[0])):
# Nope, not a valid host
print(fg.red("The provided host is not valid (not a domain, IP, nor an IP:port pair)"))
await trio.sleep(2)
continue
break
if new_routing.strip():
if data["forceRoute"] and data["forceRouteHost"] == new_routing.strip():
print(colored("The virtual domain {domain} is already routed trough {new_routing}"))
await trio.sleep(2)
continue
data["forceRouteHost"] = new_routing
data["forceRoute"] = True
keep = True
break
case 3:
while True:
forward_address = input(colored("Choose a new forwarding address: "))
if validators.email(forward_address):
break
else:
print(colored("Not a valid email address"))
if data["forward"]:
print(colored(f"Note: {domain} currently forwards to {data['forwardAddress']}"))
if data["forward"] and data["forwardAddress"] == forward_address:
print(colored(f"{domain} already forwards to {data['forwardAddress']}"))
else:
data["forward"] = True
data["forwardAddress"] = True
keep = True
break
case _:
print(fg.yellow("Invalid choice (value out of range)"))
await trio.sleep(2)
if keep:
confirmation = input(colored("Confirm? [y/N] "))
if confirmation.strip().lower() in ("y", "yes"):
if isinstance(
resp := await handle_errors(
"patch",
user,
password,
f"{server}/admin/api/v1/domains/{domain}",
follow_redirects=True,
data=data
),
httpx.Response,
):
if resp.status_code == 400:
if err := decode_json(resp):
print(fg.red(f"An error occurred: {err['message']}"))
print(fg.red("Errors: "))
for error_kind in err["errors"]["children"].keys():
if not err["errors"]["children"][error_kind]:
continue
print(fg.red(f" - Error kind: {error_kind}"))
for error_message in err["errors"]["children"][error_kind]["errors"]:
print(fg.red(f" - {error_message}"))
if resp.status_code == 404:
print(
fg.red(
"The provided domain no longer exists: has it been deleted in the meantime?"
)
)
await trio.sleep(2)
elif resp.status_code == 204:
print(colored("Information Updated"))
else:
print(colored("Aborting"))
skip = True
await trio.sleep(2)
case 16:
os.system("cls||clear")
print(colored("Searching a mailbox by address"))
case _:
skip = True
print(fg.yellow("Invalid choice (value out of range)"))
await trio.sleep(2)
os.system("cls||clear")
if not skip:
pause(colored("Press any key to continue"))
# Awful, but it works so shut up
os.system("cls||clear")
except KeyboardInterrupt:
os.system("cls||clear")
# This way when you press Ctrl+C
# or Ctrl+D from an inner section
# you are brought back to the
# main menu
if not prompted:
return 0
except KeyError as k:
print(fg.red("The JSON response from the API was missing an expected field ({k!r}), has the version changed?"))
await trio.sleep(2)
os.system("cls||clear")
except EOFError:
os.system("cls||clear")
if not prompted:
return -1
# This tool technically doesn't *need* to be async, but shut up
async def main(args: argparse.Namespace) -> int:
    """
    Main program entry point

    Validates the server URL, works out which credentials to use
    (command line arguments win over credentials.json), optionally
    saves them, checks them against the server and then hands over
    to loop().

    :param args: Parsed command line arguments. The attributes read
        here are server_url, username, password and save
    :return: The value returned by loop() after a successful login,
        -1 on any failure
    """
    user, password = (None, None)
    if not re.match(r"http(s)?://", args.server_url):
        print(fg.red("Missing or invalid schema in server URL!"))
        return -1
    elif not validators.url(args.server_url, public=True):
        # bool(ValidationFailure) is False :)
        print(fg.red("Invalid server URL, exiting"))
        return -1
    # API paths are appended with a leading slash, so a trailing
    # one here would produce "//admin/..."
    server_url = args.server_url.rstrip("/")
    if os.path.isfile("credentials.json"):
        try:
            with open("credentials.json") as credentials:
                data = json.load(credentials)
        except json.decoder.JSONDecodeError as json_error:
            print(
                fg.red(
                    f"A JSON error occurred while attempting to load credentials.json -> {type(json_error).__name__}:"
                    f" {json_error}"
                )
            )
        except PermissionError:
            print(
                fg.red(
                    "Could not read credentials.json due to a permission issue. Make sure the file is readable by"
                    " the current user and try again"
                )
            )
        except OSError as os_error:
            print(
                fg.red(
                    f"Could not read credentials.json due to a generic OS error -> {type(os_error).__name__}:"
                    f"{os_error}"
                )
            )
        else:
            try:
                user, password = data["username"], data["password"]
            except (KeyError, TypeError):
                # TypeError covers valid JSON that is not an object (a list, a string...)
                print(fg.yellow("The JSON payload in credentials.json is not valid"))
    if not all([user, password]) and not all([args.username, args.password]):
        print(
            fg.yellow("Credentials could not be loaded and an explicit username/password pair wasn't provided, exiting")
        )
        return -1
    elif all([args.username, args.password]):
        user, password = args.username, args.password
        if args.save:
            print(fg.green(f"Saving credentials for {user} to credentials.json"))
            try:
                # The file holds a plaintext password: when it gets created here it
                # is made readable and writable by the current user only
                descriptor = os.open("credentials.json", os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
                with os.fdopen(descriptor, "w") as save_file:
                    json.dump({"username": user, "password": password}, save_file)
            except PermissionError:
                print(
                    fg.red(
                        "Could not write credentials.json due to a permission issue. Make sure the file is writable by"
                        " the current user and try again"
                    )
                )
            except OSError as os_error:
                print(
                    fg.red(
                        f"Could not write credentials.json due to a generic OS error -> {type(os_error).__name__}:"
                        f"{os_error}"
                    )
                )
    else:
        print(fg.green(f"Loaded credentials for {user} from credentials.json"))
    print(fg.yellow(f"Attempting login with {user} to {server_url}"))
    resp = await handle_errors("get", user, password, f"{server_url}/admin/api/doc", follow_redirects=True)
    if not isinstance(resp, httpx.Response):
        print(fg.red(f"An error occurred while attempting to log in -> {type(resp).__name__} -> {resp}"))
        return -1
    if resp.status_code == 200:
        print(fg.green("Authentication successful!"))
        return await loop(server_url, user, password)
    if resp.status_code in (201, 204, 400, 404):
        # handle_errors() stays silent on these codes, so the failure is reported here
        print(fg.red(f"Login check failed: the server answered with unexpected status code {resp.status_code}"))
    return -1
if __name__ == "__main__":
    # Command line interface: credentials and --save are optional,
    # the server URL is mandatory
    parser = argparse.ArgumentParser(description="A simple tool to interact with the REST API of poste.io instances")
    parser.add_argument("--username", help="The username to authenticate with the API")
    parser.add_argument("--password", help="The password to authenticate with the API")
    parser.add_argument(
        "--save",
        help="Saves credentials in a credentials.json file for future use. Overwrites any existing credentials file",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--server-url", help="The URL of your poste.io instance. MUST include an http(s):// prefix", required=True
    )
    parser.add_argument(
        "--nocolor",
        help="Disables the unicorn puke and outputs plain old boring text",
        action="store_true",
        default=False,
    )
    arguments = parser.parse_args()
    if arguments.nocolor:
        # This hack is awful, but it works and is much better than using
        # parameters or globals everywhere. All hail dynamic languages

        class _PlainStyle:
            """Stand-in for a color: formats as an empty string and returns text unchanged"""

            def __repr__(self) -> str:
                return ""

            def __call__(self, text):
                return text

        class _PlainPalette:
            """Stand-in for fg: every attribute lookup yields a no-op style"""

            def __getattr__(self, _name):
                return _PlainStyle()

        # Rebinding the module-level name is enough: every function looks fg up when it runs
        fg = _PlainPalette()
    try:
        os.system("cls||clear")
        print(colored("MailMaker v0.0.1, made with <3 and Python 3.8 by @nocturn9x"))
        # The arguments parsed above are reused instead of parsing them a second time
        sys.exit(trio.run(main, arguments))
    except KeyboardInterrupt:
        os.system("cls||clear")
        sys.exit(0)