Comolete refactoring

This commit is contained in:
nocturn9x 2020-06-15 12:15:50 +00:00
parent 66c56dda5a
commit ad63aadcc8
13 changed files with 299 additions and 536 deletions

View File

@ -1,5 +1,5 @@
import sqlite3.dbapi2 as sqlite3
from ..config import DB_GET_USERS, DB_GET_USER, DB_PATH, DB_SET_USER, DB_UPDATE_NAME, DB_BAN_USER, DB_UNBAN_USER
from ..config import DB_GET_USERS, DB_GET_USER, DB_PATH, DB_SET_USER, DB_UPDATE_NAME, DB_BAN_USER, DB_UNBAN_USER, DB_GET_USER_BY_NAME
import logging
import time
import os
@ -39,6 +39,22 @@ def get_user(tg_id: int):
return query_error
def get_user_by_name(uname: str):
try:
database = sqlite3.connect(DB_PATH)
except sqlite3.Error as connection_error:
logging.error(f"An error has occurred while connecting to database: {connection_error}")
else:
try:
with database:
cursor = database.cursor()
query = cursor.execute(DB_GET_USER_BY_NAME, (uname,))
return query.fetchone()
except sqlite3.Error as query_error:
logging.error(f"An error has occurred while executing DB_GET_USER_BY_NAME query: {query_error}")
return query_error
def update_name(tg_id: int, name: str):
try:
database = sqlite3.connect(DB_PATH)

View File

@ -1 +1,2 @@
__version__ = (1, 0, 0)
__version__ = (1, 0, 0)
from ._wrapper import MethodWrapper

View File

@ -0,0 +1,37 @@
from pyrogram import Client, CallbackQuery, InlineQuery
from pyrogram.errors import RPCError
import logging
from typing import Union
class MethodWrapper(object):
"""A class that that implements a wrapper around ``pyrogram.Client`` methods.
To access a pyrogram method just call ``MethodWrapper.method_name``.
All method calls are performed in a try/except block and either return
the exception object if an error occurs, or the result of the called
method otherwise. All errors are automatically logged to stderr.
:param instance: The ``pyrogram.Client`` or ``pyrogram.CallbackQuery`` or ``pyrogram.InlineQuery`` instance (not class!)
:type instance: Union[Client, CallbackQuery, InlineQuery]
"""
def __init__(self, instance: Union[Client, CallbackQuery, InlineQuery]):
"""Object constructor"""
self.instance = instance
def __getattr__(self, attribute: str):
if attribute in self.__dict__:
return self.__dict__[attribute]
else:
def wrapper(*args, **kwargs):
if hasattr(self.instance, attribute):
try:
return getattr(self.instance, attribute)(*args, **kwargs)
except RPCError as err:
logging.error(f"An exception occurred -> {type(err).__name__}: {err}")
return err
else:
raise AttributeError(self.instance, attribute)
return wrapper

View File

@ -1,84 +0,0 @@
from pyrogram.errors import RPCError, FloodWait
import time
import logging
from pyrogram import Client, CallbackQuery
from typing import Union
def edit_message_text(update: Union[CallbackQuery, Client], sleep: bool = True, *args, **kwargs):
"""Edits a message in a way that never triggers exceptions and logs errors
:param update: The pyrogram.Client instance or pyrogram.CallbackQuery
object to call the method for
:type update: Union[Client, CallbackQuery]
:param sleep: If True, the default, the function will call time.sleep()
in case of a FloodWait exception and return the exception object
after the sleep is done, otherwise the ``FloodWait`` exception is returned
immediately
:returns: Whatever the called pyrogram method returns, or an exception if
the method call caused an error
"""
try:
return update.edit_message_text(*args, **kwargs)
except FloodWait as fw:
logging.warning(f"FloodWait! A wait of {fw.x} seconds is required")
if sleep:
time.sleep(fw.x)
return fw
except RPCError as generic_error:
logging.error(f"An exception occurred: {generic_error}")
return generic_error
def edit_message_caption(update: Union[CallbackQuery, Client], sleep: bool = True, *args, **kwargs):
"""Edits a message caption in a way that never triggers exceptions and logs errors
:param update: The pyrogram.Client instance or pyrogram.CallbackQuery
object to call the method for
:type update: Union[Client, CallbackQuery]
:param sleep: If True, the default, the function will call time.sleep()
in case of a FloodWait exception and return the exception object
after the sleep is done, otherwise the ``FloodWait`` exception is returned
immediately
:returns: Whatever the called pyrogram method returns, or an exception if
the method call caused an error
"""
try:
return update.edit_message_caption(*args, **kwargs)
except FloodWait as fw:
logging.warning(f"FloodWait! A wait of {fw.x} seconds is required")
if sleep:
time.sleep(fw.x)
return fw
except RPCError as generic_error:
logging.error(f"An exception occurred: {generic_error}")
return generic_error
def edit_message_media(update: Union[CallbackQuery, Client], sleep: bool = True, *args, **kwargs):
"""Edits a message media in a way that never triggers exceptions and logs errors
:param update: The pyrogram.Client instance or pyrogram.CallbackQuery
object to call the method for
:type update: Union[Client, CallbackQuery]
:param sleep: If True, the default, the function will call time.sleep()
in case of a FloodWait exception and return the exception object
after the sleep is done, otherwise the ``FloodWait`` exception is returned
immediately
:returns: Whatever the called pyrogram method returns, or an exception if
the method call caused an error
"""
try:
return update.edit_message_media(*args, **kwargs)
except FloodWait as fw:
logging.warning(f"FloodWait! A wait of {fw.x} seconds is required")
if sleep:
time.sleep(fw.x)
return fw
except RPCError as generic_error:
logging.error(f"An exception occurred: {generic_error}")
return generic_error

View File

