#!/usr/bin/env python3 """ Copyright (C) 2021 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import re import os import sys import trio import json import httpx import random import socket import string import getpass import argparse import validators from typing import Union, Any from console import fg try: from console.utils import pause except ImportError: # For some reason the lib does not export utils by default ¯\_(ツ)_/¯ print(f"Please edit the console library's __init__.py file at line 58 and add the line 'from . import utils'") sys.exit(0) async def send_authenticated_request( method: str, user: str, password: str, *args, **kwargs ) -> Union[httpx.Response, httpx.RequestError]: """ Sends an HTTP request using httpx and basic HTTP authentication """ try: # print(f"calling httpx.{method}({f', '.join(map(repr, args))}, auth=({user!r}, {password!r}), {', '.join(f'{name}={repr(value)}' for name, value in kwargs.items())})") async with httpx.AsyncClient() as client: return await getattr(client, method)(*args, **kwargs, auth=(user, password)) except (httpx.RequestError, socket.gaierror, OSError) as e: return e async def handle_errors(*args, **kwargs) -> Union[httpx.Response, httpx.RequestError]: """ Simple wrapper to avoid copy-pasting the same error handling code 20 times """ if isinstance(resp := await send_authenticated_request(*args, **kwargs), httpx.Response): match resp.status_code: case 200 | 201 | 204 | 404 | 400: return resp case 401: print(fg.red("Authentication failed: Unauthorized (wrong credentials)")) case 403: print(fg.red("Authentication failed: Forbidden")) case resp.status_code if resp.status_code >= 500: print(fg.red(f"The server raised a {resp.status_code} error, maybe try again later?")) case _: print(fg.yellow(f"The server returned unexpected status code {resp.status_code}, exiting")) return resp def decode_json(response: httpx.Response) -> Any: """ Handles potential exceptions from httpx.Response.json(): """ try: return response.json() except json.decoder.JSONDecodeError as json_error: print( fg.red( f"A JSON error occurred while attempting to decode some data -> {type(json_error).__name__}:" f" {json_error}" ) ) return "" _last_color = None def colored(s: str) -> str: """ Calls fg() with a random color each time, but making sure it doesn't use the same color across two consecutive calls """ global _last_color # Ew... while (c := random.choice(["blue", "magenta", "red", "cyan", "blue", "purple", "yellow", "green"])) == _last_color: pass _last_color = c return getattr(fg, _last_color)(s) async def loop(server: str, user: str, password: str) -> int: """ Program main loop """ prompt_pattern = f"""{fg.green}Welcome to MailMaker! Choose what to do:{fg.default} {fg.blue}[{fg.green}1{fg.blue}] {colored("List all existing mailboxes")} {fg.blue}[{fg.green}2{fg.blue}] {colored("Get info about an existing mailbox")} {fg.blue}[{fg.green}3{fg.blue}] {colored("Create a new mailbox")} {fg.blue}[{fg.green}4{fg.blue}] {colored("Delete an existing mailbox")} {fg.blue}[{fg.green}5{fg.blue}] {colored("Update an existing mailbox's information")} {fg.blue}[{fg.green}6{fg.blue}] {colored("Get in/out statistics for an existing mailbox")} {fg.blue}[{fg.green}7{fg.blue}] {colored("Get quota limits for an existing mailbox")} {fg.blue}[{fg.green}8{fg.blue}] {colored("Update quota limits for an existing mailbox")} {fg.blue}[{fg.green}9{fg.blue}] {colored("List all existing virtual domains")} {fg.blue}[{fg.green}10{fg.blue}] {colored("Get info about an existing virtual domain")} {fg.blue}[{fg.green}11{fg.blue}] {colored("Generate a DKIM key pair for an existing virtual domain")} {fg.blue}[{fg.green}12{fg.blue}] {colored("Get the DKIM key pair for an existing virtual domain")} {fg.blue}[{fg.green}13{fg.blue}] {colored("Delete the DKIM key pair for an existing virtual domain")} {fg.blue}[{fg.green}14{fg.blue}] {colored("Get in/out statistics for an existing virtual domain")} {fg.blue}[{fg.green}15{fg.blue}] {colored("Update an existing virtual domain's information")} {fg.blue}[{fg.green}16{fg.blue}] {colored("Search a mailbox by address")}""" skip = False prompted = False while True: try: # Some unicorn puke :D prompt = prompt_pattern.format( *( ( getattr( fg, random.choice(["blue", "magenta", "red", "cyan", "blue", "purple", "yellow", "green"]) ) ) for _ in range(len(prompt_pattern.splitlines()) - 1) ) ) print(prompt) skip = False prompted = False choice = input(fg.purple("Make your choice: ")) prompted = True if not choice.strip() or not choice.isnumeric(): skip = True print(fg.yellow("Invalid choice (must be a number)")) await trio.sleep(2) else: # We clear each time because we don't want # to clear the screen in the default case match int(choice): case 1: os.system("cls||clear") print(colored("Listing all mailboxes")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/boxes", params={"page": 1, "paging": 999999}, follow_redirects=True, ), httpx.Response, ): if data := decode_json(resp): print(fg.green(f"Fetched {data['results_count']} addresses")) for mailbox in data["results"]: print(f"\t{fg.yellow}- {colored(mailbox['address'])}") else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(2) os.system("cls||clear") continue case 2: os.system("cls||clear") address = input(colored("Choose an inbox to get info for: ")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/boxes/{address}", follow_redirects=True ), httpx.Response, ): if resp.status_code == 404: print(fg.red("The provided mailbox does not exist")) elif data := decode_json(resp): if not data["redirect_to"]: redirect_list = "None" else: redirect_list = "" for redirect in data["redirect_to"]: redirect_list += f"\n\t- {redirect}\n" print( f"""{colored("Mailbox information for")} {colored(address)}\n\n""" f"""{colored(f"User: {data['user']}")}\n""" f"""{colored(f"Name: {data['name'] or 'None'}")}\n""" f"""{colored(f"System Admin: {'Yes' if data['super_admin'] else 'No'}")}\n""" f"""{colored(f"Domain Admin: {'Yes' if data['domain_admin'] else 'No'}")}\n""" f"""{colored(f"Home Dir: {data['home']}")}\n""" f"""{colored(f"Strict 'From' Header: {'Not Enforced' if data['strict_from_disabled'] else 'Enforced'}")}\n""" f"""{colored(f"Creation Date: {data['created']}")}\n""" f"""{colored(f"Last Update Date: {data['updated']}")}\n""" f"""{colored(f"Redirect Only: {'Yes' if data['redirect_only'] else 'No'}")}\n""" f"""{colored(f"Redirects to: {redirect_list}")}\n""" f"""{colored(f"Discards Incoming Mail: {'Yes' if data['discard'] else 'No'}")}\n""" f"""{colored(f"Disabled: {'Yes' if data['disabled'] else 'No'}")}\n""" ) else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(2) os.system("cls||clear") continue case 3: os.system("cls||clear") print(colored("Creating new mailbox")) name = input(colored("Choose a name for the mailbox: ")) while True: email = input(colored("Choose an email address for the mailbox: ")) if not validators.email(email): print(fg.red("Invalid email address")) continue break while True: mail_password = getpass.getpass(colored("Choose a password for the mailbox (empty for auto-generated password): ")) if not mail_password.strip(): mail_password = "".join((random.choice(string.ascii_letters + string.digits + string.punctuation) for _ in range(random.randint(10, 16)))) print(colored(f"Generated password is {mail_password}")) break confirm_password = getpass.getpass(colored("Confirm the password: ")) if mail_password != confirm_password: print(fg.red("Passwords do not match")) continue break while True: destinations = input(colored("Is this mailbox a redirect?\nIf so, provide a comma-separated list of destinations here: ")) if destinations and not all((validators.email(mail) for mail in destinations.strip().split(","))): print(fg.red("One or more destinations are not valid")) continue break confirmation = input(colored("System Admin? [y/N] ")) if confirmation.strip().lower() in ("y", "yes"): admin = True else: admin = False if (t := destinations.strip().split(",")) == ['']: # Split has some weird behavior destinations = [] else: destinations = t data = {"name": name, "email": email, "passwordPlaintext": mail_password, "disabled": False, "referenceId": "", # TODO: Set this? "superAdmin": admin, "redirectTo": destinations } confirmation = input(colored("Confirm? [y/N] ")) if confirmation.strip().lower() in ("y", "yes"): if isinstance( resp := await handle_errors( "post", user, password, f"{server}/admin/api/v1/boxes", follow_redirects=True, data=data ), httpx.Response, ): if resp.status_code == 201: print(colored(f"{email} successfully created!")) elif resp.status_code == 400: print(fg.red(f"An error occurred: {decode_json(resp) or resp.content.decode()}")) elif resp.status_code == 404: print(fg.red("Cannot create mailbox: the chosen domain is not registered")) else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(2) os.system("cls||clear") continue else: print(colored("Aborting")) skip = True await trio.sleep(2) case 4: os.system("cls||clear") print(colored("Deleting an existing mailbox")) address = input(colored("Choose an inbox to delete: ")) confirmation = input(colored("Confirm? [y/N] ")) if confirmation.strip().lower() in ("y", "yes"): if isinstance( resp := await handle_errors( "delete", user, password, f"{server}/admin/api/v1/boxes/{address}", follow_redirects=True, ), httpx.Response, ): if resp.status_code == 404: print(fg.red("The provided mailbox does not exist")) elif resp.status_code == 204: print(colored(f"{address} successfully deleted!")) else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(2) os.system("cls||clear") continue else: print(colored("Aborting")) skip = True await trio.sleep(2) case 5: data = { "passwordPlaintext": "", "name": "", "disabled": False, "superAdmin": False, } os.system("cls||clear") print(colored("Updating an existing mailbox")) email = input(colored("Select a mailbox to update: ")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/boxes/{email}", follow_redirects=True ), httpx.Response, ): if resp.status_code == 404: print(fg.red("The provided mailbox does not exist")) await trio.sleep(2) os.system("cls||clear") continue elif account_info := decode_json(resp): data["disabled"] = account_info["disabled"] data["superAdmin"] = account_info["super_admin"] data["name"] = account_info["name"] else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(2) os.system("cls||clear") continue print( f"""{colored("Choose what to update:")}\n""" f"""{fg.blue}[{fg.green}1{fg.blue}] Update Name\n""" f"""{fg.blue}[{fg.green}2{fg.blue}] Update Password\n""" f"""{fg.blue}[{fg.green}3{fg.blue}] Promote to System Admin\n""" f"""{fg.blue}[{fg.green}4{fg.blue}] Demote from System Admin\n""" f"""{fg.blue}[{fg.green}5{fg.blue}] Disable account\n""" f"""{fg.blue}[{fg.green}6{fg.blue}] Enable account\n""" ) keep = False while True: what = input(colored("What information do you want to update? ")) if not what or not what.isnumeric(): print(fg.yellow("Invalid choice (must be a number)")) await trio.sleep(2) else: match int(what): case 1: new_name = input(colored(f"New Name for {email} (empty to abort): ")) if data["name"] == new_name: print(colored(f"The desired name is already set for {email}")) await trio.sleep(2) continue elif new_name.strip(): data["name"] = new_name keep = True break case 2: new_password = input(colored(f"New Password for {email} (empty to abort): ")) if new_password.strip(): data["passwordPlaintext"] = new_password keep = True break case 3: if data["superAdmin"]: print(colored(f"{email} is already a System Admin")) await trio.sleep(2) continue else: print(colored(f"Promoting {email} to System Admin")) data["superAdmin"] = True keep = True break case 4: if not data["superAdmin"]: print(colored(f"{email} is not a System Admin")) await trio.sleep(2) continue else: print(colored(f"Demoting {email} from System Admin")) data["superAdmin"] = False keep = True break case 5: if data["disabled"]: print(colored(f"{email} is already disabled")) await trio.sleep(2) continue else: print(colored(f"Disabling mailbox {email} ")) keep = True data["disabled"] = True break case 6: if not data["disabled"]: print(colored(f"{email} is not disabled")) await trio.sleep(2) continue else: print(colored(f"Enabling mailbox {email} ")) keep = True data["disabled"] = False break case _: print(fg.yellow("Invalid choice (value out of range)")) await trio.sleep(2) if keep: confirmation = input(colored("Confirm? [y/N] ")) if confirmation.strip().lower() in ("y", "yes"): if isinstance( resp := await handle_errors( "patch", user, password, f"{server}/admin/api/v1/boxes/{email}", follow_redirects=True, data=data ), httpx.Response, ): if resp.status_code == 404: print( fg.red( "The provided mailbox no longer exists: has it been deleted in the meantime?" ) ) await trio.sleep(2) elif resp.status_code == 204: print(colored("Information Updated")) else: print(colored("Aborting")) skip = True await trio.sleep(2) case 6: os.system("cls||clear") print(colored("Getting in/out statistics for existing mailbox")) email = input(colored("Select a mailbox to fetch stats for: ")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/boxes/{email}/stats", follow_redirects=True ), httpx.Response, ): if resp.status_code == 404: print(fg.red("The provided mailbox does not exist")) await trio.sleep(2) os.system("cls||clear") continue elif stats := decode_json(resp): for day in stats: print(colored(day.replace("-", "/"))) print(colored(f"\t- In: {stats[day]['in']}\n\t- Out: {stats[day]['out']}")) else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(2) os.system("cls||clear") continue case 7: os.system("cls||clear") print(colored("Getting quota limits for an existing mailbox")) mail = input(colored("Choose a mailbox to get quota info for: ")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/boxes/{mail}/quota", follow_redirects=True ), httpx.Response, ): if resp.status_code == 404: print(fg.red("The provided mailbox does not exist")) elif data := decode_json(resp): print( f"""{colored("Quota information for")} {colored(mail)}\n\n""" f"""{colored(f"Storage Limit: {data['storage_limit']}")}\n""" f"""{colored(f"Storage Usage: {data['storage_usage']}")}\n""" f"""{colored(f"Count Limit: {data['count_limit']}")}\n""" f"""{colored(f"Count Usage: {data['count_usage']}")}\n""" ) case 8: os.system("cls||clear") print(colored("Updating quota limits for an existing mailbox")) data = { "storageLimit": 0, "countLimit": 0, } email = input(colored("Select a mailbox to update quotas for: ")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/boxes/{email}/quota", follow_redirects=True ), httpx.Response, ): if resp.status_code == 404: print(fg.red("The provided mailbox does not exist")) await trio.sleep(2) os.system("cls||clear") continue elif account_info := decode_json(resp): data["storageLimit"] = account_info["storage_limit"] data["countLimit"] = account_info["count_limit"] else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(2) os.system("cls||clear") continue print( f"""{colored("Choose what to update:")}\n""" f"""{fg.blue}[{fg.green}1{fg.blue}] Update Storage Limit\n""" f"""{fg.blue}[{fg.green}2{fg.blue}] Update Count Limit\n""" ) keep = False while True: what = input(colored("What information do you want to update? ")) if not what or not what.isnumeric(): print(fg.yellow("Invalid choice (must be a number)")) await trio.sleep(2) else: match int(what): case 1: new_storage = int(input(colored(f"New storage limit for {email} (current is {data['storageLimit']}, empty to abort): "))) if data["storageLimit"] == new_storage: print(colored(f"The desired storage limit is already set to {data['storageLimit']} for {email}")) await trio.sleep(2) continue elif new_storage.strip(): if not new_storage.isnumeric(): print(fg.yellow("Invalid choice (must be a number)")) await trio.sleep(2) continue data["storageLimit"] = new_name keep = True break case 2: new_count = int(input(colored(f"New count limit for {email} (current is {data['countLimit']}, empty to abort): "))) if data["countLimit"] == new_count: print(colored(f"The desired count limit is already set to {data['countLimit']} for {email}")) await trio.sleep(2) continue else: data["countLimit"] = new_count keep = True break case _: print(fg.yellow("Invalid choice (value out of range)")) await trio.sleep(2) if keep: confirmation = input(colored("Confirm? [y/N] ")) if confirmation.strip().lower() in ("y", "yes"): if isinstance( resp := await handle_errors( "patch", user, password, f"{server}/admin/api/v1/boxes/{email}/quota", follow_redirects=True, data=data ), httpx.Response, ): if resp.status_code == 404: print( fg.red( "The provided mailbox no longer exists: has it been deleted in the meantime?" ) ) await trio.sleep(2) elif resp.status_code == 204: print(colored("Quota Updated")) elif resp.status_code == 500: print(fg.yellow("Are you trying to update limits for a System Administrator which is now allowed?")) else: print(colored("Aborting")) skip = True await trio.sleep(2) case 9: os.system("cls||clear") print(colored("Listing all existing virtual domains")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/domains", params={"page": 1, "paging": 999999}, follow_redirects=True, ), httpx.Response, ): if data := decode_json(resp): print(fg.green(f"Fetched {data['results_count']} domains")) for domain in data["results"]: print(f"\t{fg.yellow}- {colored(domain['name'])}") else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(2) os.system("cls||clear") continue case 10: os.system("cls||clear") print(colored("Getting info about an existing virtual domain")) domain = input(colored("Choose a virtual domain to get info for: ")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/domains/{domain}", follow_redirects=True ), httpx.Response, ): if resp.status_code == 404: print(fg.red("The provided virtual domain does not exist")) elif data := decode_json(resp): print( f"""{colored("Virtual domain information for")} {colored(domain)}\n\n""" f"""{colored(f"Home Dir: {data['home']}")}\n""" f"""{colored(f"Forward: {data['forward_domain'] if data['forward'] else 'No'}")}\n""" f"""{colored(f"Creation Date: {data['created']}")}\n""" f"""{colored(f"Last Update Date: {data['updated']}")}\n""" f"""{colored(f"Domain bin (catch-all): {data['domain_bin_address'] if data['domain_bin'] else 'No'}")}\n""" f"""{colored(f"Force Route: {data['force_route_host'] if data['force_route'] else 'No'}")}\n""" ) case 11: os.system("cls||clear") print(colored("Generating a DKIM key pair for an existing virtual domain")) case 12: os.system("cls||clear") print(colored("Getting the DKIM key pair for an existing virtual domain")) case 13: os.system("cls||clear") print(colored("Deleting the DKIM key pair for an existing virtual domain")) case 14: os.system("cls||clear") print(colored("Getting in/out statistics for an existing virtual domain")) domain = input(colored("Select a virtual domain to fetch stats for: ")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/domains/{domain}/stats", follow_redirects=True ), httpx.Response, ): if resp.status_code == 404: print(fg.red("The provided virtual domain does not exist")) await trio.sleep(2) os.system("cls||clear") continue elif stats := decode_json(resp): for day in stats: print(colored(day.replace("-", "/"))) print(colored(f"\t- In: {stats[day]['in']}\n\t- Out: {stats[day]['out']}")) else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(2) os.system("cls||clear") continue case 15: os.system("cls||clear") print(colored("Updating an existing virtual domain")) data = {"forward": False, "forwardDomain": "", "domainBin": False, "domainBinAddress": "", "forceRoute": False, "forceRouteHost": "", "referenceId": "" } domain = input(colored("Select a virtual domain to update: ")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/domains/{domain}", follow_redirects=True ), httpx.Response, ): if resp.status_code == 404: print(fg.red("The provided virtual domain does not exist")) await trio.sleep(2) os.system("cls||clear") continue elif domain_info := decode_json(resp): data["forward"] = domain_info["forward"] data["domainBin"] = domain_info["domain_bin"] data["forceRoute"] = domain_info["force_route"] if domain_info["forward"]: data["forwardDomain"] = domain_info["forward_domain"] if domain_info["force_route"]: data["forceRouteHost"] = domain_info["force_route_host"] if domain_info["domain_bin"]: data["domainBinAddress"] = domain_info["domain_bin_address"] else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(2) os.system("cls||clear") continue print( f"""{colored("Choose what to update:")}\n""" f"""{fg.blue}[{fg.green}1{fg.blue}] Set/Update domain bin (catch-all)\n""" f"""{fg.blue}[{fg.green}2{fg.blue}] Set/Update forced routing\n""" f"""{fg.blue}[{fg.green}3{fg.blue}] Set/Update forward domain\n""" f"""{fg.blue}[{fg.green}4{fg.blue}] Unset domain bin (catch-all)\n""" f"""{fg.blue}[{fg.green}5{fg.blue}] Unset forced routing\n""" f"""{fg.blue}[{fg.green}6{fg.blue}] Unset forward domain\n""" ) keep = False while True: what = input(colored("What information do you want to update? ")) if not what or not what.isnumeric(): print(fg.yellow("Invalid choice (must be a number)")) await trio.sleep(2) else: match int(what): case 1: if data["domainBin"]: print(colored(f"Note: current catch-all for {domain} is set to {data['domainBinAddress']}")) new_bin = input(colored(f"New domain catch-all for {domain} (empty to abort): ")) if data["domainBinAddress"] and data["domainBinAddress"] == new_bin: print(colored(f"The desired catch-all is already set to {new_bin} for {domain}")) elif new_bin.strip(): data["domainBinAddress"] = new_bin data["domainBin"] = True keep = True break case 2: if data["forceRoute"]: print(colored(f"Note: {domain} is currently routed trough {data['forceRouteHost']}")) while True: new_routing = input(colored(f"Set forced routing for {domain} (empty to abort): ")) if not new_routing.strip(): break # I could've one-lined all these nested ifs but it'd look awful so shut up if not validators.domain(new_routing): # Not a domain if not validators.ipv4(new_routing) or validators.ipv6(new_routing): # Not a plain IPV4/IPV6 if ':' in new_routing: # Is it an IP:port pair? split = new_routing.split(":", maxsplit=1) if not split[1].isnumeric() or not (validators.ipv4(split[0]) or validators.ipv6(split[0])): # Nope, not a valid host print(fg.red("The provided host is not valid (not a domain, IP, nor an IP:port pair)")) await trio.sleep(2) continue break if new_routing.strip(): if data["forceRoute"] and data["forceRouteHost"] == new_routing.strip(): print(colored("The virtual domain {domain} is already routed trough {new_routing}")) await trio.sleep(2) continue data["forceRouteHost"] = new_routing data["forceRoute"] = True keep = True break case 3: while True: forward_address = input(colored("Choose a new forwarding address: ")) if validators.email(forward_address): break else: print(colored("Not a valid email address")) if data["forward"]: print(colored(f"Note: {domain} currently forwards to {data['forwardAddress']}")) if data["forward"] and data["forwardAddress"] == forward_address: print(colored(f"{domain} already forwards to {data['forwardAddress']}")) else: data["forward"] = True data["forwardAddress"] = True keep = True break case _: print(fg.yellow("Invalid choice (value out of range)")) await trio.sleep(2) if keep: confirmation = input(colored("Confirm? [y/N] ")) if confirmation.strip().lower() in ("y", "yes"): if isinstance( resp := await handle_errors( "patch", user, password, f"{server}/admin/api/v1/domains/{domain}", follow_redirects=True, data=data ), httpx.Response, ): if resp.status_code == 400: if err := decode_json(resp): print(fg.red(f"An error occurred: {err['message']}")) print(fg.red("Errors: ")) for error_kind in err["errors"]["children"].keys(): if not err["errors"]["children"][error_kind]: continue print(fg.red(f" - Error kind: {error_kind}")) for error_message in err["errors"]["children"][error_kind]["errors"]: print(fg.red(f" - {error_message}")) if resp.status_code == 404: print( fg.red( "The provided domain no longer exists: has it been deleted in the meantime?" ) ) await trio.sleep(2) elif resp.status_code == 204: print(colored("Information Updated")) else: print(colored("Aborting")) skip = True await trio.sleep(2) case 16: os.system("cls||clear") print(colored("Searching a mailbox by address")) case _: skip = True print(fg.yellow("Invalid choice (value out of range)")) await trio.sleep(2) os.system("cls||clear") if not skip: pause(colored("Press any key to continue")) # Awful, but it works so shut up os.system("cls||clear") except KeyboardInterrupt: os.system("cls||clear") # This way when you press Ctrl+C # or Ctrl+D from an inner section # you are brought back to the # main menu if not prompted: return 0 except KeyError as k: print(fg.red("The JSON response from the API was missing an expected field ({k!r}), has the version changed?")) await trio.sleep(2) os.system("cls||clear") except EOFError: os.system("cls||clear") if not prompted: return -1 # This tool technically doesn't *need* to be async, but shut up async def main(args: argparse.Namespace) -> int: """ Main program entry point """ user, password = (None, None) if not re.match(r"http(s)?://", args.server_url): print(fg.red("Missing or invalid schema in server URL!")) return -1 elif not validators.url(args.server_url, public=True): # bool(ValidationFailure) is False :) print(fg.red("Invalid server URL, exiting")) return -1 if os.path.isfile("credentials.json"): try: with open("credentials.json") as credentials: data = json.load(credentials) except json.decoder.JSONDecodeError as json_error: print( fg.red( f"A JSON error occurred while attempting to load credentials.json -> {type(json_error).__name__}:" f" {json_error}" ) ) except PermissionError: print( fg.red( "Could not read credentials.json due to a permission issue. Make sure the file is readable by" " the current user and try again" ) ) except OSError as os_error: print( fg.red( f"Could not read credentials.json due to a generic OS error -> {type(os_error).__name__}:" f"{os_error}" ) ) else: try: user, password = data["username"], data["password"] except KeyError: print(fg.yellow("The JSON payload in credentials.json is not valid")) if not all([user, password]) and not all([args.username, args.password]): print( fg.yellow("Credentials could not be loaded and an explicit username/password pair wasn't provided, exiting") ) return -1 elif all([args.username, args.password]): user, password = args.username, args.password if args.save: print(fg.green(f"Saving credentials for {user} to credentials.json")) try: with open("credentials.json", "w") as save_file: json.dump({"username": user, "password": password}, save_file) except PermissionError: print( fg.red( "Could not write credentials.json due to a permission issue. Make sure the file is readable by" " the current user and try again" ) ) except OSError as os_error: print( fg.red( f"Could not write credentials.json due to a generic OS error -> {type(os_error).__name__}:" f"{os_error}" ) ) else: print(fg.green(f"Loaded credentials for {user} from credentials.json")) print(fg.yellow(f"Attempting login with {user} to {args.server_url}")) if isinstance( resp := await handle_errors("get", user, password, f"{args.server_url}/admin/api/doc", follow_redirects=True), httpx.Response, ): if resp.status_code == 200: print(fg.green("Authentication successful!")) return await loop(args.server_url, user, password) else: print(fg.red(f"An error occurred while attempting to log in -> {type(resp).__name__} -> {resp}")) return -1 if __name__ == "__main__": parser = argparse.ArgumentParser(description="A simple tool to interact with the REST API of poste.io instances") parser.add_argument("--username", help="The username to authenticate with the API") parser.add_argument("--password", help="The password to authenticate with the API") parser.add_argument( "--save", help="Saves credentials in a credentials.json file for future use. Overwrites any existing credentials file", action="store_true", default=False, ) parser.add_argument( "--server-url", help="The URL of your poste.io instance. MUST include an http(s):// prefix", required=True ) parser.add_argument( "--nocolor", help="Disables the unicorn puke and outputs plain old boring text", action="store_true", default=False, ) arguments = parser.parse_args() if arguments.nocolor: # This hack is awful, but it works and is much better than using # parameters or globals everywhere. All hail dynamic languages fg = type( "fg", (object,), { "__getattr__": lambda self, _: type( "i", (object,), {"__repr__": lambda _: "", "__call__": lambda _, x: x} )(), }, )() try: os.system("cls||clear") print(colored("MailMaker v0.0.1, made with <3 and Python 3.8 by @nocturn9x")) sys.exit(trio.run(main, parser.parse_args())) except KeyboardInterrupt: os.system("cls||clear") sys.exit(0)