#!/usr/bin/env python3 """ Copyright (C) 2021 nocturn9x Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import re import os import sys import trio import json import httpx import random import argparse import validators from typing import Union, Any from console import fg try: from console.utils import pause except ImportError: # For some reason the lib does not export utils by default ¯\_(ツ)_/¯ print(f"Please edit the console library's __init__.py file at line 58 and add the line 'from . import utils'") sys.exit(0) async def send_authenticated_request( method: str, user: str, password: str, *args, **kwargs ) -> Union[httpx.Response, httpx.RequestError]: """ Sends an HTTP request using httpx and basic HTTP authentication """ try: return getattr(httpx, method)(*args, **kwargs, auth=(user, password)) except httpx.RequestError as e: return e async def handle_errors(*args, **kwargs) -> Union[httpx.Response, httpx.RequestError]: """ Simple wrapper to avoid copy-pasting the same error handling code 20 times """ if isinstance(resp := await send_authenticated_request(*args, **kwargs), httpx.Response): match resp.status_code: case 200 | 204 | 404: return resp case 401: print(fg.red("Authentication failed: Unauthorized (wrong credentials)")) case 403: print(fg.red("Authentication failed: Forbidden")) case resp.status_code if resp.status_code >= 500: print(fg.red(f"The server raised a {resp.status_code} error, maybe try again later?")) case _: print(fg.yellow(f"The server returned unexpected status code {resp.status_code}, exiting")) return resp def decode_json(response: httpx.Response) -> Any: """ Handles potential exceptions from httpx.Response.json(): """ try: return response.json() except json.decoder.JSONDecodeError as json_error: print( fg.red( f"A JSON error occurred while attempting to decode some data -> {type(json_error).__name__}:" f" {json_error}" ) ) return "" _last_color = None def colored(s: str) -> str: """ Calls fg() with a random color each time, but making sure it doesn't use the same color across two consecutive calls """ global _last_color # Ew... while (c := random.choice(["blue", "magenta", "red", "cyan", "blue", "purple", "yellow", "green"])) == _last_color: pass _last_color = c return getattr(fg, _last_color)(s) async def loop(server: str, user: str, password: str) -> int: """ Program main loop """ prompt_pattern = f"""{fg.green}Welcome to MailMaker! Choose what to do:{fg.default} {fg.blue}[{fg.green}1{fg.blue}] {colored("List all existing mailboxes")} {fg.blue}[{fg.green}2{fg.blue}] {colored("Get info about an existing mailbox")} {fg.blue}[{fg.green}3{fg.blue}] {colored("Create a new mailbox")} {fg.blue}[{fg.green}4{fg.blue}] {colored("Delete an existing mailbox")} {fg.blue}[{fg.green}5{fg.blue}] {colored("Update an existing mailbox's information")} {fg.blue}[{fg.green}6{fg.blue}] {colored("Get in/out statistics for an existing mailbox")} {fg.blue}[{fg.green}7{fg.blue}] {colored("Get quota limits for an existing mailbox")} {fg.blue}[{fg.green}8{fg.blue}] {colored("Update quota limits for an existing mailbox")} {fg.blue}[{fg.green}9{fg.blue}] {colored("List all existing virtual domains")} {fg.blue}[{fg.green}10{fg.blue}] {colored("Get info about an existing virtual domain")} {fg.blue}[{fg.green}11{fg.blue}] {colored("Generate a DKIM key pair for an existing virtual domain")} {fg.blue}[{fg.green}12{fg.blue}] {colored("Get the DKIM key pair for an existing virtual domain")} {fg.blue}[{fg.green}13{fg.blue}] {colored("Delete the DKIM key pair for an existing virtual domain")} {fg.blue}[{fg.green}14{fg.blue}] {colored("Get in/out statistics for an existing virtual domain")} {fg.blue}[{fg.green}15{fg.blue}] {colored("Get status information for an existing mailbox")} {fg.blue}[{fg.green}16{fg.blue}] {colored("Update an existing virtual domain's information")} {fg.blue}[{fg.green}17{fg.blue}] {colored("Search a mailbox by address")}""" skip = False prompted = False while True: try: # Some unicorn puke :D prompt = prompt_pattern.format( *( ( getattr( fg, random.choice(["blue", "magenta", "red", "cyan", "blue", "purple", "yellow", "green"]) ) ) for _ in range(len(prompt_pattern.splitlines()) - 1) ) ) print(prompt) skip = False prompted = False choice = input(fg.purple("Make your choice: ")) prompted = True if not choice.strip() or not choice.isnumeric(): skip = True print(fg.yellow("Invalid choice")) await trio.sleep(3) else: # We clear each time because we don't want # to clear the screen in the default case match int(choice): case 1: os.system("cls||clear") print(colored("Listing all mailboxes")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/boxes", params={"page": 1, "paging": 999999}, follow_redirects=True, ), httpx.Response, ): if data := decode_json(resp): print(fg.green(f"Fetched {data['results_count']} addresses")) for mailbox in data["results"]: print(f"\t{fg.yellow}- {colored(mailbox['address'])}") else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(3) os.system("cls||clear") continue case 2: os.system("cls||clear") address = input(colored("Choose an inbox to get info for: ")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/boxes/{address}", follow_redirects=True ), httpx.Response, ): if resp.status_code == 404: print(fg.red("The provided mailbox does not exist")) elif data := decode_json(resp): if not data["redirect_to"]: redirect_list = "None" else: redirect_list = "" for redirect in data["redirect_to"]: redirect_list += f"\n\t- {redirect}\n" print( f"""{colored("Mailbox information for")} {colored(address)}\n\n""" f"""{colored(f"User: {data['user']}")}\n""" f"""{colored(f"Name: {data['name'] or 'None'}")}\n""" f"""{colored(f"System Admin: {'Yes' if data['super_admin'] else 'No'}")}\n""" f"""{colored(f"Domain Admin: {'Yes' if data['domain_admin'] else 'No'}")}\n""" f"""{colored(f"Home Dir: {data['home']}")}\n""" f"""{colored(f"Strict 'From' Header: {'Not Enforced' if data['strict_from_disabled'] else 'Enforced'}")}\n""" f"""{colored(f"Creation Date: {data['created']}")}\n""" f"""{colored(f"Last Update Date: {data['updated']}")}\n""" f"""{colored(f"Redirect Only: {'Yes' if data['redirect_only'] else 'No'}")}\n""" f"""{colored(f"Redirects to: {redirect_list}")}\n""" f"""{colored(f"Discards Incoming Mail: {'Yes' if data['discard'] else 'No'}")}\n""" f"""{colored(f"Disabled: {'Yes' if data['disabled'] else 'No'}")}\n""" ) else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(3) os.system("cls||clear") continue case 3: os.system("cls||clear") print(colored("Creating new mailbox")) case 4: os.system("cls||clear") print(colored("Deleting an existing mailbox")) address = input(colored("Choose an inbox to delete: ")) confirmation = input(colored("Confirm? [y/N] ")) if confirmation.strip().lower() in ("y", "yes"): if isinstance( resp := await handle_errors( "delete", user, password, f"{server}/admin/api/v1/boxes/{address}", follow_redirects=True, ), httpx.Response, ): if resp.status_code == 404: print(fg.red("The provided mailbox does not exist")) elif resp.status_code == 204: print(colored(f"{address} successfully deleted!")) else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(3) os.system("cls||clear") continue else: print(colored("Aborting")) skip = True await trio.sleep(3) case 5: data = { "passwordPlaintext": "", "name": "", "disabled": False, "superAdmin": False, } os.system("cls||clear") print(colored("Updating an existing mailbox")) email = input(colored("Select a mailbox to update: ")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/boxes/{email}", follow_redirects=True ), httpx.Response, ): if resp.status_code == 404: print(fg.red("The provided mailbox does not exist")) await trio.sleep(3) os.system("cls||clear") continue elif account_info := decode_json(resp): data["disabled"] = account_info["disabled"] data["superAdmin"] = account_info["super_admin"] data["name"] = account_info["name"] else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(3) os.system("cls||clear") continue print( f"""{colored("Choose what to update:")}\n""" f"""{fg.blue}[{fg.green}1{fg.blue}] Update Name\n""" f"""{fg.blue}[{fg.green}2{fg.blue}] Update Password\n""" f"""{fg.blue}[{fg.green}3{fg.blue}] Promote to System Admin\n""" f"""{fg.blue}[{fg.green}4{fg.blue}] Demote from System Admin\n""" f"""{fg.blue}[{fg.green}5{fg.blue}] Disable account\n""" f"""{fg.blue}[{fg.green}6{fg.blue}] Enable account\n""" ) keep = False while True: what = input(colored("What information do you want to update? ")) if not what or not what.isnumeric(): print(fg.yellow("Invalid choice")) await trio.sleep(3) else: match int(what): case 1: new_name = input(colored(f"New Name for {email} (empty to abort): ")) if data["name"] == new_name: print(colored(f"The desired name is already set for {email}")) elif new_name.strip(): data["name"] = new_name keep = True break case 2: new_password = input(colored(f"New Password for {email} (empty to abort): ")) if new_password.strip(): print(len(new_password)) data["passwordPlaintext"] = new_password keep = True break case 3: if data["superAdmin"]: print(colored(f"{email} is already a System Admin")) else: print(colored(f"Promoting {email} to System Admin")) data["superAdmin"] = True keep = True break case 4: if not data["superAdmin"]: print(colored(f"{email} is not a System Admin")) else: print(colored(f"Demoting {email} from System Admin")) data["superAdmin"] = False keep = True break case 5: if data["disabled"]: print(colored(f"{email} is already disabled")) else: print(colored(f"Disabling mailbox {email} ")) keep = True data["disabled"] = True break case 6: if not data["disabled"]: print(colored(f"{email} is not disabled")) else: print(colored(f"Enabling mailbox {email} ")) keep = True data["disabled"] = False break case _: print(fg.yellow("Invalid choice")) await trio.sleep(3) if keep: confirmation = input(colored("Confirm? [y/N] ")) if confirmation.strip().lower() in ("y", "yes"): if isinstance( resp := await handle_errors( "patch", user, password, f"{server}/admin/api/v1/boxes/{email}", follow_redirects=True, data=data ), httpx.Response, ): if resp.status_code == 404: print( fg.red( "The provided mailbox no longer exists: has it been deleted in the meantime?" ) ) await trio.sleep(3) elif resp.status_code == 204: print(colored("Information Updated")) else: print(colored("Aborting")) skip = True await trio.sleep(3) case 6: os.system("cls||clear") print(colored("Getting in/out statistics for existing mailbox")) email = input(colored("Select a mailbox to fetch stats for: ")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/boxes/{email}/stats", follow_redirects=True ), httpx.Response, ): if resp.status_code == 404: print(fg.red("The provided mailbox does not exist")) await trio.sleep(3) os.system("cls||clear") continue elif stats := decode_json(resp): for day in stats: print(colored(day.replace("-", "/"))) print(colored(f"\t- In: {stats[day]['in']}\n\t- Out: {stats[day]['out']}")) else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(3) os.system("cls||clear") continue case 7: print(colored("Getting quota limits for an existing mailbox")) case 8: os.system("cls||clear") print(colored("Updating quota limits for an existing mailbox")) case 9: os.system("cls||clear") print(colored("Listing all existing virtual domains")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/domains", params={"page": 1, "paging": 999999}, follow_redirects=True, ), httpx.Response, ): if data := decode_json(resp): print(fg.green(f"Fetched {data['results_count']} domains")) for domain in data["results"]: print(f"\t{fg.yellow}- {colored(domain['name'])}") else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(3) os.system("cls||clear") continue case 10: os.system("cls||clear") print(colored("Getting info about an existing virtual domain")) domain = input(colored("Choose a virtual domain to get info for: ")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/domains/{domain}", follow_redirects=True ), httpx.Response, ): if resp.status_code == 404: print(fg.red("The provided virtual domain does not exist")) elif data := decode_json(resp): print( f"""{colored("Virtual domain information for")} {colored(domain)}\n\n""" f"""{colored(f"Home Dir: {data['home']}")}\n""" f"""{colored(f"Forward: {data['forward_domain'] if data['forward'] else 'No'}")}\n""" f"""{colored(f"Creation Date: {data['created']}")}\n""" f"""{colored(f"Last Update Date: {data['updated']}")}\n""" f"""{colored(f"Domain bin (catch-all): {data['domain_bin_address'] if data['domain_bin'] else 'No'}")}\n""" f"""{colored(f"Force Route: {data['force_route_host'] if data['force_route'] else 'No'}")}\n""" ) case 11: os.system("cls||clear") print(colored("Generating a DKIM key pair for an existing virtual domain")) case 12: os.system("cls||clear") print(colored("Getting the DKIM key pair for an existing virtual domain")) case 13: os.system("cls||clear") print(colored("Deleting the DKIM key pair for an existing virtual domain")) case 14: os.system("cls||clear") print(colored("Getting in/out statistics for an existing virtual domain")) domain = input(colored("Select a virtual domain to fetch stats for: ")) if isinstance( resp := await handle_errors( "get", user, password, f"{server}/admin/api/v1/domains/{domain}/stats", follow_redirects=True ), httpx.Response, ): if resp.status_code == 404: print(fg.red("The provided virtual domain does not exist")) await trio.sleep(3) os.system("cls||clear") continue elif stats := decode_json(resp): for day in stats: print(colored(day.replace("-", "/"))) print(colored(f"\t- In: {stats[day]['in']}\n\t- Out: {stats[day]['out']}")) else: print( fg.red( f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}" ) ) await trio.sleep(3) os.system("cls||clear") continue case 15: os.system("cls||clear") print(colored("Getting status information for an existing mailbox")) case 16: os.system("cls||clear") print(colored("Updating an existing virtual domain's information")) case 17: os.system("cls||clear") print(colored("Searching a mailbox by address")) case _: skip = True print(fg.yellow("Invalid choice")) await trio.sleep(3) os.system("cls||clear") if not skip: pause(fg.cyan("Press any key to continue")) # Awful, but it works so shut up os.system("cls||clear") except KeyboardInterrupt: os.system("cls||clear") # This way when you press Ctrl+C # or Ctrl+D from an inner section # you are brought back to the # main menu if not prompted: return 0 except EOFError: os.system("cls||clear") if not prompted: return -1 # This tool technically doesn't *need* to be async, but shut up async def main(args: argparse.Namespace) -> int: """ Main program entry point """ user, password = (None, None) if not re.match(r"http(s)?://", args.server_url): print(fg.red("Missing or invalid schema in server URL!")) return -1 elif not validators.url(args.server_url, public=True): # bool(ValidationFailure) is False :) print(fg.red("Invalid server URL, exiting")) return -1 if os.path.isfile("credentials.json"): try: with open("credentials.json") as credentials: data = json.load(credentials) except json.decoder.JSONDecodeError as json_error: print( fg.red( f"A JSON error occurred while attempting to load credentials.json -> {type(json_error).__name__}:" f" {json_error}" ) ) except PermissionError: print( fg.red( "Could not read credentials.json due to a permission issue. Make sure the file is readable by" " the current user and try again" ) ) except OSError as os_error: print( fg.red( f"Could not read credentials.json due to a generic OS error -> {type(os_error).__name__}:" f"{os_error}" ) ) else: try: user, password = data["username"], data["password"] except KeyError: print(fg.yellow("The JSON payload in credentials.json is not valid")) if not all([user, password]) and not all([args.username, args.password]): print( fg.yellow("Credentials could not be loaded and an explicit username/password pair wasn't provided, exiting") ) return -1 elif all([args.username, args.password]): user, password = args.username, args.password if args.save: print(fg.green(f"Saving credentials for {user} to credentials.json")) try: with open("credentials.json", "w") as save_file: json.dump({"username": user, "password": password}, save_file) except PermissionError: print( fg.red( "Could not write credentials.json due to a permission issue. Make sure the file is readable by" " the current user and try again" ) ) except OSError as os_error: print( fg.red( f"Could not write credentials.json due to a generic OS error -> {type(os_error).__name__}:" f"{os_error}" ) ) else: print(fg.green(f"Loaded credentials for {user} from credentials.json")) print(fg.yellow(f"Attempting login with {user} to {args.server_url}")) if isinstance( resp := await handle_errors("get", user, password, f"{args.server_url}/admin/api/doc", follow_redirects=True), httpx.Response, ): if resp.status_code == 200: print(fg.green("Authentication successful!")) return await loop(args.server_url, user, password) else: print(fg.red(f"An error occurred while attempting to log in -> {type(resp.__name__)} -> {resp}")) return -1 if __name__ == "__main__": parser = argparse.ArgumentParser(description="A simple tool to interact with the REST API of poste.io instances") parser.add_argument("--username", help="The username to authenticate with the API") parser.add_argument("--password", help="The password to authenticate with the API") parser.add_argument( "--save", help="Saves credentials in a credentials.json file for future use. Overwrites any existing credentials file", action="store_true", default=False, ) parser.add_argument( "--server-url", help="The URL of your poste.io instance. MUST include an http(s):// prefix", required=True ) parser.add_argument( "--nocolor", help="Disables the unicorn puke and outputs plain old boring text", action="store_true", default=False, ) arguments = parser.parse_args() if arguments.nocolor: # This hack is awful, but it works and is much better than using # parameters or globals everywhere. All hail dynamic languages fg = type( "fg", (object,), { "__getattr__": lambda self, a: type( "i", (object,), {"__repr__": lambda s: "", "__call__": lambda m, x: x} )(), }, )() try: os.system("cls||clear") print(colored("MailMaker v0.0.1, made with <3 and Python 3.8 by @nocturn9x")) sys.exit(trio.run(main, parser.parse_args())) except KeyboardInterrupt: os.system("cls||clear") sys.exit(0)