@ -1,117 +0,0 @@
from pyrogram.errors import RPCError, FloodWait
from pyrogram import Client
import time
import logging
def send_message(client: Client, sleep: bool = True, *args, **kwargs):
"""Sends a message in a way that never triggers exceptions and logs errors
:param client: The pyrogram.Client instance to call the method for
:type client: class: Client
:param sleep: If True, the default, the function will call time.sleep()
in case of a FloodWait exception and return the exception object
after the sleep is done, otherwise the ``FloodWait`` exception is returned
immediately
"""
try:
return client.send_message(*args, **kwargs)
except FloodWait as fw:
logging.warning(f"FloodWait! A wait of {fw.x} seconds is required")
if sleep:
time.sleep(fw.x)
return fw
except RPCError as generic_error:
logging.error(f"An exception occurred: {generic_error}")
return generic_error
def send_photo(client: Client, sleep: bool = True, *args, **kwargs):
"""Sends a photo in a way that never triggers exceptions and logs errors
:param client: The pyrogram.Client instance to call the method for
:type client: class: Client
:param sleep: If True, the default, the function will call time.sleep()
in case of a FloodWait exception and return the exception object
after the sleep is done, otherwise the ``FloodWait`` exception is returned
immediately
"""
try:
return client.send_photo(*args, **kwargs)
except FloodWait as fw:
logging.warning(f"FloodWait! A wait of {fw.x} seconds is required")
if sleep:
time.sleep(fw.x)
return fw
except RPCError as generic_error:
logging.error(f"An exception occurred: {generic_error}")
return generic_error
def send_audio(client: Client, sleep: bool = True, *args, **kwargs):
"""Sends an audio in a way that never triggers exceptions and logs errors
:param client: The pyrogram.Client instance to call the method for
:type client: class: Client
:param sleep: If True, the default, the function will call time.sleep()
in case of a FloodWait exception and return the exception object
after the sleep is done, otherwise the ``FloodWait`` exception is returned
immediately
"""
try:
return client.send_audio(*args, **kwargs)
except FloodWait as fw:
logging.warning(f"FloodWait! A wait of {fw.x} seconds is required")
if sleep:
time.sleep(fw.x)
return fw
except RPCError as generic_error:
logging.error(f"An exception occurred: {generic_error}")
return generic_error
def send_sticker(client: Client, sleep: bool = True, *args, **kwargs):
"""Sends a sticker in a way that never triggers exceptions and logs errors
:param client: The pyrogram.Client instance to call the method for
:type client: class: Client
:param sleep: If True, the default, the function will call time.sleep()
in case of a FloodWait exception and return the exception object
after the sleep is done, otherwise the ``FloodWait`` exception is returned
immediately
"""
try:
return client.send_sticker(*args, **kwargs)
except FloodWait as fw:
logging.warning(f"FloodWait! A wait of {fw.x} seconds is required")
if sleep:
time.sleep(fw.x)
return fw
except RPCError as generic_error:
logging.error(f"An exception occurred: {generic_error}")
return generic_error
def send_animation(client: Client, sleep: bool = True, *args, **kwargs):
"""Sends an animation in a way that never triggers exceptions and logs errors
:param client: The pyrogram.Client instance to call the method for
:type client: class: Client
:param sleep: If True, the default, the function will call time.sleep()
in case of a FloodWait exception and return the exception object
after the sleep is done, otherwise the ``FloodWait`` exception is returned
immediately
"""
try:
return client.send_animation(*args, **kwargs)
except FloodWait as fw:
logging.warning(f"FloodWait! A wait of {fw.x} seconds is required")
if sleep:
time.sleep(fw.x)
return fw
except RPCError as generic_error:
logging.error(f"An exception occurred: {generic_error}")
return generic_error

View File

@ -1,79 +0,0 @@
from pyrogram.errors import RPCError, FloodWait
import time
from pyrogram import CallbackQuery
import logging
def answer(query: CallbackQuery, sleep: bool = True, *args, **kwargs):
"""Answers a query in a way that never triggers exceptions and logs errors
:param query: The pyrogram.CallbackQuery object to call the method for
:type query: class: CallbackQuery
:param sleep: If True, the default, the function will call time.sleep()
in case of a FloodWait exception and return the exception object
after the sleep is done, otherwise the ``FloodWait`` exception is returned
immediately
:returns: Whatever the called pyrogram method returns, or an exception if
the method call caused an error
"""
try:
return query.answer(*args, **kwargs)
except FloodWait as fw:
logging.warning(f"FloodWait! A wait of {fw.x} seconds is required")
if sleep:
time.sleep(fw.x)
return fw
except RPCError as generic_error:
logging.error(f"An exception occurred: {generic_error}")
return generic_error
def delete_messages(client, sleep: bool = True, *args, **kwargs):
"""Deletes messages in a way that never triggers exceptions and logs errors
:param client: The pyrogram.Client instance to call the method for
:type client: class: Client
:param sleep: If True, the default, the function will call time.sleep()
in case of a FloodWait exception and return the exception object
after the sleep is done, otherwise the ``FloodWait`` exception is returned
immediately
:returns: Whatever the called pyrogram method returns, or an exception if
the method call caused an error
"""
try:
return client.delete_messages(*args, **kwargs)
except FloodWait as fw:
logging.warning(f"FloodWait! A wait of {fw.x} seconds is required")
if sleep:
time.sleep(fw.x)
return fw
except RPCError as generic_error:
logging.error(f"An exception occurred: {generic_error}")
return generic_error
def get_users(client, sleep: bool = True, *args, **kwargs):
"""Calls get_users in a way that never triggers exceptions and logs errors
:param client: The pyrogram.Client instance to call the method for
:type client: class: Client
:param sleep: If True, the default, the function will call time.sleep()
in case of a FloodWait exception and return the exception object
after the sleep is done, otherwise the ``FloodWait`` exception is returned
immediately
:returns: Whatever the called pyrogram method returns, or an exception if
the method call caused an error
"""
try:
return client.get_users(*args, **kwargs)
except FloodWait as fw:
logging.warning(f"FloodWait! A wait of {fw.x} seconds is required")
if sleep:
time.sleep(fw.x)
return fw
except RPCError as generic_error:
logging.error(f"An exception occurred: {generic_error}")
return generic_error

View File

