From 6d68a2276b8e7cc98fec63202bdac471da64a601 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Wed, 9 Feb 2022 12:28:14 +0100 Subject: [PATCH] Various additions (some are still to be finished) --- README.md | 7 +- src/mailMaker.py | 429 +++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 399 insertions(+), 37 deletions(-) mode change 100644 => 100755 src/mailMaker.py diff --git a/README.md b/README.md index f20d15b..071e35d 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,8 @@ # MailMaker -A simple tool to interact with poste.io's API \ No newline at end of file +A simple tool to interact with poste.io's API + +## Note + +Yes, the code is awful, poorly modular, full of repetitions and not so commented. I could not be arsed +to do that, sorry! \ No newline at end of file diff --git a/src/mailMaker.py b/src/mailMaker.py old mode 100644 new mode 100755 index e6ac889..47d6253 --- a/src/mailMaker.py +++ b/src/mailMaker.py @@ -21,6 +21,9 @@ import trio import json import httpx import random +import socket +import string +import getpass import argparse import validators from typing import Union, Any @@ -42,8 +45,10 @@ async def send_authenticated_request( """ try: - return getattr(httpx, method)(*args, **kwargs, auth=(user, password)) - except httpx.RequestError as e: + # print(f"calling httpx.{method}({f', '.join(map(repr, args))}, auth=({user!r}, {password!r}), {', '.join(f'{name}={repr(value)}' for name, value in kwargs.items())})") + async with httpx.AsyncClient() as client: + return await getattr(client, method)(*args, **kwargs, auth=(user, password)) + except (httpx.RequestError, socket.gaierror, OSError) as e: return e @@ -55,7 +60,7 @@ async def handle_errors(*args, **kwargs) -> Union[httpx.Response, httpx.RequestE if isinstance(resp := await send_authenticated_request(*args, **kwargs), httpx.Response): match resp.status_code: - case 200 | 204 | 404: + case 200 | 201 | 204 | 404 | 400: return resp case 401: print(fg.red("Authentication failed: Unauthorized (wrong credentials)")) @@ -122,9 +127,8 @@ async def loop(server: str, user: str, password: str) -> int: {fg.blue}[{fg.green}12{fg.blue}] {colored("Get the DKIM key pair for an existing virtual domain")} {fg.blue}[{fg.green}13{fg.blue}] {colored("Delete the DKIM key pair for an existing virtual domain")} {fg.blue}[{fg.green}14{fg.blue}] {colored("Get in/out statistics for an existing virtual domain")} - {fg.blue}[{fg.green}15{fg.blue}] {colored("Get status information for an existing mailbox")} - {fg.blue}[{fg.green}16{fg.blue}] {colored("Update an existing virtual domain's information")} - {fg.blue}[{fg.green}17{fg.blue}] {colored("Search a mailbox by address")}""" + {fg.blue}[{fg.green}15{fg.blue}] {colored("Update an existing virtual domain's information")} + {fg.blue}[{fg.green}16{fg.blue}] {colored("Search a mailbox by address")}""" skip = False prompted = False while True: @@ -147,8 +151,8 @@ async def loop(server: str, user: str, password: str) -> int: prompted = True if not choice.strip() or not choice.isnumeric(): skip = True - print(fg.yellow("Invalid choice")) - await trio.sleep(3) + print(fg.yellow("Invalid choice (must be a number)")) + await trio.sleep(2) else: # We clear each time because we don't want # to clear the screen in the default case @@ -177,7 +181,7 @@ async def loop(server: str, user: str, password: str) -> int: f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) - await trio.sleep(3) + await trio.sleep(2) os.system("cls||clear") continue case 2: @@ -219,12 +223,87 @@ async def loop(server: str, user: str, password: str) -> int: f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) - await trio.sleep(3) + await trio.sleep(2) os.system("cls||clear") continue case 3: os.system("cls||clear") print(colored("Creating new mailbox")) + name = input(colored("Choose a name for the mailbox: ")) + while True: + email = input(colored("Choose an email address for the mailbox: ")) + if not validators.email(email): + print(fg.red("Invalid email address")) + continue + break + while True: + mail_password = getpass.getpass(colored("Choose a password for the mailbox (empty for auto-generated password): ")) + if not mail_password.strip(): + mail_password = "".join((random.choice(string.ascii_letters + string.digits + string.punctuation) for _ in range(random.randint(10, 16)))) + print(colored(f"Generated password is {mail_password}")) + break + confirm_password = getpass.getpass(colored("Confirm the password: ")) + if mail_password != confirm_password: + print(fg.red("Passwords do not match")) + continue + break + while True: + destinations = input(colored("Is this mailbox a redirect?\nIf so, provide a comma-separated list of destinations here: ")) + if destinations and not all((validators.email(mail) for mail in destinations.strip().split(","))): + print(fg.red("One or more destinations are not valid")) + continue + break + confirmation = input(colored("System Admin? [y/N] ")) + if confirmation.strip().lower() in ("y", "yes"): + admin = True + else: + admin = False + if (t := destinations.strip().split(",")) == ['']: + # Split has some weird behavior + destinations = [] + else: + destinations = t + data = {"name": name, + "email": email, + "passwordPlaintext": mail_password, + "disabled": False, + "referenceId": "", # TODO: Set this? + "superAdmin": admin, + "redirectTo": destinations + } + confirmation = input(colored("Confirm? [y/N] ")) + if confirmation.strip().lower() in ("y", "yes"): + if isinstance( + resp := await handle_errors( + "post", + user, + password, + f"{server}/admin/api/v1/boxes", + follow_redirects=True, + data=data + ), + httpx.Response, + ): + if resp.status_code == 201: + print(colored(f"{email} successfully created!")) + elif resp.status_code == 400: + print(fg.red(f"An error occurred: {decode_json(resp) or resp.content.decode()}")) + elif resp.status_code == 404: + print(fg.red("Cannot create mailbox: the chosen domain is not registered")) + else: + print( + fg.red( + f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" + ) + ) + await trio.sleep(2) + os.system("cls||clear") + continue + else: + print(colored("Aborting")) + skip = True + await trio.sleep(2) + case 4: os.system("cls||clear") print(colored("Deleting an existing mailbox")) @@ -251,13 +330,13 @@ async def loop(server: str, user: str, password: str) -> int: f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) - await trio.sleep(3) + await trio.sleep(2) os.system("cls||clear") continue else: print(colored("Aborting")) skip = True - await trio.sleep(3) + await trio.sleep(2) case 5: data = { "passwordPlaintext": "", @@ -276,7 +355,7 @@ async def loop(server: str, user: str, password: str) -> int: ): if resp.status_code == 404: print(fg.red("The provided mailbox does not exist")) - await trio.sleep(3) + await trio.sleep(2) os.system("cls||clear") continue elif account_info := decode_json(resp): @@ -289,7 +368,7 @@ async def loop(server: str, user: str, password: str) -> int: f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) - await trio.sleep(3) + await trio.sleep(2) os.system("cls||clear") continue print( @@ -305,14 +384,16 @@ async def loop(server: str, user: str, password: str) -> int: while True: what = input(colored("What information do you want to update? ")) if not what or not what.isnumeric(): - print(fg.yellow("Invalid choice")) - await trio.sleep(3) + print(fg.yellow("Invalid choice (must be a number)")) + await trio.sleep(2) else: match int(what): case 1: new_name = input(colored(f"New Name for {email} (empty to abort): ")) if data["name"] == new_name: print(colored(f"The desired name is already set for {email}")) + await trio.sleep(2) + continue elif new_name.strip(): data["name"] = new_name keep = True @@ -320,13 +401,14 @@ async def loop(server: str, user: str, password: str) -> int: case 2: new_password = input(colored(f"New Password for {email} (empty to abort): ")) if new_password.strip(): - print(len(new_password)) data["passwordPlaintext"] = new_password keep = True break case 3: if data["superAdmin"]: print(colored(f"{email} is already a System Admin")) + await trio.sleep(2) + continue else: print(colored(f"Promoting {email} to System Admin")) data["superAdmin"] = True @@ -335,6 +417,8 @@ async def loop(server: str, user: str, password: str) -> int: case 4: if not data["superAdmin"]: print(colored(f"{email} is not a System Admin")) + await trio.sleep(2) + continue else: print(colored(f"Demoting {email} from System Admin")) data["superAdmin"] = False @@ -343,6 +427,8 @@ async def loop(server: str, user: str, password: str) -> int: case 5: if data["disabled"]: print(colored(f"{email} is already disabled")) + await trio.sleep(2) + continue else: print(colored(f"Disabling mailbox {email} ")) keep = True @@ -351,14 +437,16 @@ async def loop(server: str, user: str, password: str) -> int: case 6: if not data["disabled"]: print(colored(f"{email} is not disabled")) + await trio.sleep(2) + continue else: print(colored(f"Enabling mailbox {email} ")) keep = True data["disabled"] = False break case _: - print(fg.yellow("Invalid choice")) - await trio.sleep(3) + print(fg.yellow("Invalid choice (value out of range)")) + await trio.sleep(2) if keep: confirmation = input(colored("Confirm? [y/N] ")) if confirmation.strip().lower() in ("y", "yes"): @@ -379,13 +467,13 @@ async def loop(server: str, user: str, password: str) -> int: "The provided mailbox no longer exists: has it been deleted in the meantime?" ) ) - await trio.sleep(3) + await trio.sleep(2) elif resp.status_code == 204: print(colored("Information Updated")) else: print(colored("Aborting")) skip = True - await trio.sleep(3) + await trio.sleep(2) case 6: os.system("cls||clear") print(colored("Getting in/out statistics for existing mailbox")) @@ -398,7 +486,7 @@ async def loop(server: str, user: str, password: str) -> int: ): if resp.status_code == 404: print(fg.red("The provided mailbox does not exist")) - await trio.sleep(3) + await trio.sleep(2) os.system("cls||clear") continue elif stats := decode_json(resp): @@ -411,14 +499,129 @@ async def loop(server: str, user: str, password: str) -> int: f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) - await trio.sleep(3) + await trio.sleep(2) os.system("cls||clear") continue case 7: + os.system("cls||clear") print(colored("Getting quota limits for an existing mailbox")) + mail = input(colored("Choose a mailbox to get quota info for: ")) + if isinstance( + resp := await handle_errors( + "get", user, password, f"{server}/admin/api/v1/boxes/{mail}/quota", follow_redirects=True + ), + httpx.Response, + ): + if resp.status_code == 404: + print(fg.red("The provided mailbox does not exist")) + elif data := decode_json(resp): + print( + f"""{colored("Quota information for")} {colored(mail)}\n\n""" + f"""{colored(f"Storage Limit: {data['storage_limit']}")}\n""" + f"""{colored(f"Storage Usage: {data['storage_usage']}")}\n""" + f"""{colored(f"Count Limit: {data['count_limit']}")}\n""" + f"""{colored(f"Count Usage: {data['count_usage']}")}\n""" + ) case 8: os.system("cls||clear") print(colored("Updating quota limits for an existing mailbox")) + data = { + "storageLimit": 0, + "countLimit": 0, + } + email = input(colored("Select a mailbox to update quotas for: ")) + if isinstance( + resp := await handle_errors( + "get", user, password, f"{server}/admin/api/v1/boxes/{email}/quota", follow_redirects=True + ), + httpx.Response, + ): + if resp.status_code == 404: + print(fg.red("The provided mailbox does not exist")) + await trio.sleep(2) + os.system("cls||clear") + continue + elif account_info := decode_json(resp): + data["storageLimit"] = account_info["storage_limit"] + data["countLimit"] = account_info["count_limit"] + else: + print( + fg.red( + f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" + ) + ) + await trio.sleep(2) + os.system("cls||clear") + continue + print( + f"""{colored("Choose what to update:")}\n""" + f"""{fg.blue}[{fg.green}1{fg.blue}] Update Storage Limit\n""" + f"""{fg.blue}[{fg.green}2{fg.blue}] Update Count Limit\n""" + ) + keep = False + while True: + what = input(colored("What information do you want to update? ")) + if not what or not what.isnumeric(): + print(fg.yellow("Invalid choice (must be a number)")) + await trio.sleep(2) + else: + match int(what): + case 1: + new_storage = int(input(colored(f"New storage limit for {email} (current is {data['storageLimit']}, empty to abort): "))) + if data["storageLimit"] == new_storage: + print(colored(f"The desired storage limit is already set to {data['storageLimit']} for {email}")) + await trio.sleep(2) + continue + elif new_storage.strip(): + if not new_storage.isnumeric(): + print(fg.yellow("Invalid choice (must be a number)")) + await trio.sleep(2) + continue + data["storageLimit"] = new_name + keep = True + break + case 2: + new_count = int(input(colored(f"New count limit for {email} (current is {data['countLimit']}, empty to abort): "))) + if data["countLimit"] == new_count: + print(colored(f"The desired count limit is already set to {data['countLimit']} for {email}")) + await trio.sleep(2) + continue + else: + data["countLimit"] = new_count + keep = True + break + case _: + print(fg.yellow("Invalid choice (value out of range)")) + await trio.sleep(2) + if keep: + confirmation = input(colored("Confirm? [y/N] ")) + if confirmation.strip().lower() in ("y", "yes"): + if isinstance( + resp := await handle_errors( + "patch", + user, + password, + f"{server}/admin/api/v1/boxes/{email}/quota", + follow_redirects=True, + data=data + ), + httpx.Response, + ): + if resp.status_code == 404: + print( + fg.red( + "The provided mailbox no longer exists: has it been deleted in the meantime?" + ) + ) + await trio.sleep(2) + elif resp.status_code == 204: + print(colored("Quota Updated")) + elif resp.status_code == 500: + print(fg.yellow("Are you trying to update limits for a System Administrator which is now allowed?")) + else: + print(colored("Aborting")) + skip = True + await trio.sleep(2) case 9: os.system("cls||clear") print(colored("Listing all existing virtual domains")) @@ -443,7 +646,7 @@ async def loop(server: str, user: str, password: str) -> int: f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) - await trio.sleep(3) + await trio.sleep(2) os.system("cls||clear") continue case 10: @@ -490,7 +693,7 @@ async def loop(server: str, user: str, password: str) -> int: ): if resp.status_code == 404: print(fg.red("The provided virtual domain does not exist")) - await trio.sleep(3) + await trio.sleep(2) os.system("cls||clear") continue elif stats := decode_json(resp): @@ -503,25 +706,175 @@ async def loop(server: str, user: str, password: str) -> int: f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) - await trio.sleep(3) + await trio.sleep(2) os.system("cls||clear") continue case 15: os.system("cls||clear") - print(colored("Getting status information for an existing mailbox")) + print(colored("Updating an existing virtual domain")) + data = {"forward": False, + "forwardDomain": "", + "domainBin": False, + "domainBinAddress": "", + "forceRoute": False, + "forceRouteHost": "", + "referenceId": "" + } + domain = input(colored("Select a virtual domain to update: ")) + if isinstance( + resp := await handle_errors( + "get", user, password, f"{server}/admin/api/v1/domains/{domain}", follow_redirects=True + ), + httpx.Response, + ): + if resp.status_code == 404: + print(fg.red("The provided virtual domain does not exist")) + await trio.sleep(2) + os.system("cls||clear") + continue + elif domain_info := decode_json(resp): + data["forward"] = domain_info["forward"] + data["domainBin"] = domain_info["domain_bin"] + data["forceRoute"] = domain_info["force_route"] + if domain_info["forward"]: + data["forwardDomain"] = domain_info["forward_domain"] + if domain_info["force_route"]: + data["forceRouteHost"] = domain_info["force_route_host"] + if domain_info["domain_bin"]: + data["domainBinAddress"] = domain_info["domain_bin_address"] + else: + print( + fg.red( + f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" + ) + ) + await trio.sleep(2) + os.system("cls||clear") + continue + print( + f"""{colored("Choose what to update:")}\n""" + f"""{fg.blue}[{fg.green}1{fg.blue}] Set/Update domain bin (catch-all)\n""" + f"""{fg.blue}[{fg.green}2{fg.blue}] Set/Update forced routing\n""" + f"""{fg.blue}[{fg.green}3{fg.blue}] Set/Update forward domain\n""" + f"""{fg.blue}[{fg.green}4{fg.blue}] Unset domain bin (catch-all)\n""" + f"""{fg.blue}[{fg.green}5{fg.blue}] Unset forced routing\n""" + f"""{fg.blue}[{fg.green}6{fg.blue}] Unset forward domain\n""" + ) + keep = False + while True: + what = input(colored("What information do you want to update? ")) + if not what or not what.isnumeric(): + print(fg.yellow("Invalid choice (must be a number)")) + await trio.sleep(2) + else: + match int(what): + case 1: + if data["domainBin"]: + print(colored(f"Note: current catch-all for {domain} is set to {data['domainBinAddress']}")) + new_bin = input(colored(f"New domain catch-all for {domain} (empty to abort): ")) + if data["domainBinAddress"] and data["domainBinAddress"] == new_bin: + print(colored(f"The desired catch-all is already set to {new_bin} for {domain}")) + elif new_bin.strip(): + data["domainBinAddress"] = new_bin + data["domainBin"] = True + keep = True + break + case 2: + if data["forceRoute"]: + print(colored(f"Note: {domain} is currently routed trough {data['forceRouteHost']}")) + while True: + new_routing = input(colored(f"Set forced routing for {domain} (empty to abort): ")) + if not new_routing.strip(): + break + # I could've one-lined all these nested ifs but it'd look awful so shut up + if not validators.domain(new_routing): + # Not a domain + if not validators.ipv4(new_routing) or validators.ipv6(new_routing): + # Not a plain IPV4/IPV6 + if ':' in new_routing: + # Is it an IP:port pair? + split = new_routing.split(":", maxsplit=1) + if not split[1].isnumeric() or not (validators.ipv4(split[0]) or validators.ipv6(split[0])): + # Nope, not a valid host + print(fg.red("The provided host is not valid (not a domain, IP, nor an IP:port pair)")) + await trio.sleep(2) + continue + break + if new_routing.strip(): + if data["forceRoute"] and data["forceRouteHost"] == new_routing.strip(): + print(colored("The virtual domain {domain} is already routed trough {new_routing}")) + await trio.sleep(2) + continue + data["forceRouteHost"] = new_routing + data["forceRoute"] = True + keep = True + break + case 3: + while True: + forward_address = input(colored("Choose a new forwarding address: ")) + if validators.email(forward_address): + break + else: + print(colored("Not a valid email address")) + if data["forward"]: + print(colored(f"Note: {domain} currently forwards to {data['forwardAddress']}")) + if data["forward"] and data["forwardAddress"] == forward_address: + print(colored(f"{domain} already forwards to {data['forwardAddress']}")) + else: + data["forward"] = True + data["forwardAddress"] = True + keep = True + break + case _: + print(fg.yellow("Invalid choice (value out of range)")) + await trio.sleep(2) + if keep: + confirmation = input(colored("Confirm? [y/N] ")) + if confirmation.strip().lower() in ("y", "yes"): + if isinstance( + resp := await handle_errors( + "patch", + user, + password, + f"{server}/admin/api/v1/domains/{domain}", + follow_redirects=True, + data=data + ), + httpx.Response, + ): + if resp.status_code == 400: + if err := decode_json(resp): + print(fg.red(f"An error occurred: {err['message']}")) + print(fg.red("Errors: ")) + for error_kind in err["errors"]["children"].keys(): + if not err["errors"]["children"][error_kind]: + continue + print(fg.red(f" - Error kind: {error_kind}")) + for error_message in err["errors"]["children"][error_kind]["errors"]: + print(fg.red(f" - {error_message}")) + if resp.status_code == 404: + print( + fg.red( + "The provided domain no longer exists: has it been deleted in the meantime?" + ) + ) + await trio.sleep(2) + elif resp.status_code == 204: + print(colored("Information Updated")) + else: + print(colored("Aborting")) + skip = True + await trio.sleep(2) case 16: - os.system("cls||clear") - print(colored("Updating an existing virtual domain's information")) - case 17: os.system("cls||clear") print(colored("Searching a mailbox by address")) case _: skip = True - print(fg.yellow("Invalid choice")) - await trio.sleep(3) + print(fg.yellow("Invalid choice (value out of range)")) + await trio.sleep(2) os.system("cls||clear") if not skip: - pause(fg.cyan("Press any key to continue")) + pause(colored("Press any key to continue")) # Awful, but it works so shut up os.system("cls||clear") except KeyboardInterrupt: @@ -532,6 +885,10 @@ async def loop(server: str, user: str, password: str) -> int: # main menu if not prompted: return 0 + except KeyError as k: + print(fg.red("The JSON response from the API was missing an expected field ({k!r}), has the version changed?")) + await trio.sleep(2) + os.system("cls||clear") except EOFError: os.system("cls||clear") if not prompted: @@ -619,7 +976,7 @@ async def main(args: argparse.Namespace) -> int: print(fg.green("Authentication successful!")) return await loop(args.server_url, user, password) else: - print(fg.red(f"An error occurred while attempting to log in -> {type(resp.__name__)} -> {resp}")) + print(fg.red(f"An error occurred while attempting to log in -> {type(resp).__name__} -> {resp}")) return -1 @@ -650,8 +1007,8 @@ if __name__ == "__main__": "fg", (object,), { - "__getattr__": lambda self, a: type( - "i", (object,), {"__repr__": lambda s: "", "__call__": lambda m, x: x} + "__getattr__": lambda self, _: type( + "i", (object,), {"__repr__": lambda _: "", "__call__": lambda _, x: x} )(), }, )()