mirror of https://github.com/nocturn9x/BotBase.git
266 lines
14 KiB
Python
266 lines
14 KiB
Python
from ..config import ADMINS, USER_INFO, INVALID_SYNTAX, ERROR, NONNUMERIC_ID, USERS_COUNT, \
|
|
NO_PARAMETERS, ID_MISSING, GLOBAL_MESSAGE_STATS, NAME, WHISPER_FROM, USER_INFO_UPDATED, USER_INFO_UNCHANGED, \
|
|
USER_BANNED, USER_UNBANNED, CANNOT_BAN_ADMIN, USER_ALREADY_BANNED, USER_NOT_BANNED, YOU_ARE_BANNED, YOU_ARE_UNBANNED, \
|
|
MARKED_BUSY, UNMARKED_BUSY, CACHE, YES, NO, NAME_MISSING, bot, WHISPER_SUCCESSFUL, LEAVE_CURRENT_CHAT, \
|
|
QUEUE_LIST, CHATS_LIST, ADMIN_BUSY
|
|
from pyrogram import Client, Filters
|
|
from ..database.query import get_user, get_users, update_name, ban_user, unban_user, get_user_by_name
|
|
from .antiflood import BANNED_USERS
|
|
import random
|
|
import logging
|
|
import itertools
|
|
import re
|
|
from ..methods import MethodWrapper
|
|
|
|
|
|
# Filter matching only the user ids that appear as keys of the ADMINS mapping;
# every handler in this module combines it with its command filter.
ADMINS_FILTER = Filters.user(list(ADMINS))

# Shared wrapper around the bot client. The handlers below check its return
# values with isinstance(..., Exception) rather than using try/except.
wrapper = MethodWrapper(bot)
|
|
|
|
@Client.on_message(Filters.command("getranduser") & ADMINS_FILTER & ~BANNED_USERS & ~Filters.edited)
def get_random_user(client, message):
    """Handle /getranduser: reply with the stored info of a randomly picked user.

    The command takes no arguments; if any are supplied, the NO_PARAMETERS
    notice is sent instead.

    :param client: the client instance that received the update (unused)
    :param message: the incoming message carrying the command
    """

    logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /getranduser")
    if len(message.command) > 1:
        wrapper.send_message(message.chat.id, NO_PARAMETERS.format(command='/getranduser'))
        return

    # Each entry returned by get_users() is unpacked into get_user()'s arguments
    picked = random.choice(get_users())
    _rowid, uid, uname, date, banned = get_user(*picked)

    if uname:
        shown_name = '@' + uname
    else:
        shown_name = 'null'
    ban_status = YES if banned else NO
    admin_status = YES if uid in ADMINS else NO

    reply = USER_INFO.format(uid=uid, uname=shown_name, date=date, status=ban_status, admin=admin_status)
    wrapper.send_message(message.chat.id, reply)
