mirror of https://github.com/nocturn9x/BotBase.git
Started to work with the /whisper command
This commit is contained in:
parent
3addea5ecc
commit
5a0ad4a199
|
@ -1,11 +1,11 @@
|
|||
import sqlite3.dbapi2 as sqlite3
|
||||
from ..config import DB_GET_USERS, DB_GET_USER, DB_RELPATH, DB_SET_USER, DB_GET_IMEI, DB_SET_IMEI, DB_GET_API_DATE, \
|
||||
DB_SET_API_DATE, DB_SET_IMEI_DATA, DB_GET_IMEI_DATA
|
||||
from ..config import DB_GET_USERS, DB_GET_USER, DB_RELPATH, DB_SET_USER
|
||||
import logging
|
||||
import time
|
||||
from types import FunctionType
|
||||
import os
|
||||
|
||||
|
||||
def create_database(path: str, query: str):
|
||||
if os.path.exists(path):
|
||||
logging.warning(f"Database file exists at {path}, running query")
|
||||
|
@ -24,7 +24,6 @@ def create_database(path: str, query: str):
|
|||
except sqlite3.Error as query_error:
|
||||
logging.info(f"An error has occurred while executing query: {query_error}")
|
||||
|
||||
|
||||
def get_user(tg_id: int):
|
||||
try:
|
||||
database = sqlite3.connect(DB_RELPATH)
|
||||
|
@ -38,7 +37,7 @@ def get_user(tg_id: int):
|
|||
return query.fetchone()
|
||||
except sqlite3.Error as query_error:
|
||||
logging.error(f"An error has occurred while executing DB_GET_USER query: {query_error}")
|
||||
|
||||
return query_error
|
||||
|
||||
def get_users():
|
||||
try:
|
||||
|
@ -53,6 +52,7 @@ def get_users():
|
|||
return query.fetchall()
|
||||
except sqlite3.Error as query_error:
|
||||
logging.error(f"An error has occurred while executing DB_GET_USERS query: {query_error}")
|
||||
return query_error
|
||||
|
||||
|
||||
def set_user(tg_id: int, uname: str):
|
||||
|
@ -69,96 +69,4 @@ def set_user(tg_id: int, uname: str):
|
|||
return True
|
||||
except sqlite3.Error as query_error:
|
||||
logging.error(f"An error has occurred while executing DB_GET_USERS query: {query_error}")
|
||||
|
||||
|
||||
def set_imei(tg_id: int, imei: str):
|
||||
try:
|
||||
database = sqlite3.connect(DB_RELPATH)
|
||||
except sqlite3.Error as connection_error:
|
||||
logging.error(f"An error has occurred while connecting to database: {connection_error}")
|
||||
else:
|
||||
try:
|
||||
with database:
|
||||
cursor = database.cursor()
|
||||
cursor.execute(DB_SET_IMEI, (imei, tg_id))
|
||||
cursor.close()
|
||||
return True
|
||||
except sqlite3.Error as query_error:
|
||||
logging.error(f"An error has occurred while executing DB_SET_IMEI query: {query_error}")
|
||||
|
||||
|
||||
def get_imei(tg_id: int):
|
||||
try:
|
||||
database = sqlite3.connect(DB_RELPATH)
|
||||
except sqlite3.Error as connection_error:
|
||||
logging.error(f"An error has occurred while connecting to database: {connection_error}")
|
||||
else:
|
||||
try:
|
||||
with database:
|
||||
cursor = database.cursor()
|
||||
query = cursor.execute(DB_GET_IMEI, (tg_id,))
|
||||
return query.fetchone()
|
||||
except sqlite3.Error as query_error:
|
||||
logging.error(f"An error has occurred while executing DB_GET_IMEI query: {query_error}")
|
||||
|
||||
|
||||
def set_api_date(tg_id: int, timer: FunctionType = time.time):
|
||||
try:
|
||||
database = sqlite3.connect(DB_RELPATH)
|
||||
except sqlite3.Error as connection_error:
|
||||
logging.error(f"An error has occurred while connecting to database: {connection_error}")
|
||||
else:
|
||||
try:
|
||||
with database:
|
||||
cursor = database.cursor()
|
||||
cursor.execute(DB_SET_API_DATE, (int(timer()), tg_id))
|
||||
cursor.close()
|
||||
return True
|
||||
except sqlite3.Error as query_error:
|
||||
logging.error(f"An error has occurred while executing DB_SET_API_DATE query: {query_error}")
|
||||
|
||||
|
||||
def get_api_date(tg_id: int):
|
||||
try:
|
||||
database = sqlite3.connect(DB_RELPATH)
|
||||
except sqlite3.Error as connection_error:
|
||||
logging.error(f"An error has occurred while connecting to database: {connection_error}")
|
||||
else:
|
||||
try:
|
||||
with database:
|
||||
cursor = database.cursor()
|
||||
query = cursor.execute(DB_GET_API_DATE, (tg_id,))
|
||||
return query.fetchone()
|
||||
except sqlite3.Error as query_error:
|
||||
logging.error(f"An error has occurred while executing DB_GET_API_DATE query: {query_error}")
|
||||
|
||||
|
||||
def set_imei_data(imei: int, json_data: str):
|
||||
try:
|
||||
database = sqlite3.connect(DB_RELPATH)
|
||||
except sqlite3.Error as connection_error:
|
||||
logging.error(f"An error has occurred while connecting to database: {connection_error}")
|
||||
else:
|
||||
try:
|
||||
with database:
|
||||
cursor = database.cursor()
|
||||
cursor.execute(DB_SET_IMEI_DATA, (imei, json_data))
|
||||
cursor.close()
|
||||
return True
|
||||
except sqlite3.Error as query_error:
|
||||
logging.error(f"An error has occurred while executing DB_SET_IMEI_DATA query: {query_error}")
|
||||
|
||||
|
||||
def get_imei_data(imei: int):
|
||||
try:
|
||||
database = sqlite3.connect(DB_RELPATH)
|
||||
except sqlite3.Error as connection_error:
|
||||
logging.error(f"An error has occurred while connecting to database: {connection_error}")
|
||||
else:
|
||||
try:
|
||||
with database:
|
||||
cursor = database.cursor()
|
||||
query = cursor.execute(DB_GET_IMEI_DATA, (imei,))
|
||||
return query.fetchone()
|
||||
except sqlite3.Error as query_error:
|
||||
logging.error(f"An error has occurred while executing DB_GET_IMEI_DATA query: {query_error}")
|
||||
return query_error
|
||||
|
|
|
@ -26,7 +26,11 @@ def get_user_info(client, message):
|
|||
if user:
|
||||
logging.warning(f"Admin with id {message.from_user.id} sent /getuser {message.command[1]}")
|
||||
_, uid, uname, date, banned = user
|
||||
text = USER_INFO.format(uid=uid, uname='@' + uname if uname != 'null' else uname, date=date, status='User' if not admin else 'Admin')
|
||||
admin = uid in ADMINS
|
||||
text = USER_INFO.format(uid=uid, uname='@' + uname if uname != 'null' else uname, date=date,
|
||||
status='✅' if banned else '❌',
|
||||
admin='❌' if not admin else '✅'),
|
||||
)
|
||||
send_message(client, message.chat.id, text)
|
||||
else:
|
||||
send_message(client, message.chat.id, f"{ERROR}: {ID_MISSING.format(uid=message.command[1])}")
|
||||
|
@ -43,9 +47,11 @@ def get_random_user(client, message):
|
|||
send_message(client, message.chat.id, f"{INVALID_SYNTAX}: {NO_PARAMETERS.format(command='/getranduser')}")
|
||||
else:
|
||||
user = random.choice(get_users())
|
||||
rowid, uid, uname, date, admin = get_user(*user)
|
||||
rowid, uid, uname, date, banned = get_user(*user)
|
||||
admin = uid in ADMINS
|
||||
text = USER_INFO.format(uid=uid, uname='@' + uname if uname != 'null' else uname, date=date,
|
||||
status='User' if not admin else 'Admin',
|
||||
status='✅' if banned else '❌',
|
||||
admin='❌' if not admin else '✅'),
|
||||
)
|
||||
send_message(client, message.chat.id, text)
|
||||
|
||||
|
@ -59,9 +65,16 @@ def global_message(client, message):
|
|||
count = 0
|
||||
for uid in get_users():
|
||||
count += 1
|
||||
if not send_message(client, *uid, msg): # Returns False if an error gets raised
|
||||
if isinstance(send_message(client, *uid, msg), Exception):
|
||||
missed += 1
|
||||
send_message(client, message.chat.id, GLOBAL_MESSAGE_STATS.format(count=count, success=(count - missed), msg=msg))
|
||||
else:
|
||||
send_message(client, message.chat.id, f"{INVALID_SYNTAX}: Use <code>/global message</code>"
|
||||
"\n🍮 Note that the <code>/global</code> command supports markdown and html styling")
|
||||
|
||||
@Client.on_message(Filters.command("whisper") & ADMINS_FILTER & Filters.private & ~BANNED_USERS)
|
||||
def whisper(client, message):
|
||||
if len(message.command) > 1:
|
||||
msg = " ".join(message.command[1:])
|
||||
logging.warning(f"Admin with id {message.from_user.id} sent /whisper to {message.command}
|
||||
|
||||
|
|
|
@ -22,8 +22,15 @@ def start_handler(client, message):
|
|||
if message.from_user.id not in itertools.chain(*get_users()):
|
||||
logging.warning(f"New user detected ({message.from_user.id}), adding to database")
|
||||
set_user(message.from_user.id, None if not message.from_user.username else message.from_user.username)
|
||||
send_message(client, message.chat.id, GREET.format(mention=f"[{name}](tg://user?id={message.from_user.id})"),
|
||||
reply_markup=BUTTONS)
|
||||
if GREET:
|
||||
send_message(client,
|
||||
message.chat.id,
|
||||
GREET.format(mention=f"[{name}](tg://user?id={message.from_user.id})",
|
||||
id=message.from_user.id,
|
||||
username=message.from_user.username
|
||||
),
|
||||
reply_markup=BUTTONS
|
||||
)
|
||||
|
||||
|
||||
@Client.on_callback_query(Filters.callback_data("info"))
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
from pyrogram import Client, Filters, InlineKeyboardButton, InlineKeyboardMarkup
|
||||
from .antiflood import BANNED_USERS
|
||||
from ..config import GREET, BUTTONS, CREDITS
|
||||
from ..database.query import get_users, set_user
|
||||
import logging
|
||||
import itertools
|
||||
from ..methods.safe_send import send_message
|
||||
from ..methods.safe_edit import edit_message_text
|
||||
|
||||
|
||||
@Client.on_message(Filters.command("start") & ~BANNED_USERS & Filters.private)
|
||||
def start_handler(client, message):
|
||||
"""Simply handles the /start command sending a pre-defined greeting
|
||||
and saving new users to the database"""
|
||||
|
||||
if message.from_user.first_name:
|
||||
name = message.from_user.first_name
|
||||
elif message.from_user.username:
|
||||
name = message.from_user.username
|
||||
else:
|
||||
name = "Anonymous"
|
||||
if message.from_user.id not in itertools.chain(*get_users()):
|
||||
logging.warning(f"New user detected ({message.from_user.id}), adding to database")
|
||||
set_user(message.from_user.id, None if not message.from_user.username else message.from_user.username)
|
||||
if GREET:
|
||||
send_message(client,
|
||||
message.chat.id,
|
||||
GREET.format(mention=f"[{name}](tg://user?id={message.from_user.id})",
|
||||
id=message.from_user.id,
|
||||
username=message.from_user.username
|
||||
),
|
||||
reply_markup=BUTTONS
|
||||
)
|
||||
|
||||
|
||||
@Client.on_callback_query(Filters.callback_data("info"))
|
||||
def bot_info(_, query):
|
||||
buttons = InlineKeyboardMarkup([[InlineKeyboardButton("🔙 Back", "back_start")]])
|
||||
edit_message_text(query, CREDITS, reply_markup=buttons)
|
||||
|
||||
|
||||
@Client.on_callback_query(Filters.callback_data("back_start"))
|
||||
def
|
15
DATABASE.md
15
DATABASE.md
|
@ -30,9 +30,8 @@ the date and time the user was inserted in the database as a string
|
|||
(formatted as d/m/Y H:M:S) and an integer (0 for `False` and 1 for `True`)
|
||||
that represents the user's status (whether it is banned or not)
|
||||
|
||||
- `get_users()` -> This acts similarly to the above `get_user`, but takes
|
||||
no parameters and returns a list of all the users in the database. The
|
||||
list contains tuples of the same structure of the ones returned by `get_user`
|
||||
- `get_users()` -> This method takes no parameter and returns a list
|
||||
of tuples. Each tuple contains a user ID as stored in the database
|
||||
|
||||
- `set_user()` -> Saves an ID/username pair (in this order)
|
||||
to the database. The username parameter can be `None`
|
||||
|
@ -47,3 +46,13 @@ The API has been designed in a way that makes it easy to swap between different
|
|||
database managers, so if you feel in the right mood make a PR to support a new
|
||||
database and it'll be reviewed ASAP.
|
||||
|
||||
|
||||
# Adding more methods
|
||||
|
||||
If you want to add custom methods to the API, we advise to follow the bot's convention:
|
||||
|
||||
- Set the SQL query as a global variable whose name starts with `DB_` in `config.py`
|
||||
- Import it in the `BotBase.database.query` module
|
||||
- Create a new function that takes the required parameters whose name reflects the query name (without `DB_`)
|
||||
- Perform the query in a `with` context manager, close the cursor when you're done
|
||||
- Return `True` or the query result if the query was successful, or an `Exception` subclass if an error occurs
|
||||
|
|
Loading…
Reference in New Issue