@ -1,26 +1,43 @@
from ..config import ADMINS, USER_INFO, INVALID_SYNTAX, ERROR, NONNUMERIC_ID, USERS_COUNT, \
NO_PARAMETERS, ID_MISSING, GLOBAL_MESSAGE_STATS, NAME, WHISPER_FROM, USER_INFO_UPDATED, USER_INFO_UNCHANGED, \
USER_BANNED, USER_UNBANNED, CANNOT_BAN_ADMIN, USER_ALREADY_BANNED, USER_NOT_BANNED, YOU_ARE_BANNED, YOU_ARE_UNBANNED, \
MARKED_BUSY, UNMARKED_BUSY, CACHE
MARKED_BUSY, UNMARKED_BUSY, CACHE, bot, YES, NO, NAME_MISSING
from pyrogram import Client, Filters
from ..database.query import get_user, get_users, update_name, ban_user, unban_user
from ..database.query import get_user, get_users, update_name, ban_user, unban_user, get_user_by_name
from .antiflood import BANNED_USERS
import random
from ..methods.safe_send import send_message
from ..methods.various import get_users as get_telegram_users
import logging
import itertools
import re
from ..methods import MethodWrapper
ADMINS_FILTER = Filters.user(list(ADMINS.keys()))
wrapper = MethodWrapper(bot)
@Client.on_message(Filters.command("getranduser") & ADMINS_FILTER & ~BANNED_USERS & ~Filters.edited)
def get_random_user(client, message):
logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /getranduser")
if len(message.command) > 1:
wrapper.send_message(message.chat.id, f"{INVALID_SYNTAX}: {NO_PARAMETERS.format(command='/getranduser')}")
else:
user = random.choice(get_users())
rowid, uid, uname, date, banned = get_user(*user)
admin = uid in ADMINS
text = USER_INFO.format(uid=uid,
uname='@' + uname if uname != 'null' else uname,
date=date,
status=YES if banned else NO,
admin=NO if not admin else YES)
wrapper.send_message(message.chat.id, text)
@Client.on_message(Filters.command("count") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def count_users(client, message):
logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /count")
count = len(get_users())
send_message(client, True, message.chat.id, USERS_COUNT.format(count=count))
wrapper.send_message(message.chat.id, USERS_COUNT.format(count=count))
@Client.on_message(Filters.command("getuser") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
@ -35,32 +52,35 @@ def get_user_info(client, message):
text = USER_INFO.format(uid=uid,
uname='@' + uname if uname != 'null' else uname,
date=date,
status='' if banned else '',
admin='' if not admin else '')
send_message(client, True, message.chat.id, text)
status=YES if banned else NO,
admin=NO if not admin else YES)
wrapper.send_message(message.chat.id, text)
else:
send_message(client, True, message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=message.command[1])}")
wrapper.send_message(message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=message.command[1])}")
else:
send_message(client, True, message.chat.id, f"{ERROR}: {NONNUMERIC_ID}")
wrapper.send_message(message.chat.id, f"{ERROR}: {NONNUMERIC_ID}")
else:
send_message(client, True, message.chat.id, f"{INVALID_SYNTAX}: Use <code>/getuser user_id</code>")
wrapper.send_message(message.chat.id, f"{INVALID_SYNTAX}: Use <code>/getuser user_id</code>")
@Client.on_message(Filters.command("getranduser") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def get_random_user(client, message):
logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /getranduser")
if len(message.command) > 1:
send_message(client, True, message.chat.id, f"{INVALID_SYNTAX}: {NO_PARAMETERS.format(command='/getranduser')}")
@Client.on_message(Filters.command("userbyname") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def get_user_by_uname(client, message):
if len(message.command) == 2:
user = get_user_by_name(message.command[1])
if user:
logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /userbyname {message.command[1]}")
_, uid, uname, date, banned = user
admin = uid in ADMINS
text = USER_INFO.format(uid=uid,
uname='@' + uname if uname != 'null' else uname,
date=date,
status=YES if banned else NO,
admin=NO if not admin else YES)
wrapper.send_message(message.chat.id, text)
else:
wrapper.send_message(message.chat.id, f"{ERROR}: {NAME_MISSING.format(uname=message.command[1])}")
else:
user = random.choice(get_users())
rowid, uid, uname, date, banned = get_user(*user)
admin = uid in ADMINS
text = USER_INFO.format(uid=uid,
uname='@' + uname if uname != 'null' else uname,
date=date,
status='' if banned else '',
admin='' if not admin else '')
send_message(client, True, message.chat.id, text)
wrapper.send_message(message.chat.id, f"{INVALID_SYNTAX}: Use <code>/getuser user_name</code>")
@Client.on_message(Filters.command("global") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
@ -72,14 +92,14 @@ def global_message(client, message):
count = 0
for uid in itertools.chain(*get_users()):
count += 1
result = send_message(client, True, uid, msg)
result = wrapper.send_message( uid, msg)
if isinstance(result, Exception):
logging.error(f"Could not deliver the global message to {uid} because of {type(result).__name__}: {result}")
missed += 1
logging.warning(f"{count - missed}/{count} global messages were successfully delivered")
send_message(client, True, message.chat.id, GLOBAL_MESSAGE_STATS.format(count=count, success=(count - missed), msg=msg))
wrapper.send_message(message.chat.id, GLOBAL_MESSAGE_STATS.format(count=count, success=(count - missed), msg=msg))
else:
send_message(client, True, message.chat.id, f"{INVALID_SYNTAX}: Use <code>/global message</code>"
wrapper.send_message(message.chat.id, f"{INVALID_SYNTAX}: Use <code>/global message</code>"
f"\n🍮 Note that the <code>/global</code> command supports markdown and html styling")
@ -92,19 +112,19 @@ def whisper(client, message):
msg = msg[re.search(message.command[1], msg).end():]
uid = int(message.command[1])
if uid in itertools.chain(*get_users()):
result = send_message(client, True, uid, WHISPER_FROM.format(admin=f"[{ADMINS[message.from_user.id]}]({NAME.format(message.from_user.id)})",
msg=msg)
)
result = wrapper.send_message(uid, WHISPER_FROM.format(admin=f"[{ADMINS[message.from_user.id]}]({NAME.format(message.from_user.id)})",
msg=msg)
)
if isinstance(result, Exception):
logging.error(
f"Could not whisper to {uid} because of {type(result).__name__}: {result}")
send_message(client, True, message.chat.id, f"{ERROR}: {type(result).__name__} -> {result}")
wrapper.send_message(message.chat.id, f"{ERROR}: {type(result).__name__} -> {result}")
else:
send_message(client, True, message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=uid)}")
wrapper.send_message(message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=uid)}")
else:
send_message(client, True, message.chat.id, f"{ERROR}: {NONNUMERIC_ID}")
wrapper.send_message(message.chat.id, f"{ERROR}: {NONNUMERIC_ID}")
else:
send_message(client, True, message.chat.id, f"{INVALID_SYNTAX}: Use <code>/whisper ID message</code>"
wrapper.send_message(message.chat.id, f"{INVALID_SYNTAX}: Use <code>/whisper ID message</code>"
f"\n🍮 Note that the <code>/whisper</code> command supports markdown and html styling")
@ -116,24 +136,24 @@ def update(client, message):
if user:
logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /update {message.command[1]}")
_, uid, uname, date, banned = user
new = get_telegram_users(client, True, uid)
new = wrapper.get_users(uid)
if isinstance(new, Exception):
logging.error(f"An error has occurred when calling get_users({uid}), {type(new).__name__}: {new}")
send_message(client, True, message.chat.id, f"{ERROR}: {type(new).__name__} -> {new}")
wrapper.send_message(message.chat.id, f"{ERROR}: {type(new).__name__} -> {new}")
else:
if new.username is None:
new.username = "null"
if new.username != uname:
update_name(uid, new.username)
send_message(client, True, message.chat.id, USER_INFO_UPDATED)
wrapper.send_message(message.chat.id, USER_INFO_UPDATED)
else:
send_message(client, True, message.chat.id, USER_INFO_UNCHANGED)
wrapper.send_message(message.chat.id, USER_INFO_UNCHANGED)
else:
send_message(client, True, message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=message.command[1])}")
wrapper.send_message(message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=message.command[1])}")
else:
send_message(client, True, message.chat.id, f"{ERROR}: {NONNUMERIC_ID}")
wrapper.send_message(message.chat.id, f"{ERROR}: {NONNUMERIC_ID}")
else:
send_message(client, True, message.chat.id, f"{INVALID_SYNTAX}: Use <code>/update user_id</code>")
wrapper.send_message(message.chat.id, f"{INVALID_SYNTAX}: Use <code>/update user_id</code>")
@Client.on_message(Filters.command("ban") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
@ -141,7 +161,7 @@ def ban(client, message):
if len(message.command) == 2:
if message.command[1].isdigit():
if int(message.command[1]) in ADMINS:
send_message(client, True, message.chat.id, CANNOT_BAN_ADMIN)
wrapper.send_message(message.chat.id, CANNOT_BAN_ADMIN)
else:
user = get_user(message.command[1])
if user:
@ -151,19 +171,19 @@ def ban(client, message):
res = ban_user(int(message.command[1]))
if isinstance(res, Exception):
logging.error(f"An error has occurred when calling ban_user({uid}), {type(res).__name__}: {res}")
send_message(client, True, message.chat.id, f"{ERROR}: {type(res).__name__} -> {res}")
wrapper.send_message(message.chat.id, f"{ERROR}: {type(res).__name__} -> {res}")
else:
send_message(client, True, message.chat.id, USER_BANNED)
send_message(client, True, uid, YOU_ARE_BANNED)
wrapper.send_message(message.chat.id, USER_BANNED)
wrapper.send_message( uid, YOU_ARE_BANNED)
BANNED_USERS.add(uid)
else:
send_message(client, True, message.chat.id, USER_ALREADY_BANNED)
wrapper.send_message(message.chat.id, USER_ALREADY_BANNED)
else:
send_message(client, True, message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=message.command[1])}")
wrapper.send_message(message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=message.command[1])}")
else:
send_message(client, True, message.chat.id, f"{ERROR}: {NONNUMERIC_ID}")
wrapper.send_message(message.chat.id, f"{ERROR}: {NONNUMERIC_ID}")
else:
send_message(client, True, message.chat.id, f"{INVALID_SYNTAX}: Use <code>/ban user_id</code>")
wrapper.send_message(message.chat.id, f"{INVALID_SYNTAX}: Use <code>/ban user_id</code>")
@Client.on_message(Filters.command("unban") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
@ -171,7 +191,7 @@ def unban(client, message):
if len(message.command) == 2:
if message.command[1].isdigit():
if int(message.command[1]) in ADMINS:
send_message(client, True, message.chat.id, CANNOT_BAN_ADMIN)
wrapper.send_message(message.chat.id, CANNOT_BAN_ADMIN)
else:
user = get_user(message.command[1])
if user:
@ -181,32 +201,32 @@ def unban(client, message):
res = unban_user(int(message.command[1]))
if isinstance(res, Exception):
logging.error(f"An error has occurred when calling unban_user({uid}), {type(res).__name__}: {res}")
send_message(client, True, message.chat.id, f"{ERROR}: {type(res).__name__} -> {res}")
wrapper.send_message(message.chat.id, f"{ERROR}: {type(res).__name__} -> {res}")
else:
send_message(client, True, message.chat.id, USER_UNBANNED)
wrapper.send_message(message.chat.id, USER_UNBANNED)
if uid in BANNED_USERS:
BANNED_USERS.remove(uid)
send_message(client, True, uid, YOU_ARE_UNBANNED)
wrapper.send_message( uid, YOU_ARE_UNBANNED)
else:
send_message(client, True, message.chat.id, USER_NOT_BANNED)
wrapper.send_message(message.chat.id, USER_NOT_BANNED)
else:
send_message(client, True, message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=message.command[1])}")
wrapper.send_message(message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=message.command[1])}")
else:
send_message(client, True, message.chat.id, f"{ERROR}: {NONNUMERIC_ID}")
wrapper.send_message(message.chat.id, f"{ERROR}: {NONNUMERIC_ID}")
else:
send_message(client, True, message.chat.id, f"{INVALID_SYNTAX}: Use <code>/unban user_id</code>")
wrapper.send_message(message.chat.id, f"{INVALID_SYNTAX}: Use <code>/unban user_id</code>")
@Client.on_message(Filters.command("/busy") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def get_random_user(client, message):
@Client.on_message(Filters.command("busy") & ADMINS_FILTER & Filters.private & ~BANNED_USERS & ~Filters.edited)
def busy(client, message):
logging.warning(f"{ADMINS[message.from_user.id]} [{message.from_user.id}] sent /busy")
if len(message.command) > 1:
send_message(client, True, message.chat.id, f"{INVALID_SYNTAX}: {NO_PARAMETERS.format(command='/busy')}")
wrapper.send_message(message.chat.id, f"{INVALID_SYNTAX}: {NO_PARAMETERS.format(command='/busy')}")
else:
if CACHE[message.from_user.id][0] == "none":
send_message(client, True, message.chat.id, MARKED_BUSY)
wrapper.send_message(message.chat.id, MARKED_BUSY)
CACHE[message.from_user.id] = ["IN_CHAT", 1234567]
else:
if message.from_user.id in CACHE:
del CACHE[message.from_user.id]
send_message(client, True, message.chat.id, UNMARKED_BUSY)
wrapper.send_message(message.chat.id, UNMARKED_BUSY)

View File

@ -1,11 +1,11 @@
from pyrogram import Client, Filters, Message, CallbackQueryHandler
from ..config import MAX_UPDATE_THRESHOLD, ANTIFLOOD_SENSIBILITY, BAN_TIME, ADMINS, BYPASS_FLOOD, FLOOD_NOTICE, \
COUNT_CALLBACKS_SEPARATELY, FLOOD_PERCENTAGE, CACHE, PRIVATE_ONLY, DELETE_MESSAGES
COUNT_CALLBACKS_SEPARATELY, FLOOD_PERCENTAGE, CACHE, PRIVATE_ONLY, DELETE_MESSAGES, bot
from collections import defaultdict
import logging
import time
from ..methods.safe_send import send_message
from ..methods.various import delete_messages
from ..methods import MethodWrapper
from ..config import check_user_banned
# Some variables for runtime configuration
@ -14,6 +14,7 @@ BANNED_USERS = Filters.user() # Filters where the antiflood will put banned use
BYPASS_USERS = Filters.user(list(ADMINS.keys())) if BYPASS_FLOOD else Filters.user()
QUERIES = defaultdict(list) if COUNT_CALLBACKS_SEPARATELY else MESSAGES
FILTER = Filters.private if PRIVATE_ONLY else ~Filters.user()
wrapper = MethodWrapper(bot)
def is_flood(updates: list):
@ -23,7 +24,7 @@ def is_flood(updates: list):
genexpr = [i <= ANTIFLOOD_SENSIBILITY for i in
((updates[i + 1] - timestamp) if i < (MAX_UPDATE_THRESHOLD - 1) else (timestamp - updates[i - 1]) for
i, timestamp in enumerate(updates))]
return genexpr.count(True) >= int((len(genexpr) / 100) * FLOOD_PERCENTAGE)
return sum(genexpr) >= int((len(genexpr) / 100) * FLOOD_PERCENTAGE)
@Client.on_message(FILTER & ~BYPASS_USERS, group=-1)
@ -63,9 +64,9 @@ def anti_flood(client, update):
BANNED_USERS.add(user_id)
VAR[user_id] = chat, time.time()
if FLOOD_NOTICE:
send_message(client, True, user_id, FLOOD_NOTICE)
wrapper.send_message(user_id, FLOOD_NOTICE)
if DELETE_MESSAGES and any(updates):
delete_messages(client, True, chat, filter(bool, updates))
wrapper.delete_messages(chat, filter(bool, updates))
else:
if user_id in VAR:
del VAR[user_id]

View File

@ -1,60 +1,64 @@
from pyrogram import Client, Filters, InlineKeyboardMarkup, InlineKeyboardButton
from ..methods.safe_send import send_message
from ..methods.safe_edit import edit_message_text
from ..methods.various import answer, delete_messages
from ..config import CACHE, ADMINS, ADMINS_LIST_UPDATE_DELAY, callback_regex, admin_is_chatting, \
user_is_chatting, LIVE_CHAT_STATUSES, STATUS_BUSY, STATUS_FREE, SUPPORT_REQUEST_SENT, SUPPORT_NOTIFICATION, \
ADMIN_JOINS_CHAT, USER_CLOSES_CHAT, JOIN_CHAT_BUTTON, USER_INFO, USER_LEAVES_CHAT, ADMIN_MESSAGE, USER_MESSAGE, \
TOO_FAST, CHAT_BUSY, LEAVE_CURRENT_CHAT, USER_JOINS_CHAT, NAME
TOO_FAST, CHAT_BUSY, LEAVE_CURRENT_CHAT, USER_JOINS_CHAT, NAME, bot, CANNOT_REQUEST_SUPPORT, YES, NO, user_banned, CLOSE_CHAT_BUTTON, BACK_BUTTON, UPDATE_BUTTON
import time
from ..database.query import get_user
from .antiflood import BANNED_USERS
from .start import back_start
import logging
from ..methods import MethodWrapper
ADMINS_FILTER = Filters.user(list(ADMINS.keys()))
BUTTONS = InlineKeyboardMarkup(
[
[InlineKeyboardButton("🔙 Back", "back_start")],
[InlineKeyboardButton("🔄 Update", "update_admins_list")]
[InlineKeyboardButton(BACK_BUTTON, "back_start")],
[InlineKeyboardButton(UPDATE_BUTTON, "update_admins_list")]
])
wrapper = MethodWrapper(bot)
@Client.on_callback_query(Filters.callback_data("sos") & ~BANNED_USERS)
@Client.on_callback_query(Filters.callback_data("sos") & ~BANNED_USERS & ~user_banned())
def begin_chat(client, query):
CACHE[query.from_user.id] = ["AWAITING_ADMIN", time.time()]
queue = LIVE_CHAT_STATUSES
for admin_id, admin_name in ADMINS.items():
status = CACHE[admin_id][0]
if status != "IN_CHAT":
queue += f"- {STATUS_FREE}"
else:
queue += f"- {STATUS_BUSY}"
queue += f"[{admin_name}]({NAME.format(admin_id)})\n"
msg = edit_message_text(query, True, SUPPORT_REQUEST_SENT.format(queue=queue, date=time.strftime('%d/%m/%Y %T')),
reply_markup=BUTTONS)
join_chat_button = InlineKeyboardMarkup([[InlineKeyboardButton(JOIN_CHAT_BUTTON, f"join_{query.from_user.id}")]])
user = get_user(query.from_user.id)
_, uid, uname, date, banned = user
admin = uid in ADMINS
text = USER_INFO.format(uid=uid, uname='@' + uname if uname != 'null' else uname, date=date,
status='' if banned else '',
admin='' if not admin else '')
CACHE[query.from_user.id].append([])
for admin in ADMINS:
status = CACHE[admin][0]
if status != "IN_CHAT":
message = send_message(client, True, admin, SUPPORT_NOTIFICATION.format(uinfo=text), reply_markup=join_chat_button)
CACHE[query.from_user.id][-1].append((message.chat.id, message.message_id))
CACHE[query.from_user.id][-1].append((msg.chat.id, msg.message_id))
cb_wrapper = MethodWrapper(query)
if query.from_user.id in ADMINS:
cb_wrapper.answer(CANNOT_REQUEST_SUPPORT)
else:
CACHE[query.from_user.id] = ["AWAITING_ADMIN", time.time()]
queue = LIVE_CHAT_STATUSES
for admin_id, admin_name in ADMINS.items():
status = CACHE[admin_id][0]
if status != "IN_CHAT":
queue += f"- {STATUS_FREE}"
else:
queue += f"- {STATUS_BUSY}"
queue += f"[{admin_name}]({NAME.format(admin_id)})\n"
msg = cb_wrapper.edit_message_text(SUPPORT_REQUEST_SENT.format(queue=queue, date=time.strftime('%d/%m/%Y %T')),
reply_markup=BUTTONS)
join_chat_button = InlineKeyboardMarkup([[InlineKeyboardButton(JOIN_CHAT_BUTTON, f"join_{query.from_user.id}")]])
user = get_user(query.from_user.id)
_, uid, uname, date, banned = user
text = USER_INFO.format(uid=uid, uname='@' + uname if uname != 'null' else uname, date=date,
status=YES if banned else NO,
admin='N/A')
CACHE[query.from_user.id].append([])
for admin in ADMINS:
status = CACHE[admin][0]
if status != "IN_CHAT":
message = wrapper.send_message(admin, SUPPORT_NOTIFICATION.format(uinfo=text), reply_markup=join_chat_button)
CACHE[query.from_user.id][-1].append((message.chat.id, message.message_id))
CACHE[admin] = ["NOTIFICATION_SENT", query.from_user.id]
CACHE[query.from_user.id][-1].append((msg.chat.id, msg.message_id))
@Client.on_callback_query(Filters.callback_data("update_admins_list") & ~BANNED_USERS)
@Client.on_callback_query(Filters.callback_data("update_admins_list") & ~BANNED_USERS & ~user_banned())
def update_admins_list(_, query):
if time.time() - CACHE[query.from_user.id][1] >= ADMINS_LIST_UPDATE_DELAY:
if CACHE[query.from_user.id][0] == "AWAITING_ADMIN":
CACHE[query.from_user.id] = ["AWAITING_ADMIN", time.time()]
cb_wrapper = MethodWrapper(query)
if CACHE[query.from_user.id][0] == "AWAITING_ADMIN":
if time.time() - CACHE[query.from_user.id][1] >= ADMINS_LIST_UPDATE_DELAY:
CACHE[query.from_user.id][1] = time.time()
queue = LIVE_CHAT_STATUSES
for admin_id, admin_name in ADMINS.items():
status = CACHE[admin_id][0]
@ -63,70 +67,80 @@ def update_admins_list(_, query):
else:
queue += f"- {STATUS_BUSY}"
queue += f"[{admin_name}]({NAME.format(admin_id)})\n"
edit_message_text(query, True, SUPPORT_REQUEST_SENT.format(queue=queue, date=time.strftime('%d/%m/%Y %T')),
reply_markup=BUTTONS)
cb_wrapper.edit_message_text(SUPPORT_REQUEST_SENT.format(queue=queue, date=time.strftime('%d/%m/%Y %T')),
reply_markup=BUTTONS)
join_chat_button = InlineKeyboardMarkup([[InlineKeyboardButton(JOIN_CHAT_BUTTON, f"join_{query.from_user.id}")]])
user = get_user(query.from_user.id)
_, uid, uname, date, banned = user
text = USER_INFO.format(uid=uid, uname='@' + uname if uname != 'null' else uname, date=date,
status=YES if banned else NO,
admin='N/A')
for admin in ADMINS:
status = CACHE[admin][0]
if status != "IN_CHAT":
if status != "NOTIFICATION_SENT" and CACHE[admin][1] != uid:
message = wrapper.send_message(admin, SUPPORT_NOTIFICATION.format(uinfo=text), reply_markup=join_chat_button)
CACHE[query.from_user.id][-1].append((message.chat.id, message.message_id))
else:
cb_wrapper.answer(TOO_FAST, show_alert=True)
else:
back_start(_, query)
@Client.on_callback_query(~user_banned() & callback_regex(r"close_chat_\d+") & ~BANNED_USERS & user_is_chatting() | admin_is_chatting() & ~BANNED_USERS & callback_regex(r"close_chat_\d+"))
def close_chat(_, query):
user_id = int(query.data.split("_")[2])
if query.from_user.id in ADMINS:
data = CACHE[user_id][-1]
if isinstance(data, list):
data.append((query.from_user.id, query.message.message_id))
for chatid, message_ids in data:
wrapper.delete_messages(chatid, message_ids)
status = CACHE[query.from_user.id][0]
if status == "IN_CHAT":
wrapper.send_message(query.from_user.id, USER_LEAVES_CHAT)
admin_id, admin_name = query.from_user.id, ADMINS[query.from_user.id]
logging.warning(f"{ADMINS[admin_id]} [{admin_id}] has terminated the chat with user {CACHE[admin_id][1]}")
if CACHE[user_id][0] == "IN_CHAT":
del CACHE[user_id]
wrapper.send_message(user_id,
USER_CLOSES_CHAT.format(user_id=NAME.format(admin_id), user_name=admin_name))
del CACHE[query.from_user.id][1]
else:
data = CACHE[query.from_user.id][-1]
if isinstance(data, list):
for chatid, message_ids in data:
wrapper.delete_messages(chatid, message_ids)
admin_id = CACHE[query.from_user.id][1]
if CACHE[user_id][1]:
if query.from_user.first_name:
user_name = query.from_user.first_name
elif query.from_user.username:
user_name = query.from_user.username
else:
user_name = "Anonymous"
logging.warning(f"{user_name} [{query.from_user.id}] has terminated the chat with admin {ADMINS[admin_id]} [{admin_id}]")
wrapper.send_message(query.from_user.id,
USER_LEAVES_CHAT)
wrapper.send_message(CACHE[user_id][1],
USER_CLOSES_CHAT.format(user_id=NAME.format(query.from_user.id), user_name=user_name))
del CACHE[query.from_user.id]
del CACHE[admin_id]
else:
back_start(_, query)
else:
answer(query, True, TOO_FAST, show_alert=True)
@Client.on_callback_query(callback_regex(r"close_chat_\d+") & ~BANNED_USERS)
def close_chat(_, query):
if user_is_chatting() or admin_is_chatting():
user_id = int(query.data.split("_")[2])
if query.from_user.id in ADMINS:
data = CACHE[CACHE[query.from_user.id][1]][-1]
if isinstance(data, list):
data.append((query.from_user.id, query.message.message_id))
for chatid, message_ids in data:
delete_messages(_, True, chatid, message_ids)
status = CACHE[query.from_user.id][0]
if status == "IN_CHAT":
del CACHE[query.from_user.id][1]
send_message(_, True, query.from_user.id, USER_LEAVES_CHAT)
admin_id, admin_name = query.from_user.id, ADMINS[query.from_user.id]
if CACHE[user_id][1]:
send_message(_, True, user_id,
USER_CLOSES_CHAT.format(user_id=NAME.format(admin_id), user_name=admin_name))
if user_id in CACHE:
del CACHE[user_id]
logging.warning(f"{ADMINS[admin_id]} [{admin_id}] has terminated the chat with user {CACHE[admin_id][1]}")
del CACHE[admin_id]
else:
data = CACHE[query.from_user.id][-1]
if isinstance(data, list):
for chatid, message_ids in data:
delete_messages(_, True, chatid, message_ids)
admin_id = CACHE[query.from_user.id][1]
if CACHE[user_id][1]:
if query.from_user.first_name:
user_name = query.from_user.first_name
elif query.from_user.username:
user_name = query.from_user.username
else:
user_name = "Anonymous"
logging.warning(f"{user_name} [{query.from_user.id}] has terminated the chat with admin {ADMINS[admin_id]} [{admin_id}]")
send_message(_, True, query.from_user.id,
USER_LEAVES_CHAT)
send_message(_, True, CACHE[user_id][1],
USER_CLOSES_CHAT.format(user_id=NAME.format(query.from_user.id), user_name=user_name))
del CACHE[query.from_user.id]
del CACHE[admin_id]
else:
back_start(_, query)
@Client.on_message(admin_is_chatting() & Filters.text & ~BANNED_USERS)
@Client.on_message(admin_is_chatting() & Filters.text & ~BANNED_USERS & ~user_banned())
def forward_from_admin(client, message):
logging.warning(f"Admin {ADMINS[message.from_user.id]} [{message.from_user.id}] says to {CACHE[message.from_user.id][1]}: {message.text.html}")
send_message(client, True, CACHE[message.from_user.id][1],
wrapper.send_message(CACHE[message.from_user.id][1],
ADMIN_MESSAGE.format(user_name=ADMINS[message.from_user.id],
user_id=NAME.format(message.from_user.id),
message=message.text.html))
@Client.on_message(user_is_chatting() & Filters.text & ~BANNED_USERS)
@Client.on_message(user_is_chatting() & Filters.text & ~BANNED_USERS & ~user_banned())
def forward_from_user(client, message):
if message.from_user.first_name:
name = message.from_user.first_name
@ -135,28 +149,30 @@ def forward_from_user(client, message):
else:
name = "Anonymous"
logging.warning(f"User {name} [{message.from_user.id}] says to Admin {ADMINS[CACHE[message.from_user.id][1]]} [{CACHE[message.from_user.id][1]}]: {message.text.html}")
send_message(client, True, CACHE[message.from_user.id][1],
wrapper.send_message(CACHE[message.from_user.id][1],
USER_MESSAGE.format(user_name=name, user_id=NAME.format(message.from_user.id),
message=message.text.html))
@Client.on_callback_query(ADMINS_FILTER & callback_regex(r"join_\d+") & ~BANNED_USERS)
@Client.on_callback_query(ADMINS_FILTER & callback_regex(r"join_\d+") & ~BANNED_USERS & ~user_banned())
def join_chat(_, query):
cb_wrapper = MethodWrapper(query)
if CACHE[query.from_user.id][0] != "IN_CHAT":
user_id = int(query.data.split("_")[1])
if CACHE[user_id][0] != "AWAITING_ADMIN":
answer(query, True, CHAT_BUSY)
cb_wrapper.answer(CHAT_BUSY)
else:
buttons = InlineKeyboardMarkup([[InlineKeyboardButton("❌ Close chat", f"close_chat_{user_id}")]])
buttons = InlineKeyboardMarkup([[InlineKeyboardButton(CLOSE_CHAT_BUTTON, f"close_chat_{user_id}")]])
admin_id, admin_name = query.from_user.id, ADMINS[query.from_user.id]
CACHE[user_id] = ["IN_CHAT", admin_id, CACHE[user_id][-1]]
CACHE[query.from_user.id] = ["IN_CHAT", user_id]
message = send_message(_, True, query.from_user.id, USER_JOINS_CHAT, reply_markup=buttons)
admin_joins = send_message(_, True, user_id, ADMIN_JOINS_CHAT.format(admin_name=admin_name, admin_id=NAME.format(admin_id)),
message = wrapper.send_message(query.from_user.id, USER_JOINS_CHAT, reply_markup=buttons)
admin_joins = wrapper.send_message(user_id, ADMIN_JOINS_CHAT.format(admin_name=admin_name, admin_id=NAME.format(admin_id)),
reply_markup=buttons)
for chatid, message_ids in CACHE[CACHE[query.from_user.id][1]][-1]:
delete_messages(_, True, chatid, message_ids)
for chatid, message_ids in CACHE[user_id][-1]:
wrapper.delete_messages(chatid, message_ids)
CACHE[user_id][-1].append((message.chat.id, message.message_id))
CACHE[user_id][-1].append((admin_joins.chat.id, admin_joins.message_id))
else:
answer(query, True, LEAVE_CURRENT_CHAT)
cb_wrapper.answer(LEAVE_CURRENT_CHAT)

View File

@ -1,28 +1,15 @@
from pyrogram import Client, Filters, InlineKeyboardButton, InlineKeyboardMarkup
from .antiflood import BANNED_USERS
from ..config import GREET, BUTTONS, CREDITS, CACHE, YOU_ARE_BANNED
from ..database.query import get_users, set_user, get_user
from ..config import GREET, BUTTONS, CREDITS, CACHE, bot, VERSION, RELEASE_DATE, user_banned, BACK_BUTTON
from ..database.query import get_users, set_user
import logging
import itertools
from ..methods.safe_send import send_message
from ..methods.safe_edit import edit_message_text
from ..methods.various import delete_messages
from ..methods import MethodWrapper
wrapper = MethodWrapper(bot)
def check_user_banned(tg_id: int):
res = get_user(tg_id)
if isinstance(res, Exception):
return False
else:
if not res:
return False
if res[-1]:
return True
else:
return False
@Client.on_message(Filters.command("start") & ~BANNED_USERS & Filters.private)
@Client.on_message(Filters.command("start") & ~BANNED_USERS & Filters.private & ~user_banned())
def start_handler(client, message):
"""Simply handles the /start command sending a pre-defined greeting
and saving new users to the database"""
@ -33,33 +20,29 @@ def start_handler(client, message):
name = message.from_user.username
else:
name = "Anonymous"
if check_user_banned(message.from_user.id):
BANNED_USERS.add(message.from_user.id)
send_message(client, True, message.from_user.id, YOU_ARE_BANNED)
else:
if message.from_user.id not in itertools.chain(*get_users()):
logging.warning(f"New user detected ({message.from_user.id}), adding to database")
set_user(message.from_user.id, message.from_user.username)
if GREET:
send_message(client,
True,
message.from_user.id,
GREET.format(mention=f"[{name}](tg://user?id={message.from_user.id})",
id=message.from_user.id,
username=message.from_user.username
),
reply_markup=BUTTONS
)
if message.from_user.id not in itertools.chain(*get_users()):
logging.warning(f"New user detected ({message.from_user.id}), adding to database")
set_user(message.from_user.id, message.from_user.username.lower() if message.from_user.username else None)
if GREET:
wrapper.send_message(message.from_user.id,
GREET.format(mention=f"[{name}](tg://user?id={message.from_user.id})",
id=message.from_user.id,
username=message.from_user.username
),
reply_markup=BUTTONS
)
@Client.on_callback_query(Filters.callback_data("info") & ~BANNED_USERS)
def bot_info(_, query):
buttons = InlineKeyboardMarkup([[InlineKeyboardButton("🔙 Back", "back_start")]])
edit_message_text(query, True, CREDITS.format(), reply_markup=buttons)
cb_wrapper = MethodWrapper(query)
buttons = InlineKeyboardMarkup([[InlineKeyboardButton(BACK_BUTTON, "back_start")]])
cb_wrapper.edit_message_text(CREDITS.format(VERSION=VERSION, RELEASE_DATE=RELEASE_DATE), reply_markup=buttons)
@Client.on_callback_query(Filters.callback_data("back_start") & ~BANNED_USERS)
def back_start(_, query):
cb_wrapper = MethodWrapper(query)
if query.from_user.first_name:
name = query.from_user.first_name
elif query.from_user.username:
@ -69,9 +52,8 @@ def back_start(_, query):
if CACHE[query.from_user.id][0] == "AWAITING_ADMIN":
data = CACHE[query.from_user.id][-1]
if isinstance(data, list):
for chatid, message_ids in data:
delete_messages(_, True, chatid, message_ids)
edit_message_text(query, True,
GREET.format(mention=f"[{name}](tg://user?id={query.from_user.id})", id=query.from_user.id,
username=query.from_user.username),
reply_markup=BUTTONS)
for chatid, message_ids in data[:-2]:
wrapper.delete_messages(chatid, message_ids)
cb_wrapper.edit_message_text(GREET.format(mention=f"[{name}](tg://user?id={query.from_user.id})", id=query.from_user.id,
username=query.from_user.username),
reply_markup=BUTTONS)

View File

@ -4,45 +4,13 @@ BotBase has a builtin collection of wrappers around Pyrogram methods that make
it even easier to use them properly.
**DISCLAIMER**: These methods are just wrappers around Pyrogram's ones and behave
exactly the same. Every method listed here takes 2 extra arguments,
namely a `Client`/`CallbackQuery` instance and a boolean parameter (read below)
the same way.
All other arguments, including keyword ones, are passed to pyrogram directly.
The methods are wrapped in try/except blocks and log automatically all errors
to the console. Also, if `sleep=True` (which is by default) if the method raises
a `FloodWait` exception, the wrapper will sleep the required amount of time and
then return the `FloodWait` exception. If `sleep=False` the exception is returned
immediately. All other exceptions are catched under `RPCError` and are returned
if they get raised, too. If no exception occurs the wrapper will return whatever
the corresponding pyrogram method returns.
## Methods - Safe send
List of the available functions in `BotBase.methods.safe_send`
- `send_message`
- `send_photo`
- `send_audio`
- `send_animation`
- `send_sticker`
These are the exact names that pyrogram uses, to see their docs refer to
[pyrogram docs](https://docs.pyrogram.org/api/methods/)
## Methods - Safe edit
List of the available functions in `BotBase.methods.safe_edit`
- `edit_message_text`
- `edit_message_media`
- `edit_message_caption`
## Methods - Various
List of the available functions in `BotBase.methods.various`
- `answer` (for `CallbackQuery` objects)
- `delete_messages`
- `get_users`
To use the "safe" methods, just import the `MethodWrapper`` class from `BotBase.methods`
and pass it a `pyrogram.Client` **instance** (not the class) or a `pyrogram.CallbackQuery`
or even a `pyrogram.InlineQuery` object. Then you can just call `wrapper.method` rather than `client.method`.
This way, the calls will never trigger exceptions and will log errors to stderr.
If an exception occurs, the exception object is returned, otherwise whatever
the called pyrogram method returns will be returned.

View File

@ -21,6 +21,8 @@ To setup a project using BotBase, follow this step-by-step guide (assuming `pip`
**Note**: The configuration file is still a python file and when it will be imported any python code that you typed inside it will be executed, so be careful! If you need to perform pre-startup operations it is advised to do them in the `if __name__ == "main":` block inside `bot.py`, before `bot.start()`
Once you're done configuring, move to the top level directory of the project and run `python3 bot.py`
## BotBase - Plugins
BotBase comes with lots of default plugins and tools to manage database interaction.
@ -73,6 +75,7 @@ The available commands are:
- `/whisper ID msg`: Send `msg` to a specific user given its ID. HTML and markdown formatting supported
- `/update ID`: Updates the user's info in the database, if they've changed
- `/busy`: Sets your admin status as busy/not busy to silence/unsilence support requests to you
- `/userbyname`: Same as `getuser`, but takes an username (without the @) as input. Note that if the database contains multiple users with the same username, due to old data for instance, only the first entry is returned
### Plugins - Antiflood
@ -96,7 +99,7 @@ If you don't know what a smart plugin is, check [this link](https://docs.pyrogra
There are some things to keep in mind, though:
- If you want to protect your plugin from flood, import the `BotBase.modules.antiflood.BANNED_USERS` filter (basically a `Filters.user()` object) and use it like this: `~BANNED_USERS`. This will restrict banned users from reaching your handler at all.
Please note that users banned with the `/ban` command will be put in that filter, too.
Please note that users banned with the `/ban` command are filtered with the custom filter `BotBase.config.user_banned`!
- To avoid repetition with try/except blocks, BotBase also implements some wrappers around `pyrogram.Client` and `pyrogram.CallbackQuery` (and many more soon) that perform automatic exception handling and log to the console automatically, check the `METHODS.md` file in this repo to know more
- Nothing restricts you from changing how the default plugins work, but this is not advised. The default plugins have been designed to cooperate together and breaking this might lead to obscure tracebacks and errors that are hard to debug
- BotBase also has many default methods to handle database interaction, check the `DATABASE.md` file in this repo to know more

5
bot.py
View File

@ -1,5 +1,5 @@
import logging
from pyrogram import Client, CallbackQueryHandler
from pyrogram import CallbackQueryHandler
from pyrogram.session import Session
import importlib
@ -7,11 +7,10 @@ import importlib
if __name__ == "__main__":
MODULE_NAME = "BotBase" # Change this to match the FOLDER name that contains the config.py file
conf = importlib.import_module(f"{MODULE_NAME}.config")
bot = conf.bot
antiflood = importlib.import_module(f"{MODULE_NAME}.modules.antiflood")
dbmodule = importlib.import_module(f"{MODULE_NAME}.database.query")
logging.basicConfig(format=conf.LOGGING_FORMAT, datefmt=conf.DATE_FORMAT, level=conf.LOGGING_LEVEL)
bot = Client(api_id=conf.API_ID, api_hash=conf.API_HASH, bot_token=conf.BOT_TOKEN, plugins=conf.PLUGINS_ROOT,
session_name=conf.SESSION_NAME, workers=conf.WORKERS_NUM)
bot.add_handler(CallbackQueryHandler(antiflood.anti_flood, ~antiflood.BYPASS_USERS), group=-1)
Session.notice_displayed = True
try: