1022 lines
56 KiB
Python
Executable File
1022 lines
56 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Copyright (C) 2021 nocturn9x
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
https://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
"""
|
|
import re
|
|
import os
|
|
import sys
|
|
import trio
|
|
import json
|
|
import httpx
|
|
import random
|
|
import socket
|
|
import string
|
|
import getpass
|
|
import argparse
|
|
import validators
|
|
from typing import Union, Any
|
|
from console import fg
|
|
|
|
try:
|
|
from console.utils import pause
|
|
except ImportError:
|
|
# For some reason the lib does not export utils by default ¯\_(ツ)_/¯
|
|
print(f"Please edit the console library's __init__.py file at line 58 and add the line 'from . import utils'")
|
|
sys.exit(0)
|
|
|
|
|
|
async def send_authenticated_request(
|
|
method: str, user: str, password: str, *args, **kwargs
|
|
) -> Union[httpx.Response, httpx.RequestError]:
|
|
"""
|
|
Sends an HTTP request using httpx and basic HTTP authentication
|
|
"""
|
|
|
|
try:
|
|
# print(f"calling httpx.{method}({f', '.join(map(repr, args))}, auth=({user!r}, {password!r}), {', '.join(f'{name}={repr(value)}' for name, value in kwargs.items())})")
|
|
async with httpx.AsyncClient() as client:
|
|
return await getattr(client, method)(*args, **kwargs, auth=(user, password))
|
|
except (httpx.RequestError, socket.gaierror, OSError) as e:
|
|
return e
|
|
|
|
|
|
async def handle_errors(*args, **kwargs) -> Union[httpx.Response, httpx.RequestError]:
|
|
"""
|
|
Simple wrapper to avoid copy-pasting the same
|
|
error handling code 20 times
|
|
"""
|
|
|
|
if isinstance(resp := await send_authenticated_request(*args, **kwargs), httpx.Response):
|
|
match resp.status_code:
|
|
case 200 | 201 | 204 | 404 | 400:
|
|
return resp
|
|
case 401:
|
|
print(fg.red("Authentication failed: Unauthorized (wrong credentials)"))
|
|
case 403:
|
|
print(fg.red("Authentication failed: Forbidden"))
|
|
case resp.status_code if resp.status_code >= 500:
|
|
print(fg.red(f"The server raised a {resp.status_code} error, maybe try again later?"))
|
|
case _:
|
|
print(fg.yellow(f"The server returned unexpected status code {resp.status_code}, exiting"))
|
|
return resp
|
|
|
|
|
|
def decode_json(response: httpx.Response) -> Any:
|
|
"""
|
|
Handles potential exceptions from httpx.Response.json():
|
|
"""
|
|
|
|
try:
|
|
return response.json()
|
|
except json.decoder.JSONDecodeError as json_error:
|
|
print(
|
|
fg.red(
|
|
f"A JSON error occurred while attempting to decode some data -> {type(json_error).__name__}:"
|
|
f" {json_error}"
|
|
)
|
|
)
|
|
return ""
|
|
|
|
|
|
_last_color = None
|
|
|
|
|
|
def colored(s: str) -> str:
|
|
"""
|
|
Calls fg() with a random color each time, but
|
|
making sure it doesn't use the same color across
|
|
two consecutive calls
|
|
"""
|
|
|
|
global _last_color # Ew...
|
|
while (c := random.choice(["blue", "magenta", "red", "cyan", "blue", "purple", "yellow", "green"])) == _last_color:
|
|
pass
|
|
_last_color = c
|
|
return getattr(fg, _last_color)(s)
|
|
|
|
|
|
async def loop(server: str, user: str, password: str) -> int:
|
|
"""
|
|
Program main loop
|
|
"""
|
|
|
|
prompt_pattern = f"""{fg.green}Welcome to MailMaker! Choose what to do:{fg.default}
|
|
{fg.blue}[{fg.green}1{fg.blue}] {colored("List all existing mailboxes")}
|
|
{fg.blue}[{fg.green}2{fg.blue}] {colored("Get info about an existing mailbox")}
|
|
{fg.blue}[{fg.green}3{fg.blue}] {colored("Create a new mailbox")}
|
|
{fg.blue}[{fg.green}4{fg.blue}] {colored("Delete an existing mailbox")}
|
|
{fg.blue}[{fg.green}5{fg.blue}] {colored("Update an existing mailbox's information")}
|
|
{fg.blue}[{fg.green}6{fg.blue}] {colored("Get in/out statistics for an existing mailbox")}
|
|
{fg.blue}[{fg.green}7{fg.blue}] {colored("Get quota limits for an existing mailbox")}
|
|
{fg.blue}[{fg.green}8{fg.blue}] {colored("Update quota limits for an existing mailbox")}
|
|
{fg.blue}[{fg.green}9{fg.blue}] {colored("List all existing virtual domains")}
|
|
{fg.blue}[{fg.green}10{fg.blue}] {colored("Get info about an existing virtual domain")}
|
|
{fg.blue}[{fg.green}11{fg.blue}] {colored("Generate a DKIM key pair for an existing virtual domain")}
|
|
{fg.blue}[{fg.green}12{fg.blue}] {colored("Get the DKIM key pair for an existing virtual domain")}
|
|
{fg.blue}[{fg.green}13{fg.blue}] {colored("Delete the DKIM key pair for an existing virtual domain")}
|
|
{fg.blue}[{fg.green}14{fg.blue}] {colored("Get in/out statistics for an existing virtual domain")}
|
|
{fg.blue}[{fg.green}15{fg.blue}] {colored("Update an existing virtual domain's information")}
|
|
{fg.blue}[{fg.green}16{fg.blue}] {colored("Search a mailbox by address")}"""
|
|
skip = False
|
|
prompted = False
|
|
while True:
|
|
try:
|
|
# Some unicorn puke :D
|
|
prompt = prompt_pattern.format(
|
|
*(
|
|
(
|
|
getattr(
|
|
fg, random.choice(["blue", "magenta", "red", "cyan", "blue", "purple", "yellow", "green"])
|
|
)
|
|
)
|
|
for _ in range(len(prompt_pattern.splitlines()) - 1)
|
|
)
|
|
)
|
|
print(prompt)
|
|
skip = False
|
|
prompted = False
|
|
choice = input(fg.purple("Make your choice: "))
|
|
prompted = True
|
|
if not choice.strip() or not choice.isnumeric():
|
|
skip = True
|
|
print(fg.yellow("Invalid choice (must be a number)"))
|
|
await trio.sleep(2)
|
|
else:
|
|
# We clear each time because we don't want
|
|
# to clear the screen in the default case
|
|
match int(choice):
|
|
case 1:
|
|
os.system("cls||clear")
|
|
print(colored("Listing all mailboxes"))
|
|
if isinstance(
|
|
resp := await handle_errors(
|
|
"get",
|
|
user,
|
|
password,
|
|
f"{server}/admin/api/v1/boxes",
|
|
params={"page": 1, "paging": 999999},
|
|
follow_redirects=True,
|
|
),
|
|
httpx.Response,
|
|
):
|
|
if data := decode_json(resp):
|
|
print(fg.green(f"Fetched {data['results_count']} addresses"))
|
|
for mailbox in data["results"]:
|
|
print(f"\t{fg.yellow}- {colored(mailbox['address'])}")
|
|
else:
|
|
print(
|
|
fg.red(
|
|
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
|
|
)
|
|
)
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
continue
|
|
case 2:
|
|
os.system("cls||clear")
|
|
address = input(colored("Choose an inbox to get info for: "))
|
|
if isinstance(
|
|
resp := await handle_errors(
|
|
"get", user, password, f"{server}/admin/api/v1/boxes/{address}", follow_redirects=True
|
|
),
|
|
httpx.Response,
|
|
):
|
|
if resp.status_code == 404:
|
|
print(fg.red("The provided mailbox does not exist"))
|
|
elif data := decode_json(resp):
|
|
if not data["redirect_to"]:
|
|
redirect_list = "None"
|
|
else:
|
|
redirect_list = ""
|
|
for redirect in data["redirect_to"]:
|
|
redirect_list += f"\n\t- {redirect}\n"
|
|
print(
|
|
f"""{colored("Mailbox information for")} {colored(address)}\n\n"""
|
|
f"""{colored(f"User: {data['user']}")}\n"""
|
|
f"""{colored(f"Name: {data['name'] or 'None'}")}\n"""
|
|
f"""{colored(f"System Admin: {'Yes' if data['super_admin'] else 'No'}")}\n"""
|
|
f"""{colored(f"Domain Admin: {'Yes' if data['domain_admin'] else 'No'}")}\n"""
|
|
f"""{colored(f"Home Dir: {data['home']}")}\n"""
|
|
f"""{colored(f"Strict 'From' Header: {'Not Enforced' if data['strict_from_disabled'] else 'Enforced'}")}\n"""
|
|
f"""{colored(f"Creation Date: {data['created']}")}\n"""
|
|
f"""{colored(f"Last Update Date: {data['updated']}")}\n"""
|
|
f"""{colored(f"Redirect Only: {'Yes' if data['redirect_only'] else 'No'}")}\n"""
|
|
f"""{colored(f"Redirects to: {redirect_list}")}\n"""
|
|
f"""{colored(f"Discards Incoming Mail: {'Yes' if data['discard'] else 'No'}")}\n"""
|
|
f"""{colored(f"Disabled: {'Yes' if data['disabled'] else 'No'}")}\n"""
|
|
)
|
|
else:
|
|
print(
|
|
fg.red(
|
|
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
|
|
)
|
|
)
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
continue
|
|
case 3:
|
|
os.system("cls||clear")
|
|
print(colored("Creating new mailbox"))
|
|
name = input(colored("Choose a name for the mailbox: "))
|
|
while True:
|
|
email = input(colored("Choose an email address for the mailbox: "))
|
|
if not validators.email(email):
|
|
print(fg.red("Invalid email address"))
|
|
continue
|
|
break
|
|
while True:
|
|
mail_password = getpass.getpass(colored("Choose a password for the mailbox (empty for auto-generated password): "))
|
|
if not mail_password.strip():
|
|
mail_password = "".join((random.choice(string.ascii_letters + string.digits + string.punctuation) for _ in range(random.randint(10, 16))))
|
|
print(colored(f"Generated password is {mail_password}"))
|
|
break
|
|
confirm_password = getpass.getpass(colored("Confirm the password: "))
|
|
if mail_password != confirm_password:
|
|
print(fg.red("Passwords do not match"))
|
|
continue
|
|
break
|
|
while True:
|
|
destinations = input(colored("Is this mailbox a redirect?\nIf so, provide a comma-separated list of destinations here: "))
|
|
if destinations and not all((validators.email(mail) for mail in destinations.strip().split(","))):
|
|
print(fg.red("One or more destinations are not valid"))
|
|
continue
|
|
break
|
|
confirmation = input(colored("System Admin? [y/N] "))
|
|
if confirmation.strip().lower() in ("y", "yes"):
|
|
admin = True
|
|
else:
|
|
admin = False
|
|
if (t := destinations.strip().split(",")) == ['']:
|
|
# Split has some weird behavior
|
|
destinations = []
|
|
else:
|
|
destinations = t
|
|
data = {"name": name,
|
|
"email": email,
|
|
"passwordPlaintext": mail_password,
|
|
"disabled": False,
|
|
"referenceId": "", # TODO: Set this?
|
|
"superAdmin": admin,
|
|
"redirectTo": destinations
|
|
}
|
|
confirmation = input(colored("Confirm? [y/N] "))
|
|
if confirmation.strip().lower() in ("y", "yes"):
|
|
if isinstance(
|
|
resp := await handle_errors(
|
|
"post",
|
|
user,
|
|
password,
|
|
f"{server}/admin/api/v1/boxes",
|
|
follow_redirects=True,
|
|
data=data
|
|
),
|
|
httpx.Response,
|
|
):
|
|
if resp.status_code == 201:
|
|
print(colored(f"{email} successfully created!"))
|
|
elif resp.status_code == 400:
|
|
print(fg.red(f"An error occurred: {decode_json(resp) or resp.content.decode()}"))
|
|
elif resp.status_code == 404:
|
|
print(fg.red("Cannot create mailbox: the chosen domain is not registered"))
|
|
else:
|
|
print(
|
|
fg.red(
|
|
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
|
|
)
|
|
)
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
continue
|
|
else:
|
|
print(colored("Aborting"))
|
|
skip = True
|
|
await trio.sleep(2)
|
|
|
|
case 4:
|
|
os.system("cls||clear")
|
|
print(colored("Deleting an existing mailbox"))
|
|
address = input(colored("Choose an inbox to delete: "))
|
|
confirmation = input(colored("Confirm? [y/N] "))
|
|
if confirmation.strip().lower() in ("y", "yes"):
|
|
if isinstance(
|
|
resp := await handle_errors(
|
|
"delete",
|
|
user,
|
|
password,
|
|
f"{server}/admin/api/v1/boxes/{address}",
|
|
follow_redirects=True,
|
|
),
|
|
httpx.Response,
|
|
):
|
|
if resp.status_code == 404:
|
|
print(fg.red("The provided mailbox does not exist"))
|
|
elif resp.status_code == 204:
|
|
print(colored(f"{address} successfully deleted!"))
|
|
else:
|
|
print(
|
|
fg.red(
|
|
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
|
|
)
|
|
)
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
continue
|
|
else:
|
|
print(colored("Aborting"))
|
|
skip = True
|
|
await trio.sleep(2)
|
|
case 5:
|
|
data = {
|
|
"passwordPlaintext": "",
|
|
"name": "",
|
|
"disabled": False,
|
|
"superAdmin": False,
|
|
}
|
|
os.system("cls||clear")
|
|
print(colored("Updating an existing mailbox"))
|
|
email = input(colored("Select a mailbox to update: "))
|
|
if isinstance(
|
|
resp := await handle_errors(
|
|
"get", user, password, f"{server}/admin/api/v1/boxes/{email}", follow_redirects=True
|
|
),
|
|
httpx.Response,
|
|
):
|
|
if resp.status_code == 404:
|
|
print(fg.red("The provided mailbox does not exist"))
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
continue
|
|
elif account_info := decode_json(resp):
|
|
data["disabled"] = account_info["disabled"]
|
|
data["superAdmin"] = account_info["super_admin"]
|
|
data["name"] = account_info["name"]
|
|
else:
|
|
print(
|
|
fg.red(
|
|
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
|
|
)
|
|
)
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
continue
|
|
print(
|
|
f"""{colored("Choose what to update:")}\n"""
|
|
f"""{fg.blue}[{fg.green}1{fg.blue}] Update Name\n"""
|
|
f"""{fg.blue}[{fg.green}2{fg.blue}] Update Password\n"""
|
|
f"""{fg.blue}[{fg.green}3{fg.blue}] Promote to System Admin\n"""
|
|
f"""{fg.blue}[{fg.green}4{fg.blue}] Demote from System Admin\n"""
|
|
f"""{fg.blue}[{fg.green}5{fg.blue}] Disable account\n"""
|
|
f"""{fg.blue}[{fg.green}6{fg.blue}] Enable account\n"""
|
|
)
|
|
keep = False
|
|
while True:
|
|
what = input(colored("What information do you want to update? "))
|
|
if not what or not what.isnumeric():
|
|
print(fg.yellow("Invalid choice (must be a number)"))
|
|
await trio.sleep(2)
|
|
else:
|
|
match int(what):
|
|
case 1:
|
|
new_name = input(colored(f"New Name for {email} (empty to abort): "))
|
|
if data["name"] == new_name:
|
|
print(colored(f"The desired name is already set for {email}"))
|
|
await trio.sleep(2)
|
|
continue
|
|
elif new_name.strip():
|
|
data["name"] = new_name
|
|
keep = True
|
|
break
|
|
case 2:
|
|
new_password = input(colored(f"New Password for {email} (empty to abort): "))
|
|
if new_password.strip():
|
|
data["passwordPlaintext"] = new_password
|
|
keep = True
|
|
break
|
|
case 3:
|
|
if data["superAdmin"]:
|
|
print(colored(f"{email} is already a System Admin"))
|
|
await trio.sleep(2)
|
|
continue
|
|
else:
|
|
print(colored(f"Promoting {email} to System Admin"))
|
|
data["superAdmin"] = True
|
|
keep = True
|
|
break
|
|
case 4:
|
|
if not data["superAdmin"]:
|
|
print(colored(f"{email} is not a System Admin"))
|
|
await trio.sleep(2)
|
|
continue
|
|
else:
|
|
print(colored(f"Demoting {email} from System Admin"))
|
|
data["superAdmin"] = False
|
|
keep = True
|
|
break
|
|
case 5:
|
|
if data["disabled"]:
|
|
print(colored(f"{email} is already disabled"))
|
|
await trio.sleep(2)
|
|
continue
|
|
else:
|
|
print(colored(f"Disabling mailbox {email} "))
|
|
keep = True
|
|
data["disabled"] = True
|
|
break
|
|
case 6:
|
|
if not data["disabled"]:
|
|
print(colored(f"{email} is not disabled"))
|
|
await trio.sleep(2)
|
|
continue
|
|
else:
|
|
print(colored(f"Enabling mailbox {email} "))
|
|
keep = True
|
|
data["disabled"] = False
|
|
break
|
|
case _:
|
|
print(fg.yellow("Invalid choice (value out of range)"))
|
|
await trio.sleep(2)
|
|
if keep:
|
|
confirmation = input(colored("Confirm? [y/N] "))
|
|
if confirmation.strip().lower() in ("y", "yes"):
|
|
if isinstance(
|
|
resp := await handle_errors(
|
|
"patch",
|
|
user,
|
|
password,
|
|
f"{server}/admin/api/v1/boxes/{email}",
|
|
follow_redirects=True,
|
|
data=data
|
|
),
|
|
httpx.Response,
|
|
):
|
|
if resp.status_code == 404:
|
|
print(
|
|
fg.red(
|
|
"The provided mailbox no longer exists: has it been deleted in the meantime?"
|
|
)
|
|
)
|
|
await trio.sleep(2)
|
|
elif resp.status_code == 204:
|
|
print(colored("Information Updated"))
|
|
else:
|
|
print(colored("Aborting"))
|
|
skip = True
|
|
await trio.sleep(2)
|
|
case 6:
|
|
os.system("cls||clear")
|
|
print(colored("Getting in/out statistics for existing mailbox"))
|
|
email = input(colored("Select a mailbox to fetch stats for: "))
|
|
if isinstance(
|
|
resp := await handle_errors(
|
|
"get", user, password, f"{server}/admin/api/v1/boxes/{email}/stats", follow_redirects=True
|
|
),
|
|
httpx.Response,
|
|
):
|
|
if resp.status_code == 404:
|
|
print(fg.red("The provided mailbox does not exist"))
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
continue
|
|
elif stats := decode_json(resp):
|
|
for day in stats:
|
|
print(colored(day.replace("-", "/")))
|
|
print(colored(f"\t- In: {stats[day]['in']}\n\t- Out: {stats[day]['out']}"))
|
|
else:
|
|
print(
|
|
fg.red(
|
|
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
|
|
)
|
|
)
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
continue
|
|
case 7:
|
|
os.system("cls||clear")
|
|
print(colored("Getting quota limits for an existing mailbox"))
|
|
mail = input(colored("Choose a mailbox to get quota info for: "))
|
|
if isinstance(
|
|
resp := await handle_errors(
|
|
"get", user, password, f"{server}/admin/api/v1/boxes/{mail}/quota", follow_redirects=True
|
|
),
|
|
httpx.Response,
|
|
):
|
|
if resp.status_code == 404:
|
|
print(fg.red("The provided mailbox does not exist"))
|
|
elif data := decode_json(resp):
|
|
print(
|
|
f"""{colored("Quota information for")} {colored(mail)}\n\n"""
|
|
f"""{colored(f"Storage Limit: {data['storage_limit']}")}\n"""
|
|
f"""{colored(f"Storage Usage: {data['storage_usage']}")}\n"""
|
|
f"""{colored(f"Count Limit: {data['count_limit']}")}\n"""
|
|
f"""{colored(f"Count Usage: {data['count_usage']}")}\n"""
|
|
)
|
|
case 8:
|
|
os.system("cls||clear")
|
|
print(colored("Updating quota limits for an existing mailbox"))
|
|
data = {
|
|
"storageLimit": 0,
|
|
"countLimit": 0,
|
|
}
|
|
email = input(colored("Select a mailbox to update quotas for: "))
|
|
if isinstance(
|
|
resp := await handle_errors(
|
|
"get", user, password, f"{server}/admin/api/v1/boxes/{email}/quota", follow_redirects=True
|
|
),
|
|
httpx.Response,
|
|
):
|
|
if resp.status_code == 404:
|
|
print(fg.red("The provided mailbox does not exist"))
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
continue
|
|
elif account_info := decode_json(resp):
|
|
data["storageLimit"] = account_info["storage_limit"]
|
|
data["countLimit"] = account_info["count_limit"]
|
|
else:
|
|
print(
|
|
fg.red(
|
|
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
|
|
)
|
|
)
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
continue
|
|
print(
|
|
f"""{colored("Choose what to update:")}\n"""
|
|
f"""{fg.blue}[{fg.green}1{fg.blue}] Update Storage Limit\n"""
|
|
f"""{fg.blue}[{fg.green}2{fg.blue}] Update Count Limit\n"""
|
|
)
|
|
keep = False
|
|
while True:
|
|
what = input(colored("What information do you want to update? "))
|
|
if not what or not what.isnumeric():
|
|
print(fg.yellow("Invalid choice (must be a number)"))
|
|
await trio.sleep(2)
|
|
else:
|
|
match int(what):
|
|
case 1:
|
|
new_storage = int(input(colored(f"New storage limit for {email} (current is {data['storageLimit']}, empty to abort): ")))
|
|
if data["storageLimit"] == new_storage:
|
|
print(colored(f"The desired storage limit is already set to {data['storageLimit']} for {email}"))
|
|
await trio.sleep(2)
|
|
continue
|
|
elif new_storage.strip():
|
|
if not new_storage.isnumeric():
|
|
print(fg.yellow("Invalid choice (must be a number)"))
|
|
await trio.sleep(2)
|
|
continue
|
|
data["storageLimit"] = new_name
|
|
keep = True
|
|
break
|
|
case 2:
|
|
new_count = int(input(colored(f"New count limit for {email} (current is {data['countLimit']}, empty to abort): ")))
|
|
if data["countLimit"] == new_count:
|
|
print(colored(f"The desired count limit is already set to {data['countLimit']} for {email}"))
|
|
await trio.sleep(2)
|
|
continue
|
|
else:
|
|
data["countLimit"] = new_count
|
|
keep = True
|
|
break
|
|
case _:
|
|
print(fg.yellow("Invalid choice (value out of range)"))
|
|
await trio.sleep(2)
|
|
if keep:
|
|
confirmation = input(colored("Confirm? [y/N] "))
|
|
if confirmation.strip().lower() in ("y", "yes"):
|
|
if isinstance(
|
|
resp := await handle_errors(
|
|
"patch",
|
|
user,
|
|
password,
|
|
f"{server}/admin/api/v1/boxes/{email}/quota",
|
|
follow_redirects=True,
|
|
data=data
|
|
),
|
|
httpx.Response,
|
|
):
|
|
if resp.status_code == 404:
|
|
print(
|
|
fg.red(
|
|
"The provided mailbox no longer exists: has it been deleted in the meantime?"
|
|
)
|
|
)
|
|
await trio.sleep(2)
|
|
elif resp.status_code == 204:
|
|
print(colored("Quota Updated"))
|
|
elif resp.status_code == 500:
|
|
print(fg.yellow("Are you trying to update limits for a System Administrator which is now allowed?"))
|
|
else:
|
|
print(colored("Aborting"))
|
|
skip = True
|
|
await trio.sleep(2)
|
|
case 9:
|
|
os.system("cls||clear")
|
|
print(colored("Listing all existing virtual domains"))
|
|
if isinstance(
|
|
resp := await handle_errors(
|
|
"get",
|
|
user,
|
|
password,
|
|
f"{server}/admin/api/v1/domains",
|
|
params={"page": 1, "paging": 999999},
|
|
follow_redirects=True,
|
|
),
|
|
httpx.Response,
|
|
):
|
|
if data := decode_json(resp):
|
|
print(fg.green(f"Fetched {data['results_count']} domains"))
|
|
for domain in data["results"]:
|
|
print(f"\t{fg.yellow}- {colored(domain['name'])}")
|
|
else:
|
|
print(
|
|
fg.red(
|
|
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
|
|
)
|
|
)
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
continue
|
|
case 10:
|
|
os.system("cls||clear")
|
|
print(colored("Getting info about an existing virtual domain"))
|
|
domain = input(colored("Choose a virtual domain to get info for: "))
|
|
if isinstance(
|
|
resp := await handle_errors(
|
|
"get", user, password, f"{server}/admin/api/v1/domains/{domain}", follow_redirects=True
|
|
),
|
|
httpx.Response,
|
|
):
|
|
if resp.status_code == 404:
|
|
print(fg.red("The provided virtual domain does not exist"))
|
|
elif data := decode_json(resp):
|
|
print(
|
|
f"""{colored("Virtual domain information for")} {colored(domain)}\n\n"""
|
|
f"""{colored(f"Home Dir: {data['home']}")}\n"""
|
|
f"""{colored(f"Forward: {data['forward_domain'] if data['forward'] else 'No'}")}\n"""
|
|
f"""{colored(f"Creation Date: {data['created']}")}\n"""
|
|
f"""{colored(f"Last Update Date: {data['updated']}")}\n"""
|
|
f"""{colored(f"Domain bin (catch-all): {data['domain_bin_address'] if data['domain_bin'] else 'No'}")}\n"""
|
|
f"""{colored(f"Force Route: {data['force_route_host'] if data['force_route'] else 'No'}")}\n"""
|
|
)
|
|
case 11:
|
|
os.system("cls||clear")
|
|
print(colored("Generating a DKIM key pair for an existing virtual domain"))
|
|
case 12:
|
|
os.system("cls||clear")
|
|
print(colored("Getting the DKIM key pair for an existing virtual domain"))
|
|
case 13:
|
|
os.system("cls||clear")
|
|
print(colored("Deleting the DKIM key pair for an existing virtual domain"))
|
|
case 14:
|
|
os.system("cls||clear")
|
|
print(colored("Getting in/out statistics for an existing virtual domain"))
|
|
domain = input(colored("Select a virtual domain to fetch stats for: "))
|
|
if isinstance(
|
|
resp := await handle_errors(
|
|
"get", user, password, f"{server}/admin/api/v1/domains/{domain}/stats",
|
|
follow_redirects=True
|
|
),
|
|
httpx.Response,
|
|
):
|
|
if resp.status_code == 404:
|
|
print(fg.red("The provided virtual domain does not exist"))
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
continue
|
|
elif stats := decode_json(resp):
|
|
for day in stats:
|
|
print(colored(day.replace("-", "/")))
|
|
print(colored(f"\t- In: {stats[day]['in']}\n\t- Out: {stats[day]['out']}"))
|
|
else:
|
|
print(
|
|
fg.red(
|
|
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
|
|
)
|
|
)
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
continue
|
|
case 15:
|
|
os.system("cls||clear")
|
|
print(colored("Updating an existing virtual domain"))
|
|
data = {"forward": False,
|
|
"forwardDomain": "",
|
|
"domainBin": False,
|
|
"domainBinAddress": "",
|
|
"forceRoute": False,
|
|
"forceRouteHost": "",
|
|
"referenceId": ""
|
|
}
|
|
domain = input(colored("Select a virtual domain to update: "))
|
|
if isinstance(
|
|
resp := await handle_errors(
|
|
"get", user, password, f"{server}/admin/api/v1/domains/{domain}", follow_redirects=True
|
|
),
|
|
httpx.Response,
|
|
):
|
|
if resp.status_code == 404:
|
|
print(fg.red("The provided virtual domain does not exist"))
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
continue
|
|
elif domain_info := decode_json(resp):
|
|
data["forward"] = domain_info["forward"]
|
|
data["domainBin"] = domain_info["domain_bin"]
|
|
data["forceRoute"] = domain_info["force_route"]
|
|
if domain_info["forward"]:
|
|
data["forwardDomain"] = domain_info["forward_domain"]
|
|
if domain_info["force_route"]:
|
|
data["forceRouteHost"] = domain_info["force_route_host"]
|
|
if domain_info["domain_bin"]:
|
|
data["domainBinAddress"] = domain_info["domain_bin_address"]
|
|
else:
|
|
print(
|
|
fg.red(
|
|
f"An HTTP exception occurred while sending request -> {type(resp).__name__}: {resp}"
|
|
)
|
|
)
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
continue
|
|
print(
|
|
f"""{colored("Choose what to update:")}\n"""
|
|
f"""{fg.blue}[{fg.green}1{fg.blue}] Set/Update domain bin (catch-all)\n"""
|
|
f"""{fg.blue}[{fg.green}2{fg.blue}] Set/Update forced routing\n"""
|
|
f"""{fg.blue}[{fg.green}3{fg.blue}] Set/Update forward domain\n"""
|
|
f"""{fg.blue}[{fg.green}4{fg.blue}] Unset domain bin (catch-all)\n"""
|
|
f"""{fg.blue}[{fg.green}5{fg.blue}] Unset forced routing\n"""
|
|
f"""{fg.blue}[{fg.green}6{fg.blue}] Unset forward domain\n"""
|
|
)
|
|
keep = False
|
|
while True:
|
|
what = input(colored("What information do you want to update? "))
|
|
if not what or not what.isnumeric():
|
|
print(fg.yellow("Invalid choice (must be a number)"))
|
|
await trio.sleep(2)
|
|
else:
|
|
match int(what):
|
|
case 1:
|
|
if data["domainBin"]:
|
|
print(colored(f"Note: current catch-all for {domain} is set to {data['domainBinAddress']}"))
|
|
new_bin = input(colored(f"New domain catch-all for {domain} (empty to abort): "))
|
|
if data["domainBinAddress"] and data["domainBinAddress"] == new_bin:
|
|
print(colored(f"The desired catch-all is already set to {new_bin} for {domain}"))
|
|
elif new_bin.strip():
|
|
data["domainBinAddress"] = new_bin
|
|
data["domainBin"] = True
|
|
keep = True
|
|
break
|
|
case 2:
|
|
if data["forceRoute"]:
|
|
print(colored(f"Note: {domain} is currently routed trough {data['forceRouteHost']}"))
|
|
while True:
|
|
new_routing = input(colored(f"Set forced routing for {domain} (empty to abort): "))
|
|
if not new_routing.strip():
|
|
break
|
|
# I could've one-lined all these nested ifs but it'd look awful so shut up
|
|
if not validators.domain(new_routing):
|
|
# Not a domain
|
|
if not validators.ipv4(new_routing) or validators.ipv6(new_routing):
|
|
# Not a plain IPV4/IPV6
|
|
if ':' in new_routing:
|
|
# Is it an IP:port pair?
|
|
split = new_routing.split(":", maxsplit=1)
|
|
if not split[1].isnumeric() or not (validators.ipv4(split[0]) or validators.ipv6(split[0])):
|
|
# Nope, not a valid host
|
|
print(fg.red("The provided host is not valid (not a domain, IP, nor an IP:port pair)"))
|
|
await trio.sleep(2)
|
|
continue
|
|
break
|
|
if new_routing.strip():
|
|
if data["forceRoute"] and data["forceRouteHost"] == new_routing.strip():
|
|
print(colored("The virtual domain {domain} is already routed trough {new_routing}"))
|
|
await trio.sleep(2)
|
|
continue
|
|
data["forceRouteHost"] = new_routing
|
|
data["forceRoute"] = True
|
|
keep = True
|
|
break
|
|
case 3:
|
|
while True:
|
|
forward_address = input(colored("Choose a new forwarding address: "))
|
|
if validators.email(forward_address):
|
|
break
|
|
else:
|
|
print(colored("Not a valid email address"))
|
|
if data["forward"]:
|
|
print(colored(f"Note: {domain} currently forwards to {data['forwardAddress']}"))
|
|
if data["forward"] and data["forwardAddress"] == forward_address:
|
|
print(colored(f"{domain} already forwards to {data['forwardAddress']}"))
|
|
else:
|
|
data["forward"] = True
|
|
data["forwardAddress"] = True
|
|
keep = True
|
|
break
|
|
case _:
|
|
print(fg.yellow("Invalid choice (value out of range)"))
|
|
await trio.sleep(2)
|
|
if keep:
|
|
confirmation = input(colored("Confirm? [y/N] "))
|
|
if confirmation.strip().lower() in ("y", "yes"):
|
|
if isinstance(
|
|
resp := await handle_errors(
|
|
"patch",
|
|
user,
|
|
password,
|
|
f"{server}/admin/api/v1/domains/{domain}",
|
|
follow_redirects=True,
|
|
data=data
|
|
),
|
|
httpx.Response,
|
|
):
|
|
if resp.status_code == 400:
|
|
if err := decode_json(resp):
|
|
print(fg.red(f"An error occurred: {err['message']}"))
|
|
print(fg.red("Errors: "))
|
|
for error_kind in err["errors"]["children"].keys():
|
|
if not err["errors"]["children"][error_kind]:
|
|
continue
|
|
print(fg.red(f" - Error kind: {error_kind}"))
|
|
for error_message in err["errors"]["children"][error_kind]["errors"]:
|
|
print(fg.red(f" - {error_message}"))
|
|
if resp.status_code == 404:
|
|
print(
|
|
fg.red(
|
|
"The provided domain no longer exists: has it been deleted in the meantime?"
|
|
)
|
|
)
|
|
await trio.sleep(2)
|
|
elif resp.status_code == 204:
|
|
print(colored("Information Updated"))
|
|
else:
|
|
print(colored("Aborting"))
|
|
skip = True
|
|
await trio.sleep(2)
|
|
case 16:
|
|
os.system("cls||clear")
|
|
print(colored("Searching a mailbox by address"))
|
|
case _:
|
|
skip = True
|
|
print(fg.yellow("Invalid choice (value out of range)"))
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
if not skip:
|
|
pause(colored("Press any key to continue"))
|
|
# Awful, but it works so shut up
|
|
os.system("cls||clear")
|
|
except KeyboardInterrupt:
|
|
os.system("cls||clear")
|
|
# This way when you press Ctrl+C
|
|
# or Ctrl+D from an inner section
|
|
# you are brought back to the
|
|
# main menu
|
|
if not prompted:
|
|
return 0
|
|
except KeyError as k:
|
|
print(fg.red("The JSON response from the API was missing an expected field ({k!r}), has the version changed?"))
|
|
await trio.sleep(2)
|
|
os.system("cls||clear")
|
|
except EOFError:
|
|
os.system("cls||clear")
|
|
if not prompted:
|
|
return -1
|
|
|
|
|
|
# This tool technically doesn't *need* to be async, but shut up
|
|
async def main(args: argparse.Namespace) -> int:
|
|
"""
|
|
Main program entry point
|
|
"""
|
|
|
|
user, password = (None, None)
|
|
if not re.match(r"http(s)?://", args.server_url):
|
|
print(fg.red("Missing or invalid schema in server URL!"))
|
|
return -1
|
|
elif not validators.url(args.server_url, public=True):
|
|
# bool(ValidationFailure) is False :)
|
|
print(fg.red("Invalid server URL, exiting"))
|
|
return -1
|
|
if os.path.isfile("credentials.json"):
|
|
try:
|
|
with open("credentials.json") as credentials:
|
|
data = json.load(credentials)
|
|
except json.decoder.JSONDecodeError as json_error:
|
|
print(
|
|
fg.red(
|
|
f"A JSON error occurred while attempting to load credentials.json -> {type(json_error).__name__}:"
|
|
f" {json_error}"
|
|
)
|
|
)
|
|
except PermissionError:
|
|
print(
|
|
fg.red(
|
|
"Could not read credentials.json due to a permission issue. Make sure the file is readable by"
|
|
" the current user and try again"
|
|
)
|
|
)
|
|
except OSError as os_error:
|
|
print(
|
|
fg.red(
|
|
f"Could not read credentials.json due to a generic OS error -> {type(os_error).__name__}:"
|
|
f"{os_error}"
|
|
)
|
|
)
|
|
else:
|
|
try:
|
|
user, password = data["username"], data["password"]
|
|
except KeyError:
|
|
print(fg.yellow("The JSON payload in credentials.json is not valid"))
|
|
if not all([user, password]) and not all([args.username, args.password]):
|
|
print(
|
|
fg.yellow("Credentials could not be loaded and an explicit username/password pair wasn't provided, exiting")
|
|
)
|
|
return -1
|
|
elif all([args.username, args.password]):
|
|
user, password = args.username, args.password
|
|
if args.save:
|
|
print(fg.green(f"Saving credentials for {user} to credentials.json"))
|
|
try:
|
|
with open("credentials.json", "w") as save_file:
|
|
json.dump({"username": user, "password": password}, save_file)
|
|
except PermissionError:
|
|
print(
|
|
fg.red(
|
|
"Could not write credentials.json due to a permission issue. Make sure the file is readable by"
|
|
" the current user and try again"
|
|
)
|
|
)
|
|
except OSError as os_error:
|
|
print(
|
|
fg.red(
|
|
f"Could not write credentials.json due to a generic OS error -> {type(os_error).__name__}:"
|
|
f"{os_error}"
|
|
)
|
|
)
|
|
else:
|
|
print(fg.green(f"Loaded credentials for {user} from credentials.json"))
|
|
print(fg.yellow(f"Attempting login with {user} to {args.server_url}"))
|
|
if isinstance(
|
|
resp := await handle_errors("get", user, password, f"{args.server_url}/admin/api/doc", follow_redirects=True),
|
|
httpx.Response,
|
|
):
|
|
if resp.status_code == 200:
|
|
print(fg.green("Authentication successful!"))
|
|
return await loop(args.server_url, user, password)
|
|
else:
|
|
print(fg.red(f"An error occurred while attempting to log in -> {type(resp).__name__} -> {resp}"))
|
|
return -1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="A simple tool to interact with the REST API of poste.io instances")
|
|
parser.add_argument("--username", help="The username to authenticate with the API")
|
|
parser.add_argument("--password", help="The password to authenticate with the API")
|
|
parser.add_argument(
|
|
"--save",
|
|
help="Saves credentials in a credentials.json file for future use. Overwrites any existing credentials file",
|
|
action="store_true",
|
|
default=False,
|
|
)
|
|
parser.add_argument(
|
|
"--server-url", help="The URL of your poste.io instance. MUST include an http(s):// prefix", required=True
|
|
)
|
|
parser.add_argument(
|
|
"--nocolor",
|
|
help="Disables the unicorn puke and outputs plain old boring text",
|
|
action="store_true",
|
|
default=False,
|
|
)
|
|
arguments = parser.parse_args()
|
|
if arguments.nocolor:
|
|
# This hack is awful, but it works and is much better than using
|
|
# parameters or globals everywhere. All hail dynamic languages
|
|
fg = type(
|
|
"fg",
|
|
(object,),
|
|
{
|
|
"__getattr__": lambda self, _: type(
|
|
"i", (object,), {"__repr__": lambda _: "", "__call__": lambda _, x: x}
|
|
)(),
|
|
},
|
|
)()
|
|
try:
|
|
os.system("cls||clear")
|
|
print(colored("MailMaker v0.0.1, made with <3 and Python 3.8 by @nocturn9x"))
|
|
sys.exit(trio.run(main, parser.parse_args()))
|
|
except KeyboardInterrupt:
|
|
os.system("cls||clear")
|
|
sys.exit(0)
|