|
|
|
|
|
|
@Client.on_message(Filters.command("count") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def count_users(client, message):
    """Handle /count: reply with the number of entries returned by get_users().

    The command takes no arguments; if any are supplied, the NO_PARAMETERS
    notice is sent instead.

    :param client: the client instance that received the update (unused)
    :param message: the incoming message carrying the command
    """

    if len(message.command) > 1:
        # The notice must name this command (it previously said '/getranduser')
        wrapper.send_message(message.chat.id, NO_PARAMETERS.format(command='/count'))
        return

    logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /count")
    count = len(get_users())
    wrapper.send_message(message.chat.id, USERS_COUNT.format(count=count))
|
|
|
|
|
|
@Client.on_message(Filters.command("getuser") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def get_user_info(client, message):
    """Handle /getuser <id>: reply with the stored info of the given user ID.

    Sends a syntax error when the argument count is wrong, a NONNUMERIC_ID
    error when the argument is not made of digits and an ID_MISSING error
    when get_user() returns nothing for that ID.

    :param client: the client instance that received the update (unused)
    :param message: the incoming message carrying the command
    """

    if len(message.command) != 2:
        wrapper.send_message(message.chat.id, INVALID_SYNTAX.format(correct='/getuser id'))
        return

    requested = message.command[1]
    if not requested.isdigit():
        wrapper.send_message(message.chat.id, f"{ERROR}: {NONNUMERIC_ID}")
        return

    record = get_user(requested)
    if not record:
        wrapper.send_message(message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=requested)}")
        return

    # Only successful lookups are logged
    logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /getuser {requested}")
    _, uid, uname, date, banned = record

    if uname:
        shown_name = '@' + uname
    else:
        shown_name = 'null'
    ban_status = YES if banned else NO
    admin_status = YES if uid in ADMINS else NO

    reply = USER_INFO.format(uid=uid, uname=shown_name, date=date, status=ban_status, admin=admin_status)
    wrapper.send_message(message.chat.id, reply)
|
|
|
|
|
|
@Client.on_message(Filters.command("userbyname") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def get_user_by_uname(client, message):
    """Handle /userbyname [@]username: reply with the stored info of that user.

    The argument is looked up lowercased and without leading '@' characters.
    Sends a syntax error when the argument count is wrong and a NAME_MISSING
    error when get_user_by_name() returns nothing.

    :param client: the client instance that received the update (unused)
    :param message: the incoming message carrying the command
    """

    if len(message.command) != 2:
        wrapper.send_message(message.chat.id, INVALID_SYNTAX.format(correct='/userbyname [@]username'))
        return

    raw_name = message.command[1]
    record = get_user_by_name(raw_name.lstrip("@").lower())
    if not record:
        # Error and log messages echo the argument exactly as it was typed
        wrapper.send_message(message.chat.id, f"{ERROR}: {NAME_MISSING.format(uname=raw_name)}")
        return

    logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /userbyname {raw_name}")
    _, uid, uname, date, banned = record

    if uname:
        shown_name = '@' + uname
    else:
        shown_name = 'null'
    ban_status = YES if banned else NO
    admin_status = YES if uid in ADMINS else NO

    reply = USER_INFO.format(uid=uid, uname=shown_name, date=date, status=ban_status, admin=admin_status)
    wrapper.send_message(message.chat.id, reply)
|
|
|
|
|
|
@Client.on_message(Filters.command("global") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def global_message(client, message):
    """Handle /global <message>: send a message to every ID yielded by get_users().

    Delivery failures (send_message returning an Exception instance) are
    logged and counted; a summary is sent back to the admin at the end.

    :param client: the client instance that received the update (unused)
    :param message: the incoming message carrying the command
    """

    if len(message.command) <= 1:
        wrapper.send_message(message.chat.id, f"{INVALID_SYNTAX.format(correct='/global message')}\n**HTML and Markdown styling supported**")
        return

    # Drop the first 7 characters of the HTML text (the length of "/global")
    msg = message.text.html[7:]
    logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent the following global message: {msg}")

    attempted = 0
    failed = 0
    # get_users() yields rows; flatten them into individual IDs
    for uid in itertools.chain.from_iterable(get_users()):
        attempted += 1
        outcome = wrapper.send_message(uid, msg)
        if not isinstance(outcome, Exception):
            continue
        logging.error(f"Could not deliver the global message to {uid} because of {type(outcome).__name__}: {outcome}")
        failed += 1

    delivered = attempted - failed
    logging.warning(f"{delivered}/{attempted} global messages were successfully delivered")
    wrapper.send_message(message.chat.id, GLOBAL_MESSAGE_STATS.format(count=attempted, success=delivered, msg=msg))
|
|
|
|
|
|
@Client.on_message(Filters.command("whisper") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def whisper(client, message):
    """Handle /whisper <id> <message>: send a message to a single known user.

    The target must be a decimal ID that appears among the rows returned by
    get_users(). The outcome (success or the exception returned by
    send_message) is reported back to the admin.

    :param client: the client instance that received the update (unused)
    :param message: the incoming message carrying the command
    """

    if len(message.command) <= 2:
        # The hint lists both required arguments (ID and message)
        wrapper.send_message(message.chat.id, f"{INVALID_SYNTAX.format(correct='/whisper ID message')}\n**HTML and Markdown styling supported**")
        return

    logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent {message.text.html}")

    target = message.command[1]
    # isdecimal() rather than isdigit(): isdigit() also accepts characters such
    # as superscript digits, which int() rejects with a ValueError
    if not target.isdecimal():
        wrapper.send_message(message.chat.id, f"{ERROR}: {NONNUMERIC_ID}")
        return
    uid = int(target)

    # Drop the first 9 characters of the HTML text (the length of "/whisper "),
    # then everything up to and including the first occurrence of the ID
    body = message.text.html[9:]
    _, found, remainder = body.partition(target)
    # If the ID is not present verbatim in the HTML text, keep the body as is
    # instead of failing
    msg = remainder if found else body

    if uid not in itertools.chain.from_iterable(get_users()):
        wrapper.send_message(message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=uid)}")
        return

    sender = f"[{ADMINS[message.from_user.id]}]({NAME.format(message.from_user.id)})"
    result = wrapper.send_message(uid, WHISPER_FROM.format(admin=sender, msg=msg))
    if isinstance(result, Exception):
        logging.error(f"Could not whisper to {uid} because of {type(result).__name__}: {result}")
        wrapper.send_message(message.chat.id, f"{ERROR}: {type(result).__name__} -> {result}")
    else:
        wrapper.send_message(message.chat.id, WHISPER_SUCCESSFUL)
|
|
|
|
|
|
@Client.on_message(Filters.command("update") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def update(client, message):
    """Handle /update <id>: refresh the stored username of the given user.

    The current username is fetched through wrapper.get_users() and written
    with update_name() only when it differs from the stored one.

    :param client: the client instance that received the update (unused)
    :param message: the incoming message carrying the command
    """

    if len(message.command) != 2:
        wrapper.send_message(message.chat.id, INVALID_SYNTAX.format(correct='/update ID'))
        return

    target = message.command[1]
    if not target.isdigit():
        wrapper.send_message(message.chat.id, f"{ERROR}: {NONNUMERIC_ID}")
        return

    record = get_user(target)
    if not record:
        wrapper.send_message(message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=target)}")
        return

    logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /update {target}")
    _, uid, stored_name, _date, _banned = record

    fetched = wrapper.get_users(uid)
    if isinstance(fetched, Exception):
        logging.error(f"An error has occurred when calling get_users({uid}), {type(fetched).__name__}: {fetched}")
        wrapper.send_message(message.chat.id, f"{ERROR}: {type(fetched).__name__} -> {fetched}")
        return

    # A missing username is represented by the literal string "null"
    if fetched.username is None:
        fetched.username = "null"

    if fetched.username != stored_name:
        update_name(uid, fetched.username)
        wrapper.send_message(message.chat.id, USER_INFO_UPDATED)
        return

    wrapper.send_message(message.chat.id, USER_INFO_UNCHANGED)
|
|
|
|
|
|
@Client.on_message(Filters.command("ban") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def ban(client, message):
    """Handle /ban <id>: ban a known, non-admin user.

    On success the admin is notified, the banned user is sent
    YOU_ARE_BANNED and their ID is added to BANNED_USERS.

    :param client: the client instance that received the update (unused)
    :param message: the incoming message carrying the command
    """

    if len(message.command) != 2:
        wrapper.send_message(message.chat.id, INVALID_SYNTAX.format(correct='/ban ID'))
        return

    target = message.command[1]
    # isdecimal() rather than isdigit(): isdigit() also accepts characters such
    # as superscript digits, which int() rejects with a ValueError
    if not target.isdecimal():
        wrapper.send_message(message.chat.id, f"{ERROR}: {NONNUMERIC_ID}")
        return

    target_id = int(target)
    if target_id in ADMINS:
        wrapper.send_message(message.chat.id, CANNOT_BAN_ADMIN)
        return

    user = get_user(target)
    if not user:
        wrapper.send_message(message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=target)}")
        return

    # The last field of the user row is the banned flag
    if user[-1]:
        wrapper.send_message(message.chat.id, USER_ALREADY_BANNED)
        return

    logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /ban {target}")
    uid = user[1]
    res = ban_user(target_id)
    if isinstance(res, Exception):
        logging.error(f"An error has occurred when calling ban_user({uid}), {type(res).__name__}: {res}")
        wrapper.send_message(message.chat.id, f"{ERROR}: {type(res).__name__} -> {res}")
        return

    wrapper.send_message(message.chat.id, USER_BANNED)
    wrapper.send_message(uid, YOU_ARE_BANNED)
    # Added only after the notification above has been sent
    BANNED_USERS.add(uid)
|
|
|
|
|
|
@Client.on_message(Filters.command("unban") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def unban(client, message):
    """Handle /unban <id>: lift the ban of a known, currently banned user.

    On success the admin is notified, the ID is removed from BANNED_USERS
    (if present) and the user is sent YOU_ARE_UNBANNED.

    :param client: the client instance that received the update (unused)
    :param message: the incoming message carrying the command
    """

    if len(message.command) != 2:
        wrapper.send_message(message.chat.id, INVALID_SYNTAX.format(correct='/unban ID'))
        return

    target = message.command[1]
    # isdecimal() rather than isdigit(): isdigit() also accepts characters such
    # as superscript digits, which int() rejects with a ValueError
    if not target.isdecimal():
        wrapper.send_message(message.chat.id, f"{ERROR}: {NONNUMERIC_ID}")
        return

    target_id = int(target)
    if target_id in ADMINS:
        # Admin IDs get the same reply as in the ban handler
        wrapper.send_message(message.chat.id, CANNOT_BAN_ADMIN)
        return

    user = get_user(target)
    if not user:
        wrapper.send_message(message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=target)}")
        return

    # The last field of the user row is the banned flag
    if not user[-1]:
        wrapper.send_message(message.chat.id, USER_NOT_BANNED)
        return

    logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /unban {target}")
    uid = user[1]
    res = unban_user(target_id)
    if isinstance(res, Exception):
        logging.error(f"An error has occurred when calling unban_user({uid}), {type(res).__name__}: {res}")
        wrapper.send_message(message.chat.id, f"{ERROR}: {type(res).__name__} -> {res}")
        return

    wrapper.send_message(message.chat.id, USER_UNBANNED)
    if uid in BANNED_USERS:
        BANNED_USERS.remove(uid)
    wrapper.send_message(uid, YOU_ARE_UNBANNED)
|
|
|
|
|
|
@Client.on_message(Filters.command("busy") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def busy(client, message):
    """Handle /busy: toggle the admin's busy state in CACHE.

    - In a chat with a partner ID other than 1234567: ask the admin to leave
      the current chat first.
    - State "none": mark the admin busy by storing ["IN_CHAT", 1234567].
    - Anything else: drop the admin's CACHE entry and confirm the unmarking.

    :param client: the client instance that received the update (unused)
    :param message: the incoming message carrying the command
    """

    logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /busy")
    if len(message.command) > 1:
        wrapper.send_message(message.chat.id, NO_PARAMETERS.format(command='/busy'))
        return

    admin_id = message.from_user.id
    state = CACHE[admin_id]
    if state[0] == "IN_CHAT" and state[1] != 1234567:
        wrapper.send_message(admin_id, LEAVE_CURRENT_CHAT)
    elif state[0] == "none":
        wrapper.send_message(message.chat.id, MARKED_BUSY)
        # 1234567 is the placeholder partner ID written when marking busy
        CACHE[admin_id] = ["IN_CHAT", 1234567]
    else:
        if admin_id in CACHE:
            del CACHE[admin_id]
        wrapper.send_message(message.chat.id, UNMARKED_BUSY)
|
|
|
|
|
|
@Client.on_message(Filters.command("chats") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def chats(client, message):
    """Handle /chats: list the non-admin users whose CACHE state is "IN_CHAT".

    Each line pairs the user with the admin whose ID is stored as the second
    element of the user's CACHE entry.

    :param client: the client instance that received the update (unused)
    :param message: the incoming message carrying the command
    """

    logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /chats")
    if len(message.command) > 1:
        wrapper.send_message(message.chat.id, NO_PARAMETERS.format(command='/chats'))
        return

    lines = []
    for user in CACHE:
        # Admin entries are skipped; only user-side entries are listed
        if user in ADMINS or CACHE[user][0] != "IN_CHAT":
            continue
        admin_id = CACHE[user][1]
        admin_name = ADMINS[admin_id]
        lines.append(f"- 👤 [User]({NAME.format(user)}) -> 👨💻 [{admin_name}]({NAME.format(admin_id)})\n")

    wrapper.send_message(message.chat.id, CHATS_LIST.format(chats="".join(lines)))
|
|
|
|
@Client.on_message(Filters.command("queue") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def queue(client, message):
    """Handle /queue: list the users whose CACHE state is "AWAITING_ADMIN".

    :param client: the client instance that received the update (unused)
    :param message: the incoming message carrying the command
    """

    logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /queue")
    if len(message.command) > 1:
        wrapper.send_message(message.chat.id, NO_PARAMETERS.format(command='/queue'))
        return

    # One line per waiting user, in CACHE iteration order
    waiting = "".join(
        f"- 👤 [User]({NAME.format(user)})\n"
        for user in CACHE
        if CACHE[user][0] == "AWAITING_ADMIN"
    )
    wrapper.send_message(message.chat.id, QUEUE_LIST.format(queue=waiting))
|