2022-02-06 23:05:02 +01:00
#!/usr/bin/env python3
"""
Copyright ( C ) 2021 nocturn9x
Licensed under the Apache License , Version 2.0 ( the " License " ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
https : / / www . apache . org / licenses / LICENSE - 2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an " AS IS " BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
"""
import re
import os
import sys
import trio
import json
import httpx
import random
2022-02-09 12:28:14 +01:00
import socket
import string
import getpass
2022-02-06 23:05:02 +01:00
import argparse
import validators
from typing import Union , Any
from console import fg
try :
from console . utils import pause
except ImportError :
# For some reason the lib does not export utils by default ¯\_(ツ)_/¯
print ( f " Please edit the console library ' s __init__.py file at line 58 and add the line ' from . import utils ' " )
sys . exit ( 0 )
async def send_authenticated_request (
method : str , user : str , password : str , * args , * * kwargs
) - > Union [ httpx . Response , httpx . RequestError ] :
"""
Sends an HTTP request using httpx and basic HTTP authentication
"""
try :
2022-02-09 12:28:14 +01:00
# print(f"calling httpx.{method}({f', '.join(map(repr, args))}, auth=({user!r}, {password!r}), {', '.join(f'{name}={repr(value)}' for name, value in kwargs.items())})")
async with httpx . AsyncClient ( ) as client :
return await getattr ( client , method ) ( * args , * * kwargs , auth = ( user , password ) )
except ( httpx . RequestError , socket . gaierror , OSError ) as e :
2022-02-06 23:05:02 +01:00
return e
async def handle_errors ( * args , * * kwargs ) - > Union [ httpx . Response , httpx . RequestError ] :
"""
Simple wrapper to avoid copy - pasting the same
error handling code 20 times
"""
if isinstance ( resp := await send_authenticated_request ( * args , * * kwargs ) , httpx . Response ) :
match resp . status_code :
2022-02-09 12:28:14 +01:00
case 200 | 201 | 204 | 404 | 400 :
2022-02-06 23:05:02 +01:00
return resp
case 401 :
print ( fg . red ( " Authentication failed: Unauthorized (wrong credentials) " ) )
case 403 :
print ( fg . red ( " Authentication failed: Forbidden " ) )
case resp . status_code if resp . status_code > = 500 :
print ( fg . red ( f " The server raised a { resp . status_code } error, maybe try again later? " ) )
case _ :
print ( fg . yellow ( f " The server returned unexpected status code { resp . status_code } , exiting " ) )
return resp
def decode_json ( response : httpx . Response ) - > Any :
"""
Handles potential exceptions from httpx . Response . json ( ) :
"""
try :
return response . json ( )
except json . decoder . JSONDecodeError as json_error :
print (
fg . red (
f " A JSON error occurred while attempting to decode some data -> { type ( json_error ) . __name__ } : "
f " { json_error } "
)
)
return " "
_last_color = None
def colored ( s : str ) - > str :
"""
Calls fg ( ) with a random color each time , but
making sure it doesn ' t use the same color across
two consecutive calls
"""
global _last_color # Ew...
while ( c := random . choice ( [ " blue " , " magenta " , " red " , " cyan " , " blue " , " purple " , " yellow " , " green " ] ) ) == _last_color :
pass
_last_color = c
return getattr ( fg , _last_color ) ( s )
async def loop ( server : str , user : str , password : str ) - > int :
"""
Program main loop
"""
prompt_pattern = f """ { fg . green } Welcome to MailMaker! Choose what to do: { fg . default }
{ fg . blue } [ { fg . green } 1 { fg . blue } ] { colored ( " List all existing mailboxes " ) }
{ fg . blue } [ { fg . green } 2 { fg . blue } ] { colored ( " Get info about an existing mailbox " ) }
{ fg . blue } [ { fg . green } 3 { fg . blue } ] { colored ( " Create a new mailbox " ) }
{ fg . blue } [ { fg . green } 4 { fg . blue } ] { colored ( " Delete an existing mailbox " ) }
{ fg . blue } [ { fg . green } 5 { fg . blue } ] { colored ( " Update an existing mailbox ' s information " ) }
{ fg . blue } [ { fg . green } 6 { fg . blue } ] { colored ( " Get in/out statistics for an existing mailbox " ) }
{ fg . blue } [ { fg . green } 7 { fg . blue } ] { colored ( " Get quota limits for an existing mailbox " ) }
{ fg . blue } [ { fg . green } 8 { fg . blue } ] { colored ( " Update quota limits for an existing mailbox " ) }
{ fg . blue } [ { fg . green } 9 { fg . blue } ] { colored ( " List all existing virtual domains " ) }
{ fg . blue } [ { fg . green } 10 { fg . blue } ] { colored ( " Get info about an existing virtual domain " ) }
{ fg . blue } [ { fg . green } 11 { fg . blue } ] { colored ( " Generate a DKIM key pair for an existing virtual domain " ) }
{ fg . blue } [ { fg . green } 12 { fg . blue } ] { colored ( " Get the DKIM key pair for an existing virtual domain " ) }
{ fg . blue } [ { fg . green } 13 { fg . blue } ] { colored ( " Delete the DKIM key pair for an existing virtual domain " ) }
{ fg . blue } [ { fg . green } 14 { fg . blue } ] { colored ( " Get in/out statistics for an existing virtual domain " ) }
2022-02-09 12:28:14 +01:00
{ fg . blue } [ { fg . green } 15 { fg . blue } ] { colored ( " Update an existing virtual domain ' s information " ) }
{ fg . blue } [ { fg . green } 16 { fg . blue } ] { colored ( " Search a mailbox by address " ) } """
2022-02-06 23:05:02 +01:00
skip = False
prompted = False
while True :
try :
# Some unicorn puke :D
prompt = prompt_pattern . format (
* (
(
getattr (
fg , random . choice ( [ " blue " , " magenta " , " red " , " cyan " , " blue " , " purple " , " yellow " , " green " ] )
)
)
for _ in range ( len ( prompt_pattern . splitlines ( ) ) - 1 )
)
)
print ( prompt )
skip = False
prompted = False
choice = input ( fg . purple ( " Make your choice: " ) )
prompted = True
if not choice . strip ( ) or not choice . isnumeric ( ) :
skip = True
2022-02-09 12:28:14 +01:00
print ( fg . yellow ( " Invalid choice (must be a number) " ) )
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
else :
# We clear each time because we don't want
# to clear the screen in the default case
match int ( choice ) :
case 1 :
os . system ( " cls||clear " )
print ( colored ( " Listing all mailboxes " ) )
if isinstance (
resp := await handle_errors (
" get " ,
user ,
password ,
f " { server } /admin/api/v1/boxes " ,
params = { " page " : 1 , " paging " : 999999 } ,
follow_redirects = True ,
) ,
httpx . Response ,
) :
if data := decode_json ( resp ) :
print ( fg . green ( f " Fetched { data [ ' results_count ' ] } addresses " ) )
for mailbox in data [ " results " ] :
print ( f " \t { fg . yellow } - { colored ( mailbox [ ' address ' ] ) } " )
else :
print (
fg . red (
f " An HTTP exception occurred while sending request -> { type ( resp ) . __name__ } : { resp } "
)
)
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
os . system ( " cls||clear " )
continue
case 2 :
os . system ( " cls||clear " )
address = input ( colored ( " Choose an inbox to get info for: " ) )
if isinstance (
resp := await handle_errors (
" get " , user , password , f " { server } /admin/api/v1/boxes/ { address } " , follow_redirects = True
) ,
httpx . Response ,
) :
if resp . status_code == 404 :
print ( fg . red ( " The provided mailbox does not exist " ) )
elif data := decode_json ( resp ) :
if not data [ " redirect_to " ] :
redirect_list = " None "
else :
redirect_list = " "
for redirect in data [ " redirect_to " ] :
redirect_list + = f " \n \t - { redirect } \n "
print (
f """ { colored ( " Mailbox information for " ) } { colored ( address ) } \n \n """
f """ { colored ( f " User: { data [ ' user ' ] } " ) } \n """
f """ { colored ( f " Name: { data [ ' name ' ] or ' None ' } " ) } \n """
f """ { colored ( f " System Admin: { ' Yes ' if data [ ' super_admin ' ] else ' No ' } " ) } \n """
f """ { colored ( f " Domain Admin: { ' Yes ' if data [ ' domain_admin ' ] else ' No ' } " ) } \n """
f """ { colored ( f " Home Dir: { data [ ' home ' ] } " ) } \n """
f """ { colored ( f " Strict ' From ' Header: { ' Not Enforced ' if data [ ' strict_from_disabled ' ] else ' Enforced ' } " ) } \n """
f """ { colored ( f " Creation Date: { data [ ' created ' ] } " ) } \n """
f """ { colored ( f " Last Update Date: { data [ ' updated ' ] } " ) } \n """
f """ { colored ( f " Redirect Only: { ' Yes ' if data [ ' redirect_only ' ] else ' No ' } " ) } \n """
f """ { colored ( f " Redirects to: { redirect_list } " ) } \n """
f """ { colored ( f " Discards Incoming Mail: { ' Yes ' if data [ ' discard ' ] else ' No ' } " ) } \n """
f """ { colored ( f " Disabled: { ' Yes ' if data [ ' disabled ' ] else ' No ' } " ) } \n """
)
else :
print (
fg . red (
f " An HTTP exception occurred while sending request -> { type ( resp ) . __name__ } : { resp } "
)
)
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
os . system ( " cls||clear " )
continue
case 3 :
os . system ( " cls||clear " )
print ( colored ( " Creating new mailbox " ) )
2022-02-09 12:28:14 +01:00
name = input ( colored ( " Choose a name for the mailbox: " ) )
while True :
email = input ( colored ( " Choose an email address for the mailbox: " ) )
if not validators . email ( email ) :
print ( fg . red ( " Invalid email address " ) )
continue
break
while True :
mail_password = getpass . getpass ( colored ( " Choose a password for the mailbox (empty for auto-generated password): " ) )
if not mail_password . strip ( ) :
mail_password = " " . join ( ( random . choice ( string . ascii_letters + string . digits + string . punctuation ) for _ in range ( random . randint ( 10 , 16 ) ) ) )
print ( colored ( f " Generated password is { mail_password } " ) )
break
confirm_password = getpass . getpass ( colored ( " Confirm the password: " ) )
if mail_password != confirm_password :
print ( fg . red ( " Passwords do not match " ) )
continue
break
while True :
destinations = input ( colored ( " Is this mailbox a redirect? \n If so, provide a comma-separated list of destinations here: " ) )
if destinations and not all ( ( validators . email ( mail ) for mail in destinations . strip ( ) . split ( " , " ) ) ) :
print ( fg . red ( " One or more destinations are not valid " ) )
continue
break
confirmation = input ( colored ( " System Admin? [y/N] " ) )
if confirmation . strip ( ) . lower ( ) in ( " y " , " yes " ) :
admin = True
else :
admin = False
if ( t := destinations . strip ( ) . split ( " , " ) ) == [ ' ' ] :
# Split has some weird behavior
destinations = [ ]
else :
destinations = t
data = { " name " : name ,
" email " : email ,
" passwordPlaintext " : mail_password ,
" disabled " : False ,
" referenceId " : " " , # TODO: Set this?
" superAdmin " : admin ,
" redirectTo " : destinations
}
confirmation = input ( colored ( " Confirm? [y/N] " ) )
if confirmation . strip ( ) . lower ( ) in ( " y " , " yes " ) :
if isinstance (
resp := await handle_errors (
" post " ,
user ,
password ,
f " { server } /admin/api/v1/boxes " ,
follow_redirects = True ,
data = data
) ,
httpx . Response ,
) :
if resp . status_code == 201 :
print ( colored ( f " { email } successfully created! " ) )
elif resp . status_code == 400 :
print ( fg . red ( f " An error occurred: { decode_json ( resp ) or resp . content . decode ( ) } " ) )
elif resp . status_code == 404 :
print ( fg . red ( " Cannot create mailbox: the chosen domain is not registered " ) )
else :
print (
fg . red (
f " An HTTP exception occurred while sending request -> { type ( resp ) . __name__ } : { resp } "
)
)
await trio . sleep ( 2 )
os . system ( " cls||clear " )
continue
else :
print ( colored ( " Aborting " ) )
skip = True
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
case 4 :
os . system ( " cls||clear " )
print ( colored ( " Deleting an existing mailbox " ) )
address = input ( colored ( " Choose an inbox to delete: " ) )
confirmation = input ( colored ( " Confirm? [y/N] " ) )
if confirmation . strip ( ) . lower ( ) in ( " y " , " yes " ) :
if isinstance (
resp := await handle_errors (
" delete " ,
user ,
password ,
f " { server } /admin/api/v1/boxes/ { address } " ,
follow_redirects = True ,
) ,
httpx . Response ,
) :
if resp . status_code == 404 :
print ( fg . red ( " The provided mailbox does not exist " ) )
elif resp . status_code == 204 :
print ( colored ( f " { address } successfully deleted! " ) )
else :
print (
fg . red (
f " An HTTP exception occurred while sending request -> { type ( resp ) . __name__ } : { resp } "
)
)
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
os . system ( " cls||clear " )
continue
else :
print ( colored ( " Aborting " ) )
skip = True
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
case 5 :
data = {
" passwordPlaintext " : " " ,
" name " : " " ,
" disabled " : False ,
" superAdmin " : False ,
}
os . system ( " cls||clear " )
print ( colored ( " Updating an existing mailbox " ) )
email = input ( colored ( " Select a mailbox to update: " ) )
if isinstance (
resp := await handle_errors (
" get " , user , password , f " { server } /admin/api/v1/boxes/ { email } " , follow_redirects = True
) ,
httpx . Response ,
) :
if resp . status_code == 404 :
print ( fg . red ( " The provided mailbox does not exist " ) )
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
os . system ( " cls||clear " )
continue
elif account_info := decode_json ( resp ) :
data [ " disabled " ] = account_info [ " disabled " ]
data [ " superAdmin " ] = account_info [ " super_admin " ]
data [ " name " ] = account_info [ " name " ]
else :
print (
fg . red (
f " An HTTP exception occurred while sending request -> { type ( resp ) . __name__ } : { resp } "
)
)
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
os . system ( " cls||clear " )
continue
print (
f """ { colored ( " Choose what to update: " ) } \n """
f """ { fg . blue } [ { fg . green } 1 { fg . blue } ] Update Name \n """
f """ { fg . blue } [ { fg . green } 2 { fg . blue } ] Update Password \n """
f """ { fg . blue } [ { fg . green } 3 { fg . blue } ] Promote to System Admin \n """
f """ { fg . blue } [ { fg . green } 4 { fg . blue } ] Demote from System Admin \n """
f """ { fg . blue } [ { fg . green } 5 { fg . blue } ] Disable account \n """
f """ { fg . blue } [ { fg . green } 6 { fg . blue } ] Enable account \n """
)
keep = False
while True :
what = input ( colored ( " What information do you want to update? " ) )
if not what or not what . isnumeric ( ) :
2022-02-09 12:28:14 +01:00
print ( fg . yellow ( " Invalid choice (must be a number) " ) )
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
else :
match int ( what ) :
case 1 :
new_name = input ( colored ( f " New Name for { email } (empty to abort): " ) )
if data [ " name " ] == new_name :
print ( colored ( f " The desired name is already set for { email } " ) )
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
continue
2022-02-06 23:05:02 +01:00
elif new_name . strip ( ) :
data [ " name " ] = new_name
keep = True
break
case 2 :
new_password = input ( colored ( f " New Password for { email } (empty to abort): " ) )
if new_password . strip ( ) :
data [ " passwordPlaintext " ] = new_password
keep = True
break
case 3 :
if data [ " superAdmin " ] :
print ( colored ( f " { email } is already a System Admin " ) )
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
continue
2022-02-06 23:05:02 +01:00
else :
print ( colored ( f " Promoting { email } to System Admin " ) )
data [ " superAdmin " ] = True
keep = True
break
case 4 :
if not data [ " superAdmin " ] :
print ( colored ( f " { email } is not a System Admin " ) )
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
continue
2022-02-06 23:05:02 +01:00
else :
print ( colored ( f " Demoting { email } from System Admin " ) )
data [ " superAdmin " ] = False
keep = True
break
case 5 :
if data [ " disabled " ] :
print ( colored ( f " { email } is already disabled " ) )
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
continue
2022-02-06 23:05:02 +01:00
else :
print ( colored ( f " Disabling mailbox { email } " ) )
keep = True
data [ " disabled " ] = True
break
case 6 :
if not data [ " disabled " ] :
print ( colored ( f " { email } is not disabled " ) )
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
continue
2022-02-06 23:05:02 +01:00
else :
print ( colored ( f " Enabling mailbox { email } " ) )
keep = True
data [ " disabled " ] = False
break
case _ :
2022-02-09 12:28:14 +01:00
print ( fg . yellow ( " Invalid choice (value out of range) " ) )
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
if keep :
confirmation = input ( colored ( " Confirm? [y/N] " ) )
if confirmation . strip ( ) . lower ( ) in ( " y " , " yes " ) :
if isinstance (
resp := await handle_errors (
" patch " ,
user ,
password ,
f " { server } /admin/api/v1/boxes/ { email } " ,
follow_redirects = True ,
data = data
) ,
httpx . Response ,
) :
if resp . status_code == 404 :
print (
fg . red (
" The provided mailbox no longer exists: has it been deleted in the meantime? "
)
)
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
elif resp . status_code == 204 :
print ( colored ( " Information Updated " ) )
else :
print ( colored ( " Aborting " ) )
skip = True
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
case 6 :
os . system ( " cls||clear " )
print ( colored ( " Getting in/out statistics for existing mailbox " ) )
email = input ( colored ( " Select a mailbox to fetch stats for: " ) )
if isinstance (
resp := await handle_errors (
" get " , user , password , f " { server } /admin/api/v1/boxes/ { email } /stats " , follow_redirects = True
) ,
httpx . Response ,
) :
if resp . status_code == 404 :
print ( fg . red ( " The provided mailbox does not exist " ) )
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
os . system ( " cls||clear " )
continue
elif stats := decode_json ( resp ) :
for day in stats :
print ( colored ( day . replace ( " - " , " / " ) ) )
print ( colored ( f " \t - In: { stats [ day ] [ ' in ' ] } \n \t - Out: { stats [ day ] [ ' out ' ] } " ) )
else :
print (
fg . red (
f " An HTTP exception occurred while sending request -> { type ( resp ) . __name__ } : { resp } "
)
)
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
os . system ( " cls||clear " )
continue
case 7 :
2022-02-09 12:28:14 +01:00
os . system ( " cls||clear " )
2022-02-06 23:05:02 +01:00
print ( colored ( " Getting quota limits for an existing mailbox " ) )
2022-02-09 12:28:14 +01:00
mail = input ( colored ( " Choose a mailbox to get quota info for: " ) )
if isinstance (
resp := await handle_errors (
" get " , user , password , f " { server } /admin/api/v1/boxes/ { mail } /quota " , follow_redirects = True
) ,
httpx . Response ,
) :
if resp . status_code == 404 :
print ( fg . red ( " The provided mailbox does not exist " ) )
elif data := decode_json ( resp ) :
print (
f """ { colored ( " Quota information for " ) } { colored ( mail ) } \n \n """
f """ { colored ( f " Storage Limit: { data [ ' storage_limit ' ] } " ) } \n """
f """ { colored ( f " Storage Usage: { data [ ' storage_usage ' ] } " ) } \n """
f """ { colored ( f " Count Limit: { data [ ' count_limit ' ] } " ) } \n """
f """ { colored ( f " Count Usage: { data [ ' count_usage ' ] } " ) } \n """
)
2022-02-06 23:05:02 +01:00
case 8 :
os . system ( " cls||clear " )
print ( colored ( " Updating quota limits for an existing mailbox " ) )
2022-02-09 12:28:14 +01:00
data = {
" storageLimit " : 0 ,
" countLimit " : 0 ,
}
email = input ( colored ( " Select a mailbox to update quotas for: " ) )
if isinstance (
resp := await handle_errors (
" get " , user , password , f " { server } /admin/api/v1/boxes/ { email } /quota " , follow_redirects = True
) ,
httpx . Response ,
) :
if resp . status_code == 404 :
print ( fg . red ( " The provided mailbox does not exist " ) )
await trio . sleep ( 2 )
os . system ( " cls||clear " )
continue
elif account_info := decode_json ( resp ) :
data [ " storageLimit " ] = account_info [ " storage_limit " ]
data [ " countLimit " ] = account_info [ " count_limit " ]
else :
print (
fg . red (
f " An HTTP exception occurred while sending request -> { type ( resp ) . __name__ } : { resp } "
)
)
await trio . sleep ( 2 )
os . system ( " cls||clear " )
continue
print (
f """ { colored ( " Choose what to update: " ) } \n """
f """ { fg . blue } [ { fg . green } 1 { fg . blue } ] Update Storage Limit \n """
f """ { fg . blue } [ { fg . green } 2 { fg . blue } ] Update Count Limit \n """
)
keep = False
while True :
what = input ( colored ( " What information do you want to update? " ) )
if not what or not what . isnumeric ( ) :
print ( fg . yellow ( " Invalid choice (must be a number) " ) )
await trio . sleep ( 2 )
else :
match int ( what ) :
case 1 :
new_storage = int ( input ( colored ( f " New storage limit for { email } (current is { data [ ' storageLimit ' ] } , empty to abort): " ) ) )
if data [ " storageLimit " ] == new_storage :
print ( colored ( f " The desired storage limit is already set to { data [ ' storageLimit ' ] } for { email } " ) )
await trio . sleep ( 2 )
continue
elif new_storage . strip ( ) :
if not new_storage . isnumeric ( ) :
print ( fg . yellow ( " Invalid choice (must be a number) " ) )
await trio . sleep ( 2 )
continue
data [ " storageLimit " ] = new_name
keep = True
break
case 2 :
new_count = int ( input ( colored ( f " New count limit for { email } (current is { data [ ' countLimit ' ] } , empty to abort): " ) ) )
if data [ " countLimit " ] == new_count :
print ( colored ( f " The desired count limit is already set to { data [ ' countLimit ' ] } for { email } " ) )
await trio . sleep ( 2 )
continue
else :
data [ " countLimit " ] = new_count
keep = True
break
case _ :
print ( fg . yellow ( " Invalid choice (value out of range) " ) )
await trio . sleep ( 2 )
if keep :
confirmation = input ( colored ( " Confirm? [y/N] " ) )
if confirmation . strip ( ) . lower ( ) in ( " y " , " yes " ) :
if isinstance (
resp := await handle_errors (
" patch " ,
user ,
password ,
f " { server } /admin/api/v1/boxes/ { email } /quota " ,
follow_redirects = True ,
data = data
) ,
httpx . Response ,
) :
if resp . status_code == 404 :
print (
fg . red (
" The provided mailbox no longer exists: has it been deleted in the meantime? "
)
)
await trio . sleep ( 2 )
elif resp . status_code == 204 :
print ( colored ( " Quota Updated " ) )
elif resp . status_code == 500 :
print ( fg . yellow ( " Are you trying to update limits for a System Administrator which is now allowed? " ) )
else :
print ( colored ( " Aborting " ) )
skip = True
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
case 9 :
os . system ( " cls||clear " )
print ( colored ( " Listing all existing virtual domains " ) )
if isinstance (
resp := await handle_errors (
" get " ,
user ,
password ,
f " { server } /admin/api/v1/domains " ,
params = { " page " : 1 , " paging " : 999999 } ,
follow_redirects = True ,
) ,
httpx . Response ,
) :
if data := decode_json ( resp ) :
print ( fg . green ( f " Fetched { data [ ' results_count ' ] } domains " ) )
for domain in data [ " results " ] :
print ( f " \t { fg . yellow } - { colored ( domain [ ' name ' ] ) } " )
else :
print (
fg . red (
f " An HTTP exception occurred while sending request -> { type ( resp ) . __name__ } : { resp } "
)
)
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
os . system ( " cls||clear " )
continue
case 10 :
os . system ( " cls||clear " )
print ( colored ( " Getting info about an existing virtual domain " ) )
domain = input ( colored ( " Choose a virtual domain to get info for: " ) )
if isinstance (
resp := await handle_errors (
" get " , user , password , f " { server } /admin/api/v1/domains/ { domain } " , follow_redirects = True
) ,
httpx . Response ,
) :
if resp . status_code == 404 :
print ( fg . red ( " The provided virtual domain does not exist " ) )
elif data := decode_json ( resp ) :
print (
f """ { colored ( " Virtual domain information for " ) } { colored ( domain ) } \n \n """
f """ { colored ( f " Home Dir: { data [ ' home ' ] } " ) } \n """
f """ { colored ( f " Forward: { data [ ' forward_domain ' ] if data [ ' forward ' ] else ' No ' } " ) } \n """
f """ { colored ( f " Creation Date: { data [ ' created ' ] } " ) } \n """
f """ { colored ( f " Last Update Date: { data [ ' updated ' ] } " ) } \n """
f """ { colored ( f " Domain bin (catch-all): { data [ ' domain_bin_address ' ] if data [ ' domain_bin ' ] else ' No ' } " ) } \n """
f """ { colored ( f " Force Route: { data [ ' force_route_host ' ] if data [ ' force_route ' ] else ' No ' } " ) } \n """
)
case 11 :
os . system ( " cls||clear " )
print ( colored ( " Generating a DKIM key pair for an existing virtual domain " ) )
case 12 :
os . system ( " cls||clear " )
print ( colored ( " Getting the DKIM key pair for an existing virtual domain " ) )
case 13 :
os . system ( " cls||clear " )
print ( colored ( " Deleting the DKIM key pair for an existing virtual domain " ) )
case 14 :
os . system ( " cls||clear " )
print ( colored ( " Getting in/out statistics for an existing virtual domain " ) )
domain = input ( colored ( " Select a virtual domain to fetch stats for: " ) )
if isinstance (
resp := await handle_errors (
" get " , user , password , f " { server } /admin/api/v1/domains/ { domain } /stats " ,
follow_redirects = True
) ,
httpx . Response ,
) :
if resp . status_code == 404 :
print ( fg . red ( " The provided virtual domain does not exist " ) )
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
os . system ( " cls||clear " )
continue
elif stats := decode_json ( resp ) :
for day in stats :
print ( colored ( day . replace ( " - " , " / " ) ) )
print ( colored ( f " \t - In: { stats [ day ] [ ' in ' ] } \n \t - Out: { stats [ day ] [ ' out ' ] } " ) )
else :
print (
fg . red (
f " An HTTP exception occurred while sending request -> { type ( resp ) . __name__ } : { resp } "
)
)
2022-02-09 12:28:14 +01:00
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
os . system ( " cls||clear " )
continue
case 15 :
os . system ( " cls||clear " )
2022-02-09 12:28:14 +01:00
print ( colored ( " Updating an existing virtual domain " ) )
data = { " forward " : False ,
" forwardDomain " : " " ,
" domainBin " : False ,
" domainBinAddress " : " " ,
" forceRoute " : False ,
" forceRouteHost " : " " ,
" referenceId " : " "
}
domain = input ( colored ( " Select a virtual domain to update: " ) )
if isinstance (
resp := await handle_errors (
" get " , user , password , f " { server } /admin/api/v1/domains/ { domain } " , follow_redirects = True
) ,
httpx . Response ,
) :
if resp . status_code == 404 :
print ( fg . red ( " The provided virtual domain does not exist " ) )
await trio . sleep ( 2 )
os . system ( " cls||clear " )
continue
elif domain_info := decode_json ( resp ) :
data [ " forward " ] = domain_info [ " forward " ]
data [ " domainBin " ] = domain_info [ " domain_bin " ]
data [ " forceRoute " ] = domain_info [ " force_route " ]
if domain_info [ " forward " ] :
data [ " forwardDomain " ] = domain_info [ " forward_domain " ]
if domain_info [ " force_route " ] :
data [ " forceRouteHost " ] = domain_info [ " force_route_host " ]
if domain_info [ " domain_bin " ] :
data [ " domainBinAddress " ] = domain_info [ " domain_bin_address " ]
else :
print (
fg . red (
f " An HTTP exception occurred while sending request -> { type ( resp ) . __name__ } : { resp } "
)
)
await trio . sleep ( 2 )
os . system ( " cls||clear " )
continue
print (
f """ { colored ( " Choose what to update: " ) } \n """
f """ { fg . blue } [ { fg . green } 1 { fg . blue } ] Set/Update domain bin (catch-all) \n """
f """ { fg . blue } [ { fg . green } 2 { fg . blue } ] Set/Update forced routing \n """
f """ { fg . blue } [ { fg . green } 3 { fg . blue } ] Set/Update forward domain \n """
f """ { fg . blue } [ { fg . green } 4 { fg . blue } ] Unset domain bin (catch-all) \n """
f """ { fg . blue } [ { fg . green } 5 { fg . blue } ] Unset forced routing \n """
f """ { fg . blue } [ { fg . green } 6 { fg . blue } ] Unset forward domain \n """
)
keep = False
while True :
what = input ( colored ( " What information do you want to update? " ) )
if not what or not what . isnumeric ( ) :
print ( fg . yellow ( " Invalid choice (must be a number) " ) )
await trio . sleep ( 2 )
else :
match int ( what ) :
case 1 :
if data [ " domainBin " ] :
print ( colored ( f " Note: current catch-all for { domain } is set to { data [ ' domainBinAddress ' ] } " ) )
new_bin = input ( colored ( f " New domain catch-all for { domain } (empty to abort): " ) )
if data [ " domainBinAddress " ] and data [ " domainBinAddress " ] == new_bin :
print ( colored ( f " The desired catch-all is already set to { new_bin } for { domain } " ) )
elif new_bin . strip ( ) :
data [ " domainBinAddress " ] = new_bin
data [ " domainBin " ] = True
keep = True
break
case 2 :
if data [ " forceRoute " ] :
print ( colored ( f " Note: { domain } is currently routed trough { data [ ' forceRouteHost ' ] } " ) )
while True :
new_routing = input ( colored ( f " Set forced routing for { domain } (empty to abort): " ) )
if not new_routing . strip ( ) :
break
# I could've one-lined all these nested ifs but it'd look awful so shut up
if not validators . domain ( new_routing ) :
# Not a domain
if not validators . ipv4 ( new_routing ) or validators . ipv6 ( new_routing ) :
# Not a plain IPV4/IPV6
if ' : ' in new_routing :
# Is it an IP:port pair?
split = new_routing . split ( " : " , maxsplit = 1 )
if not split [ 1 ] . isnumeric ( ) or not ( validators . ipv4 ( split [ 0 ] ) or validators . ipv6 ( split [ 0 ] ) ) :
# Nope, not a valid host
print ( fg . red ( " The provided host is not valid (not a domain, IP, nor an IP:port pair) " ) )
await trio . sleep ( 2 )
continue
break
if new_routing . strip ( ) :
if data [ " forceRoute " ] and data [ " forceRouteHost " ] == new_routing . strip ( ) :
print ( colored ( " The virtual domain {domain} is already routed trough {new_routing} " ) )
await trio . sleep ( 2 )
continue
data [ " forceRouteHost " ] = new_routing
data [ " forceRoute " ] = True
keep = True
break
case 3 :
while True :
forward_address = input ( colored ( " Choose a new forwarding address: " ) )
if validators . email ( forward_address ) :
break
else :
print ( colored ( " Not a valid email address " ) )
if data [ " forward " ] :
print ( colored ( f " Note: { domain } currently forwards to { data [ ' forwardAddress ' ] } " ) )
if data [ " forward " ] and data [ " forwardAddress " ] == forward_address :
print ( colored ( f " { domain } already forwards to { data [ ' forwardAddress ' ] } " ) )
else :
data [ " forward " ] = True
data [ " forwardAddress " ] = True
keep = True
break
case _ :
print ( fg . yellow ( " Invalid choice (value out of range) " ) )
await trio . sleep ( 2 )
if keep :
confirmation = input ( colored ( " Confirm? [y/N] " ) )
if confirmation . strip ( ) . lower ( ) in ( " y " , " yes " ) :
if isinstance (
resp := await handle_errors (
" patch " ,
user ,
password ,
f " { server } /admin/api/v1/domains/ { domain } " ,
follow_redirects = True ,
data = data
) ,
httpx . Response ,
) :
if resp . status_code == 400 :
if err := decode_json ( resp ) :
print ( fg . red ( f " An error occurred: { err [ ' message ' ] } " ) )
print ( fg . red ( " Errors: " ) )
for error_kind in err [ " errors " ] [ " children " ] . keys ( ) :
if not err [ " errors " ] [ " children " ] [ error_kind ] :
continue
print ( fg . red ( f " - Error kind: { error_kind } " ) )
for error_message in err [ " errors " ] [ " children " ] [ error_kind ] [ " errors " ] :
print ( fg . red ( f " - { error_message } " ) )
if resp . status_code == 404 :
print (
fg . red (
" The provided domain no longer exists: has it been deleted in the meantime? "
)
)
await trio . sleep ( 2 )
elif resp . status_code == 204 :
print ( colored ( " Information Updated " ) )
else :
print ( colored ( " Aborting " ) )
skip = True
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
case 16 :
os . system ( " cls||clear " )
print ( colored ( " Searching a mailbox by address " ) )
case _ :
skip = True
2022-02-09 12:28:14 +01:00
print ( fg . yellow ( " Invalid choice (value out of range) " ) )
await trio . sleep ( 2 )
2022-02-06 23:05:02 +01:00
os . system ( " cls||clear " )
if not skip :
2022-02-09 12:28:14 +01:00
pause ( colored ( " Press any key to continue " ) )
2022-02-06 23:05:02 +01:00
# Awful, but it works so shut up
os . system ( " cls||clear " )
except KeyboardInterrupt :
os . system ( " cls||clear " )
# This way when you press Ctrl+C
# or Ctrl+D from an inner section
# you are brought back to the
# main menu
if not prompted :
return 0
2022-02-09 12:28:14 +01:00
except KeyError as k :
print ( fg . red ( " The JSON response from the API was missing an expected field ( {k!r} ), has the version changed? " ) )
await trio . sleep ( 2 )
os . system ( " cls||clear " )
2022-02-06 23:05:02 +01:00
except EOFError :
os . system ( " cls||clear " )
if not prompted :
return - 1
# This tool technically doesn't *need* to be async, but shut up
async def main ( args : argparse . Namespace ) - > int :
"""
Main program entry point
"""
user , password = ( None , None )
if not re . match ( r " http(s)?:// " , args . server_url ) :
print ( fg . red ( " Missing or invalid schema in server URL! " ) )
return - 1
elif not validators . url ( args . server_url , public = True ) :
# bool(ValidationFailure) is False :)
print ( fg . red ( " Invalid server URL, exiting " ) )
return - 1
if os . path . isfile ( " credentials.json " ) :
try :
with open ( " credentials.json " ) as credentials :
data = json . load ( credentials )
except json . decoder . JSONDecodeError as json_error :
print (
fg . red (
f " A JSON error occurred while attempting to load credentials.json -> { type ( json_error ) . __name__ } : "
f " { json_error } "
)
)
except PermissionError :
print (
fg . red (
" Could not read credentials.json due to a permission issue. Make sure the file is readable by "
" the current user and try again "
)
)
except OSError as os_error :
print (
fg . red (
f " Could not read credentials.json due to a generic OS error -> { type ( os_error ) . __name__ } : "
f " { os_error } "
)
)
else :
try :
user , password = data [ " username " ] , data [ " password " ]
except KeyError :
print ( fg . yellow ( " The JSON payload in credentials.json is not valid " ) )
if not all ( [ user , password ] ) and not all ( [ args . username , args . password ] ) :
print (
fg . yellow ( " Credentials could not be loaded and an explicit username/password pair wasn ' t provided, exiting " )
)
return - 1
elif all ( [ args . username , args . password ] ) :
user , password = args . username , args . password
if args . save :
print ( fg . green ( f " Saving credentials for { user } to credentials.json " ) )
try :
with open ( " credentials.json " , " w " ) as save_file :
json . dump ( { " username " : user , " password " : password } , save_file )
except PermissionError :
print (
fg . red (
" Could not write credentials.json due to a permission issue. Make sure the file is readable by "
" the current user and try again "
)
)
except OSError as os_error :
print (
fg . red (
f " Could not write credentials.json due to a generic OS error -> { type ( os_error ) . __name__ } : "
f " { os_error } "
)
)
else :
print ( fg . green ( f " Loaded credentials for { user } from credentials.json " ) )
print ( fg . yellow ( f " Attempting login with { user } to { args . server_url } " ) )
if isinstance (
resp := await handle_errors ( " get " , user , password , f " { args . server_url } /admin/api/doc " , follow_redirects = True ) ,
httpx . Response ,
) :
if resp . status_code == 200 :
print ( fg . green ( " Authentication successful! " ) )
return await loop ( args . server_url , user , password )
else :
2022-02-09 12:28:14 +01:00
print ( fg . red ( f " An error occurred while attempting to log in -> { type ( resp ) . __name__ } -> { resp } " ) )
2022-02-06 23:05:02 +01:00
return - 1
if __name__ == " __main__ " :
parser = argparse . ArgumentParser ( description = " A simple tool to interact with the REST API of poste.io instances " )
parser . add_argument ( " --username " , help = " The username to authenticate with the API " )
parser . add_argument ( " --password " , help = " The password to authenticate with the API " )
parser . add_argument (
" --save " ,
help = " Saves credentials in a credentials.json file for future use. Overwrites any existing credentials file " ,
action = " store_true " ,
default = False ,
)
parser . add_argument (
" --server-url " , help = " The URL of your poste.io instance. MUST include an http(s):// prefix " , required = True
)
parser . add_argument (
" --nocolor " ,
help = " Disables the unicorn puke and outputs plain old boring text " ,
action = " store_true " ,
default = False ,
)
arguments = parser . parse_args ( )
if arguments . nocolor :
# This hack is awful, but it works and is much better than using
# parameters or globals everywhere. All hail dynamic languages
fg = type (
" fg " ,
( object , ) ,
{
2022-02-09 12:28:14 +01:00
" __getattr__ " : lambda self , _ : type (
" i " , ( object , ) , { " __repr__ " : lambda _ : " " , " __call__ " : lambda _ , x : x }
2022-02-06 23:05:02 +01:00
) ( ) ,
} ,
) ( )
try :
os . system ( " cls||clear " )
print ( colored ( " MailMaker v0.0.1, made with <3 and Python 3.8 by @nocturn9x " ) )
sys . exit ( trio . run ( main , parser . parse_args ( ) ) )
except KeyboardInterrupt :
os . system ( " cls||clear " )
sys . exit ( 0 )