Added project files

This commit is contained in:
Mattia Giambirtone 2022-02-06 23:05:02 +01:00
parent 0503fc7f55
commit ce328c59e5
2 changed files with 689 additions and 0 deletions

664
mailMaker.py Normal file
View File

@ -0,0 +1,664 @@
#!/usr/bin/env python3
"""
Copyright (C) 2021 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import re
import os
import sys
import trio
import json
import httpx
import random
import argparse
import validators
from typing import Union, Any
from console import fg
try:
from console.utils import pause
except ImportError:
# For some reason the lib does not export utils by default ¯\_(ツ)_/¯
print(f"Please edit the console library's __init__.py file at line 58 and add the line 'from . import utils'")
sys.exit(0)
async def send_authenticated_request(
method: str, user: str, password: str, *args, **kwargs
) -> Union[httpx.Response, httpx.RequestError]:
"""
Sends an HTTP request using httpx and basic HTTP authentication
"""
try:
return getattr(httpx, method)(*args, **kwargs, auth=(user, password))
except httpx.RequestError as e:
return e
async def handle_errors(*args, **kwargs) -> Union[httpx.Response, httpx.RequestError]:
"""
Simple wrapper to avoid copy-pasting the same
error handling code 20 times
"""
if isinstance(resp := await send_authenticated_request(*args, **kwargs), httpx.Response):
match resp.status_code:
case 200 | 204 | 404:
return resp
case 401:
print(fg.red("Authentication failed: Unauthorized (wrong credentials)"))
case 403:
print(fg.red("Authentication failed: Forbidden"))
case resp.status_code if resp.status_code >= 500:
print(fg.red(f"The server raised a {resp.status_code} error, maybe try again later?"))
case _:
print(fg.yellow(f"The server returned unexpected status code {resp.status_code}, exiting"))
return resp
def decode_json(response: httpx.Response) -> Any:
"""
Handles potential exceptions from httpx.Response.json():
"""
try:
return response.json()
except json.decoder.JSONDecodeError as json_error:
print(
fg.red(
f"A JSON error occurred while attempting to decode some data -> {type(json_error).__name__}:"
f" {json_error}"
)
)
return ""
_last_color = None
def colored(s: str) -> str:
"""
Calls fg() with a random color each time, but
making sure it doesn't use the same color across
two consecutive calls
"""
global _last_color # Ew...
while (c := random.choice(["blue", "magenta", "red", "cyan", "blue", "purple", "yellow", "green"])) == _last_color:
pass
_last_color = c
return getattr(fg, _last_color)(s)
async def loop(server: str, user: str, password: str) -> int:
"""
Program main loop
"""
prompt_pattern = f"""{fg.green}Welcome to MailMaker! Choose what to do:{fg.default}
{fg.blue}[{fg.green}1{fg.blue}] {colored("List all existing mailboxes")}
{fg.blue}[{fg.green}2{fg.blue}] {colored("Get info about an existing mailbox")}
{fg.blue}[{fg.green}3{fg.blue}] {colored("Create a new mailbox")}
{fg.blue}[{fg.green}4{fg.blue}] {colored("Delete an existing mailbox")}
{fg.blue}[{fg.green}5{fg.blue}] {colored("Update an existing mailbox's information")}
{fg.blue}[{fg.green}6{fg.blue}] {colored("Get in/out statistics for an existing mailbox")}
{fg.blue}[{fg.green}7{fg.blue}] {colored("Get quota limits for an existing mailbox")}
{fg.blue}[{fg.green}8{fg.blue}] {colored("Update quota limits for an existing mailbox")}
{fg.blue}[{fg.green}9{fg.blue}] {colored("List all existing virtual domains")}
{fg.blue}[{fg.green}10{fg.blue}] {colored("Get info about an existing virtual domain")}
{fg.blue}[{fg.green}11{fg.blue}] {colored("Generate a DKIM key pair for an existing virtual domain")}
{fg.blue}[{fg.green}12{fg.blue}] {colored("Get the DKIM key pair for an existing virtual domain")}
{fg.blue}[{fg.green}13{fg.blue}] {colored("Delete the DKIM key pair for an existing virtual domain")}
{fg.blue}[{fg.green}14{fg.blue}] {colored("Get in/out statistics for an existing virtual domain")}
{fg.blue}[{fg.green}15{fg.blue}] {colored("Get status information for an existing mailbox")}
{fg.blue}[{fg.green}16{fg.blue}] {colored("Update an existing virtual domain's information")}
{fg.blue}[{fg.green}17{fg.blue}] {colored("Search a mailbox by address")}"""
skip = False
prompted = False
while True:
try:
# Some unicorn puke :D
prompt = prompt_pattern.format(
*(
(
getattr(
fg, random.choice(["blue", "magenta", "red", "cyan", "blue", "purple", "yellow", "green"])
)
)
for _ in range(len(prompt_pattern.splitlines()) - 1)
)
)
print(prompt)
skip = False
prompted = False
choice = input(fg.purple("Make your choice: "))
prompted = True
if not choice.strip() or not choice.isnumeric():
skip = True
print(fg.yellow("Invalid choice"))
await trio.sleep(3)
else:
# We clear each time because we don't want
# to clear the screen in the default case
match int(choice):
case 1:
os.system("cls||clear")
print(colored("Listing all mailboxes"))
if isinstance(
resp := await handle_errors(
"get",
user,
password,
f"{server}/admin/api/v1/boxes",
params={"page": 1, "paging": 999999},
follow_redirects=True,
),
httpx.Response,
):
if data := decode_json(resp):
print(fg.green(f"Fetched {data['results_count']} addresses"))
for mailbox in data["results"]:
print(f"\t{fg.yellow}- {colored(mailbox['address'])}")
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(3)
os.system("cls||clear")
continue
case 2:
os.system("cls||clear")
address = input(colored("Choose an inbox to get info for: "))
if isinstance(
resp := await handle_errors(
"get", user, password, f"{server}/admin/api/v1/boxes/{address}", follow_redirects=True
),
httpx.Response,
):
if resp.status_code == 404:
print(fg.red("The provided mailbox does not exist"))
elif data := decode_json(resp):
if not data["redirect_to"]:
redirect_list = "None"
else:
redirect_list = ""
for redirect in data["redirect_to"]:
redirect_list += f"\n\t- {redirect}\n"
print(
f"""{colored("Mailbox information for")} {colored(address)}\n\n"""
f"""{colored(f"User: {data['user']}")}\n"""
f"""{colored(f"Name: {data['name'] or 'None'}")}\n"""
f"""{colored(f"System Admin: {'Yes' if data['super_admin'] else 'No'}")}\n"""
f"""{colored(f"Domain Admin: {'Yes' if data['domain_admin'] else 'No'}")}\n"""
f"""{colored(f"Home Dir: {data['home']}")}\n"""
f"""{colored(f"Strict 'From' Header: {'Not Enforced' if data['strict_from_disabled'] else 'Enforced'}")}\n"""
f"""{colored(f"Creation Date: {data['created']}")}\n"""
f"""{colored(f"Last Update Date: {data['updated']}")}\n"""
f"""{colored(f"Redirect Only: {'Yes' if data['redirect_only'] else 'No'}")}\n"""
f"""{colored(f"Redirects to: {redirect_list}")}\n"""
f"""{colored(f"Discards Incoming Mail: {'Yes' if data['discard'] else 'No'}")}\n"""
f"""{colored(f"Disabled: {'Yes' if data['disabled'] else 'No'}")}\n"""
)
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(3)
os.system("cls||clear")
continue
case 3:
os.system("cls||clear")
print(colored("Creating new mailbox"))
case 4:
os.system("cls||clear")
print(colored("Deleting an existing mailbox"))
address = input(colored("Choose an inbox to delete: "))
confirmation = input(colored("Confirm? [y/N] "))
if confirmation.strip().lower() in ("y", "yes"):
if isinstance(
resp := await handle_errors(
"delete",
user,
password,
f"{server}/admin/api/v1/boxes/{address}",
follow_redirects=True,
),
httpx.Response,
):
if resp.status_code == 404:
print(fg.red("The provided mailbox does not exist"))
elif resp.status_code == 204:
print(colored(f"{address} successfully deleted!"))
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(3)
os.system("cls||clear")
continue
else:
print(colored("Aborting"))
skip = True
await trio.sleep(3)
case 5:
data = {
"passwordPlaintext": "",
"name": "",
"disabled": False,
"superAdmin": False,
}
os.system("cls||clear")
print(colored("Updating an existing mailbox"))
email = input(colored("Select a mailbox to update: "))
if isinstance(
resp := await handle_errors(
"get", user, password, f"{server}/admin/api/v1/boxes/{email}", follow_redirects=True
),
httpx.Response,
):
if resp.status_code == 404:
print(fg.red("The provided mailbox does not exist"))
await trio.sleep(3)
os.system("cls||clear")
continue
elif account_info := decode_json(resp):
data["disabled"] = account_info["disabled"]
data["superAdmin"] = account_info["super_admin"]
data["name"] = account_info["name"]
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(3)
os.system("cls||clear")
continue
print(
f"""{colored("Choose what to update:")}\n"""
f"""{fg.blue}[{fg.green}1{fg.blue}] Update Name\n"""
f"""{fg.blue}[{fg.green}2{fg.blue}] Update Password\n"""
f"""{fg.blue}[{fg.green}3{fg.blue}] Promote to System Admin\n"""
f"""{fg.blue}[{fg.green}4{fg.blue}] Demote from System Admin\n"""
f"""{fg.blue}[{fg.green}5{fg.blue}] Disable account\n"""
f"""{fg.blue}[{fg.green}6{fg.blue}] Enable account\n"""
)
keep = False
while True:
what = input(colored("What information do you want to update? "))
if not what or not what.isnumeric():
print(fg.yellow("Invalid choice"))
await trio.sleep(3)
else:
match int(what):
case 1:
new_name = input(colored(f"New Name for {email} (empty to abort): "))
if data["name"] == new_name:
print(colored(f"The desired name is already set for {email}"))
elif new_name.strip():
data["name"] = new_name
keep = True
break
case 2:
new_password = input(colored(f"New Password for {email} (empty to abort): "))
if new_password.strip():
print(len(new_password))
data["passwordPlaintext"] = new_password
keep = True
break
case 3:
if data["superAdmin"]:
print(colored(f"{email} is already a System Admin"))
else:
print(colored(f"Promoting {email} to System Admin"))
data["superAdmin"] = True
keep = True
break
case 4:
if not data["superAdmin"]:
print(colored(f"{email} is not a System Admin"))
else:
print(colored(f"Demoting {email} from System Admin"))
data["superAdmin"] = False
keep = True
break
case 5:
if data["disabled"]:
print(colored(f"{email} is already disabled"))
else:
print(colored(f"Disabling mailbox {email} "))
keep = True
data["disabled"] = True
break
case 6:
if not data["disabled"]:
print(colored(f"{email} is not disabled"))
else:
print(colored(f"Enabling mailbox {email} "))
keep = True
data["disabled"] = False
break
case _:
print(fg.yellow("Invalid choice"))
await trio.sleep(3)
if keep:
confirmation = input(colored("Confirm? [y/N] "))
if confirmation.strip().lower() in ("y", "yes"):
if isinstance(
resp := await handle_errors(
"patch",
user,
password,
f"{server}/admin/api/v1/boxes/{email}",
follow_redirects=True,
data=data
),
httpx.Response,
):
if resp.status_code == 404:
print(
fg.red(
"The provided mailbox no longer exists: has it been deleted in the meantime?"
)
)
await trio.sleep(3)
elif resp.status_code == 204:
print(colored("Information Updated"))
else:
print(colored("Aborting"))
skip = True
await trio.sleep(3)
case 6:
os.system("cls||clear")
print(colored("Getting in/out statistics for existing mailbox"))
email = input(colored("Select a mailbox to fetch stats for: "))
if isinstance(
resp := await handle_errors(
"get", user, password, f"{server}/admin/api/v1/boxes/{email}/stats", follow_redirects=True
),
httpx.Response,
):
if resp.status_code == 404:
print(fg.red("The provided mailbox does not exist"))
await trio.sleep(3)
os.system("cls||clear")
continue
elif stats := decode_json(resp):
for day in stats:
print(colored(day.replace("-", "/")))
print(colored(f"\t- In: {stats[day]['in']}\n\t- Out: {stats[day]['out']}"))
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(3)
os.system("cls||clear")
continue
case 7:
print(colored("Getting quota limits for an existing mailbox"))
case 8:
os.system("cls||clear")
print(colored("Updating quota limits for an existing mailbox"))
case 9:
os.system("cls||clear")
print(colored("Listing all existing virtual domains"))
if isinstance(
resp := await handle_errors(
"get",
user,
password,
f"{server}/admin/api/v1/domains",
params={"page": 1, "paging": 999999},
follow_redirects=True,
),
httpx.Response,
):
if data := decode_json(resp):
print(fg.green(f"Fetched {data['results_count']} domains"))
for domain in data["results"]:
print(f"\t{fg.yellow}- {colored(domain['name'])}")
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(3)
os.system("cls||clear")
continue
case 10:
os.system("cls||clear")
print(colored("Getting info about an existing virtual domain"))
domain = input(colored("Choose a virtual domain to get info for: "))
if isinstance(
resp := await handle_errors(
"get", user, password, f"{server}/admin/api/v1/domains/{domain}", follow_redirects=True
),
httpx.Response,
):
if resp.status_code == 404:
print(fg.red("The provided virtual domain does not exist"))
elif data := decode_json(resp):
print(
f"""{colored("Virtual domain information for")} {colored(domain)}\n\n"""
f"""{colored(f"Home Dir: {data['home']}")}\n"""
f"""{colored(f"Forward: {data['forward_domain'] if data['forward'] else 'No'}")}\n"""
f"""{colored(f"Creation Date: {data['created']}")}\n"""
f"""{colored(f"Last Update Date: {data['updated']}")}\n"""
f"""{colored(f"Domain bin (catch-all): {data['domain_bin_address'] if data['domain_bin'] else 'No'}")}\n"""
f"""{colored(f"Force Route: {data['force_route_host'] if data['force_route'] else 'No'}")}\n"""
)
case 11:
os.system("cls||clear")
print(colored("Generating a DKIM key pair for an existing virtual domain"))
case 12:
os.system("cls||clear")
print(colored("Getting the DKIM key pair for an existing virtual domain"))
case 13:
os.system("cls||clear")
print(colored("Deleting the DKIM key pair for an existing virtual domain"))
case 14:
os.system("cls||clear")
print(colored("Getting in/out statistics for an existing virtual domain"))
domain = input(colored("Select a virtual domain to fetch stats for: "))
if isinstance(
resp := await handle_errors(
"get", user, password, f"{server}/admin/api/v1/domains/{domain}/stats",
follow_redirects=True
),
httpx.Response,
):
if resp.status_code == 404:
print(fg.red("The provided virtual domain does not exist"))
await trio.sleep(3)
os.system("cls||clear")
continue
elif stats := decode_json(resp):
for day in stats:
print(colored(day.replace("-", "/")))
print(colored(f"\t- In: {stats[day]['in']}\n\t- Out: {stats[day]['out']}"))
else:
print(
fg.red(
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
)
)
await trio.sleep(3)
os.system("cls||clear")
continue
case 15:
os.system("cls||clear")
print(colored("Getting status information for an existing mailbox"))
case 16:
os.system("cls||clear")
print(colored("Updating an existing virtual domain's information"))
case 17:
os.system("cls||clear")
print(colored("Searching a mailbox by address"))
case _:
skip = True
print(fg.yellow("Invalid choice"))
await trio.sleep(3)
os.system("cls||clear")
if not skip:
pause(fg.cyan("Press any key to continue"))
# Awful, but it works so shut up
os.system("cls||clear")
except KeyboardInterrupt:
os.system("cls||clear")
# This way when you press Ctrl+C
# or Ctrl+D from an inner section
# you are brought back to the
# main menu
if not prompted:
return 0
except EOFError:
os.system("cls||clear")
if not prompted:
return -1
# This tool technically doesn't *need* to be async, but shut up
async def main(args: argparse.Namespace) -> int:
"""
Main program entry point
"""
user, password = (None, None)
if not re.match(r"http(s)?://", args.server_url):
print(fg.red("Missing or invalid schema in server URL!"))
return -1
elif not validators.url(args.server_url, public=True):
# bool(ValidationFailure) is False :)
print(fg.red("Invalid server URL, exiting"))
return -1
if os.path.isfile("credentials.json"):
try:
with open("credentials.json") as credentials:
data = json.load(credentials)
except json.decoder.JSONDecodeError as json_error:
print(
fg.red(
f"A JSON error occurred while attempting to load credentials.json -> {type(json_error).__name__}:"
f" {json_error}"
)
)
except PermissionError:
print(
fg.red(
"Could not read credentials.json due to a permission issue. Make sure the file is readable by"
" the current user and try again"
)
)
except OSError as os_error:
print(
fg.red(
f"Could not read credentials.json due to a generic OS error -> {type(os_error).__name__}:"
f"{os_error}"
)
)
else:
try:
user, password = data["username"], data["password"]
except KeyError:
print(fg.yellow("The JSON payload in credentials.json is not valid"))
if not all([user, password]) and not all([args.username, args.password]):
print(
fg.yellow("Credentials could not be loaded and an explicit username/password pair wasn't provided, exiting")
)
return -1
elif all([args.username, args.password]):
user, password = args.username, args.password
if args.save:
print(fg.green(f"Saving credentials for {user} to credentials.json"))
try:
with open("credentials.json", "w") as save_file:
json.dump({"username": user, "password": password}, save_file)
except PermissionError:
print(
fg.red(
"Could not write credentials.json due to a permission issue. Make sure the file is readable by"
" the current user and try again"
)
)
except OSError as os_error:
print(
fg.red(
f"Could not write credentials.json due to a generic OS error -> {type(os_error).__name__}:"
f"{os_error}"
)
)
else:
print(fg.green(f"Loaded credentials for {user} from credentials.json"))
print(fg.yellow(f"Attempting login with {user} to {args.server_url}"))
if isinstance(
resp := await handle_errors("get", user, password, f"{args.server_url}/admin/api/doc", follow_redirects=True),
httpx.Response,
):
if resp.status_code == 200:
print(fg.green("Authentication successful!"))
return await loop(args.server_url, user, password)
else:
print(fg.red(f"An error occurred while attempting to log in -> {type(resp.__name__)} -> {resp}"))
return -1
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="A simple tool to interact with the REST API of poste.io instances")
parser.add_argument("--username", help="The username to authenticate with the API")
parser.add_argument("--password", help="The password to authenticate with the API")
parser.add_argument(
"--save",
help="Saves credentials in a credentials.json file for future use. Overwrites any existing credentials file",
action="store_true",
default=False,
)
parser.add_argument(
"--server-url", help="The URL of your poste.io instance. MUST include an http(s):// prefix", required=True
)
parser.add_argument(
"--nocolor",
help="Disables the unicorn puke and outputs plain old boring text",
action="store_true",
default=False,
)
arguments = parser.parse_args()
if arguments.nocolor:
# This hack is awful, but it works and is much better than using
# parameters or globals everywhere. All hail dynamic languages
fg = type(
"fg",
(object,),
{
"__getattr__": lambda self, a: type(
"i", (object,), {"__repr__": lambda s: "", "__call__": lambda m, x: x}
)(),
},
)()
try:
os.system("cls||clear")
print(colored("MailMaker v0.0.1, made with <3 and Python 3.8 by @nocturn9x"))
sys.exit(trio.run(main, parser.parse_args()))
except KeyboardInterrupt:
os.system("cls||clear")
sys.exit(0)

25
requirements.txt Normal file
View File

@ -0,0 +1,25 @@
anyio==3.5.0
async-generator==1.10
attrs==21.4.0
certifi==2021.10.8
charset-normalizer==2.0.11
colorama==0.4.4
commonmark==0.9.1
console @ git+https://github.com/mixmastamyk/console
decorator==5.1.1
ezenv==0.92
grapejuice==4.11.2
h11==0.12.0
httpcore==0.14.7
httpx==0.22.0
idna==3.3
outcome==1.1.0
Pygments==2.11.2
rfc3986==1.5.0
rich==11.1.0
six==1.16.0
sniffio==1.2.0
sortedcontainers==2.4.0
trio==0.19.0
validators==0.18.2
webcolors==1.11.1