Compare commits
4 Commits
334134c578
...
d69448d8db
Author | SHA1 | Date |
---|---|---|
Mattia Giambirtone | d69448d8db | |
Mattia Giambirtone | 90023ac2f0 | |
Mattia Giambirtone | 5427a09004 | |
Mattia Giambirtone | b8a6d4da7a |
|
@ -138,3 +138,11 @@ dmypy.json
|
|||
# Cython debug symbols
|
||||
cython_debug/
|
||||
|
||||
|
||||
*.session
|
||||
*.session-journal
|
||||
config.ini
|
||||
bin/*
|
||||
bin
|
||||
bin/
|
||||
pyvenv.cfg
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
# Copyright (C) 2022 nocturn9x
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
|
@ -0,0 +1,34 @@
|
|||
# Copyright (C) 2022 nocturn9x
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from pydantic import BaseModel
|
||||
from typing import Optional
|
||||
import os
|
||||
|
||||
|
||||
class BotConfig(BaseModel):
|
||||
"""
|
||||
A container class for bot configuration variables.
|
||||
Because passing around a fuckload of global variables
|
||||
wasn't bad enough
|
||||
"""
|
||||
|
||||
api_id: int
|
||||
api_hash: str
|
||||
bot_token: str
|
||||
logging_format: str
|
||||
logging_date_format: str
|
||||
logging_level: int
|
||||
restart_times: Optional[int] = 0
|
||||
plugins_dir: Optional[str] = "modules"
|
||||
working_directory: Optional[str] = os.getcwd()
|
||||
session_name: Optional[str] = "userbot"
|
|
@ -0,0 +1,28 @@
|
|||
# Copyright (C) 2022 nocturn9x
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
class SharedStatus:
|
||||
|
||||
"""
|
||||
Simple class to share state across the userbot.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.state = {}
|
||||
|
||||
def add(self, key, value):
|
||||
self.state[key] = value
|
||||
|
||||
def __getattr__(self, item):
|
||||
if item in self.__dict__:
|
||||
return self.__dict__[item]
|
||||
return self.state[item]
|
|
@ -0,0 +1,51 @@
|
|||
# Copyright (C) 2022 nocturn9x
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from pyrogram.types import Message
|
||||
from pyrogram.errors import RPCError
|
||||
from pyrogram import Client, filters
|
||||
from pyrogram.enums.parse_mode import ParseMode
|
||||
|
||||
|
||||
HELP_MSG = """List of available commands:
|
||||
- /help: Opens help menu (you are here)
|
||||
- /start: Starts the bot
|
||||
- /play: Create a lobby to play UNO!
|
||||
- /stop: Stops the current game
|
||||
- /draw: Draw a card from the deck
|
||||
- /uno: Say UNO!
|
||||
- /pass: End your turn
|
||||
- /join <id>: Join a game lobby
|
||||
- /close: Close the game lobby to new players
|
||||
- /open: Reopens a lobby to new players
|
||||
- /list: List all players in the current lobby
|
||||
- /leave: Leave the current lobby
|
||||
- /kick <id>: Kicks a given player from the current game
|
||||
- /begin: Starts the game and closes the lobby
|
||||
- /broadcast <message>: Sends a message to all users
|
||||
- /whisper <id> <message>: Sends a message to a single user
|
||||
- /throw <card>: Throws a card onto the table
|
||||
- /info: Shows the current state of the game (your cards and the table)
|
||||
"""
|
||||
|
||||
|
||||
@Client.on_message(filters.command("help"))
|
||||
async def help(_: Client, message: Message):
|
||||
"""
|
||||
Handles the /help command
|
||||
"""
|
||||
|
||||
try:
|
||||
await message.reply_text(HELP_MSG, parse_mode=ParseMode.MARKDOWN)
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
|
@ -0,0 +1,29 @@
|
|||
# Copyright (C) 2022 nocturn9x
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from pyrogram.types import Message
|
||||
from pyrogram.errors import RPCError
|
||||
from pyrogram import Client, filters
|
||||
|
||||
|
||||
|
||||
@Client.on_message(filters.command("start"))
|
||||
async def start(_: Client, message: Message):
|
||||
"""
|
||||
Handles the /start command
|
||||
"""
|
||||
|
||||
try:
|
||||
await message.reply_text("Hi! I'm the UNO! bot. I can let you play with your friends in a group. Send /help to learn more!")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
|
@ -0,0 +1,869 @@
|
|||
# Copyright (C) 2022 nocturn9x
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import random
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum, auto
|
||||
from typing import Optional
|
||||
from pyrogram.types import Message
|
||||
from pyrogram.errors import RPCError
|
||||
from pyrogram.enums import ChatType
|
||||
from pyrogram.enums.parse_mode import ParseMode
|
||||
from pyrogram import Client, filters
|
||||
from collections import deque, defaultdict
|
||||
|
||||
|
||||
class UNODirection(Enum):
|
||||
"""
|
||||
The direction of play in the
|
||||
current game
|
||||
"""
|
||||
|
||||
CLOCKWISE: int = auto()
|
||||
COUNTER_CLOCKWISE: int = auto()
|
||||
|
||||
|
||||
class UNOCardType(Enum):
|
||||
"""
|
||||
A UNO card type
|
||||
"""
|
||||
|
||||
NUMBER: int = auto()
|
||||
DRAW: int = auto()
|
||||
WILD: int = auto()
|
||||
WILD_DRAW: int = auto()
|
||||
REVERSE: int = auto()
|
||||
SKIP: int = auto()
|
||||
|
||||
|
||||
class UNOCardColor(Enum):
|
||||
"""
|
||||
A UNO card's color
|
||||
"""
|
||||
|
||||
RED: int = auto()
|
||||
BLUE: int = auto()
|
||||
GREEN: int = auto()
|
||||
YELLOW: int = auto()
|
||||
BLACK: int = auto()
|
||||
|
||||
|
||||
@dataclass
|
||||
class UNOCard:
|
||||
"""
|
||||
A single UNO card
|
||||
"""
|
||||
|
||||
kind: UNOCardType
|
||||
color: UNOCardColor
|
||||
stackable: Optional[bool] = True
|
||||
placed_by: "UNOPlayer" = None
|
||||
# Extra metadata
|
||||
metadata: Optional[dict[str, int | str | bool]] = None
|
||||
|
||||
|
||||
def get_default_deck(n_decks: int = 1) -> deque[UNOCard]:
|
||||
"""
|
||||
Generates the UNO! default deck. Can merge more than
|
||||
one deck if there's more people playing (use n_decks)
|
||||
"""
|
||||
|
||||
result = []
|
||||
for __ in range(n_decks):
|
||||
# Numbers: 1 to 9 for each color
|
||||
for color in [UNOCardColor.GREEN, UNOCardColor.RED,
|
||||
UNOCardColor.YELLOW, UNOCardColor.BLUE]:
|
||||
for _ in range(2):
|
||||
for i in range(1, 10):
|
||||
result.append(UNOCard(UNOCardType.NUMBER, color, metadata={"value": i}))
|
||||
# 8 skip cards, 2 per color
|
||||
for _ in [UNOCardColor.GREEN, UNOCardColor.RED,
|
||||
UNOCardColor.YELLOW, UNOCardColor.BLUE]:
|
||||
for _ in range(2):
|
||||
result.append(UNOCard(UNOCardType.SKIP, color, metadata={"exhausted": False}))
|
||||
# 8 reverse cards, 2 per color
|
||||
for _ in [UNOCardColor.GREEN, UNOCardColor.RED,
|
||||
UNOCardColor.YELLOW, UNOCardColor.BLUE]:
|
||||
for _ in range(2):
|
||||
result.append(UNOCard(UNOCardType.REVERSE, color, metadata={"exhausted": False}))
|
||||
# 8 draw cards, 2 per color
|
||||
for _ in [UNOCardColor.GREEN, UNOCardColor.RED,
|
||||
UNOCardColor.YELLOW, UNOCardColor.BLUE]:
|
||||
for _ in range(2):
|
||||
result.append(UNOCard(UNOCardType.DRAW, color, metadata={"exhausted": False}))
|
||||
# Wild and wild draw cards
|
||||
for _ in range(4):
|
||||
result.append(UNOCard(UNOCardType.WILD, UNOCardColor.BLACK, stackable=False, metadata={"exhausted": False}))
|
||||
for _ in range(4):
|
||||
result.append(UNOCard(UNOCardType.WILD_DRAW, UNOCardColor.BLACK, stackable=False, metadata={"exhausted": False}))
|
||||
random.shuffle(result)
|
||||
return deque(result)
|
||||
|
||||
|
||||
@dataclass
|
||||
class UNOPlayer:
|
||||
"""
|
||||
A single UNO player
|
||||
"""
|
||||
|
||||
id: int
|
||||
name: str
|
||||
cards: list[UNOCard] = field(default_factory=list)
|
||||
said_uno: bool = False
|
||||
won: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class UNOGame:
|
||||
"""
|
||||
A single UNO game
|
||||
"""
|
||||
|
||||
players: list[UNOPlayer] = field(default_factory=list)
|
||||
# We literally have a stack of cards, so it makes sense
|
||||
# to use a deque
|
||||
deck: deque[UNOCard] = field(default_factory=get_default_deck)
|
||||
table: deque[UNOCard] = field(default_factory=deque)
|
||||
direction: UNODirection = UNODirection.CLOCKWISE
|
||||
current_pos: int = 0
|
||||
current_player: Optional[UNOPlayer] = None
|
||||
winners: list[UNOPlayer] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class UNOLobby:
|
||||
"""
|
||||
A UNO playing lobby
|
||||
"""
|
||||
|
||||
creator: UNOPlayer
|
||||
game: UNOGame
|
||||
open: bool = True
|
||||
started: bool = False
|
||||
|
||||
|
||||
|
||||
class PlayerAction(Enum):
|
||||
"""
|
||||
An enum of possible
|
||||
player actions and
|
||||
statuses
|
||||
"""
|
||||
|
||||
WAITING_LOBBY: int = auto()
|
||||
WAITING_TURN: int = auto()
|
||||
PLAYING_TURN: int = auto()
|
||||
CHOOSING_COLOR: int = auto()
|
||||
END_TURN: int = auto()
|
||||
END_ACTION: int = auto()
|
||||
END_THROW: int = auto()
|
||||
END_DRAW: int = auto()
|
||||
NONE: int = auto()
|
||||
|
||||
|
||||
LOBBIES: list[UNOLobby] = []
|
||||
USERS: dict[int, tuple[PlayerAction, UNOLobby]] = defaultdict(lambda: (PlayerAction.NONE, None))
|
||||
|
||||
|
||||
@Client.on_message(filters.command("play"))
|
||||
async def play(_: Client, message: Message):
|
||||
"""
|
||||
Handles the /play command
|
||||
"""
|
||||
|
||||
try:
|
||||
if message.chat.type != ChatType.PRIVATE:
|
||||
await message.reply_text("This command can only be used in private chats. Send /help to learn more!")
|
||||
return
|
||||
elif USERS[message.from_user.id][0] != PlayerAction.NONE:
|
||||
await message.reply_text("You're already in a lobby: type /leave if you want to leave it or /stop if you're the creator")
|
||||
return
|
||||
decks = 1
|
||||
if len(message.command) > 1:
|
||||
if message.command[1].isnumeric():
|
||||
decks = int(message.command[1])
|
||||
else:
|
||||
await message.reply_text("Invalid deck count: it must be a positive integer")
|
||||
return
|
||||
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
|
||||
creator = UNOPlayer(message.from_user.id, name, [])
|
||||
deck = get_default_deck(decks)
|
||||
game = UNOGame(players=[creator], table=deque([deck.popleft()]), direction=UNODirection.CLOCKWISE)
|
||||
LOBBIES.append(UNOLobby(creator, game))
|
||||
USERS[creator.id] = (PlayerAction.WAITING_LOBBY, LOBBIES[-1])
|
||||
await message.reply_text(f"Lobby created (playing with {decks} deck{'s' if decks > 1 else ''}). Tell your friends to send me `/join {message.from_user.id}` to join the game")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
|
||||
|
||||
@Client.on_message(filters.command("close"))
|
||||
async def close(_: Client, message: Message):
|
||||
"""
|
||||
Handles the /close command
|
||||
"""
|
||||
|
||||
try:
|
||||
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
|
||||
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
|
||||
else:
|
||||
lobby = data[1]
|
||||
if lobby.creator.id != message.from_user.id:
|
||||
await message.reply_text("Only the creator can close the lobby")
|
||||
elif not lobby.open:
|
||||
await message.reply_text("The lobby is already closed")
|
||||
else:
|
||||
lobby.open = False
|
||||
await message.reply_text("Closed the lobby to new players")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
|
||||
|
||||
@Client.on_message(filters.command("open"))
|
||||
async def open(_: Client, message: Message):
|
||||
"""
|
||||
Handles the /open command
|
||||
"""
|
||||
|
||||
try:
|
||||
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
|
||||
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
|
||||
else:
|
||||
lobby = data[1]
|
||||
if lobby.creator.id != message.from_user.id:
|
||||
await message.reply_text("Only the creator can open the lobby")
|
||||
elif lobby.open:
|
||||
await message.reply_text("The lobby is already open")
|
||||
else:
|
||||
lobby.open = True
|
||||
await message.reply_text("Opened the lobby to new players")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
|
||||
|
||||
@Client.on_message(filters.command("join"))
|
||||
async def join(client: Client, message: Message):
|
||||
"""
|
||||
Handles the /join command
|
||||
"""
|
||||
|
||||
try:
|
||||
if len(message.command) != 2:
|
||||
await message.reply_text(f"Invalid syntax, the correct one is `/join <id>`", parse_mode=ParseMode.MARKDOWN)
|
||||
return
|
||||
elif message.command[1].isnumeric():
|
||||
join_lobby = None
|
||||
creator_id = int(message.command[1])
|
||||
for i, lobby in enumerate(LOBBIES):
|
||||
if creator_id == lobby.creator.id:
|
||||
join_lobby = lobby
|
||||
break
|
||||
if not join_lobby:
|
||||
await message.reply_text(f"Could not find lobby `{creator_id}`. It may have been deleted or not exist yet. Send /help for more info")
|
||||
elif not lobby.open:
|
||||
await message.reply_text(f"Sorry, but this lobby isn't accepting any new players")
|
||||
else:
|
||||
for player in lobby.game.players:
|
||||
if player.id == message.from_user.id:
|
||||
await message.reply_text("You're already in this lobby")
|
||||
return
|
||||
USERS[message.from_user.id] = (PlayerAction.WAITING_LOBBY, lobby)
|
||||
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
|
||||
lobby.game.players.append(UNOPlayer(message.from_user.id, name, []))
|
||||
await message.reply_text(f"Joined lobby `{creator_id}`")
|
||||
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
|
||||
for user in lobby.game.players:
|
||||
if user.id != message.from_user.id:
|
||||
try:
|
||||
await client.send_message(user.id, f"{name} has joined!")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
else:
|
||||
await message.reply_text("Invalid lobby ID: it must be a positive integer")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
|
||||
|
||||
@Client.on_message(filters.command("list"))
|
||||
async def list(client: Client, message: Message):
|
||||
"""
|
||||
Handles the /list command
|
||||
"""
|
||||
|
||||
try:
|
||||
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
|
||||
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
|
||||
else:
|
||||
lobby = data[1]
|
||||
msg = f"List of players in lobby `{lobby.creator.id}`:\n"
|
||||
for player in lobby.game.players:
|
||||
msg += f"- {player.name} (`{player.id}`)"
|
||||
if player.cards:
|
||||
msg += f" (`{len(player.cards)}` card{'s' if len(player.cards) > 1 else ''})"
|
||||
if lobby.started and player == lobby.game.current_player:
|
||||
msg += " ⬅️"
|
||||
elif player.won:
|
||||
msg += " 👑"
|
||||
msg += "\n"
|
||||
await message.reply_text(msg)
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
|
||||
|
||||
@Client.on_message(filters.command("stop"))
|
||||
async def stop(client: Client, message: Message):
|
||||
"""
|
||||
Handles the /stop command
|
||||
"""
|
||||
|
||||
try:
|
||||
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
|
||||
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
|
||||
else:
|
||||
lobby = data[1]
|
||||
if message.from_user.id != lobby.creator.id:
|
||||
await message.reply_text("Only the creator can stop the game")
|
||||
return
|
||||
for user in lobby.game.players:
|
||||
del USERS[user.id]
|
||||
if user.id != message.from_user.id:
|
||||
try:
|
||||
await client.send_message(user.id, f"The game has been stopped by its creator, thanks for playing!")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
LOBBIES.remove(data[1])
|
||||
await message.reply_text("Lobby closed, thanks for playing!")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
|
||||
|
||||
|
||||
@Client.on_message(filters.command("leave"))
|
||||
async def leave(client: Client, message: Message):
|
||||
"""
|
||||
Handles the /leave command
|
||||
"""
|
||||
|
||||
try:
|
||||
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
|
||||
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
|
||||
else:
|
||||
lobby = data[1]
|
||||
if message.from_user.id == lobby.creator.id:
|
||||
await message.reply_text("You can't leave a lobby as a creator: use /stop instead")
|
||||
return
|
||||
for player in lobby.game.players:
|
||||
if player.id == message.from_user.id:
|
||||
break
|
||||
lobby.game.players.remove(player)
|
||||
# User's cards are added at the bottom
|
||||
# of the table
|
||||
lobby.game.table.extend(player.cards)
|
||||
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
|
||||
for user in lobby.game.players:
|
||||
if user.id != message.from_user.id:
|
||||
try:
|
||||
await client.send_message(user.id, f"{name} has left!")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
del USERS[message.from_user.id]
|
||||
await message.reply_text(f"Left lobby `{lobby.creator.id}`")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
|
||||
|
||||
@Client.on_message(filters.command("kick"))
|
||||
async def kick(client: Client, message: Message):
|
||||
"""
|
||||
Handles the /kick command
|
||||
"""
|
||||
|
||||
try:
|
||||
if len(message.command) != 2:
|
||||
await message.reply_text("Invalid syntax, the correct one is `/kick <id>`", parse_mode=ParseMode.MARKDOWN)
|
||||
elif (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
|
||||
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
|
||||
else:
|
||||
lobby = data[1]
|
||||
if lobby.creator.id != message.from_user.id:
|
||||
await message.reply_text("Only the creator can kick users")
|
||||
elif message.command[1].isnumeric():
|
||||
kicked = int(message.command[1])
|
||||
if kicked == lobby.creator.id:
|
||||
await message.reply_text("You can't kick yourself")
|
||||
return
|
||||
found = None
|
||||
for player in lobby.game.players:
|
||||
if player.id == kicked:
|
||||
found = player
|
||||
if found:
|
||||
lobby.game.players.remove(player)
|
||||
del USERS[kicked]
|
||||
await client.send_message(kicked, f"You have been kicked from lobby `{lobby.creator.id}`")
|
||||
await message.reply_text(f"Kicked `{kicked}`")
|
||||
else:
|
||||
await message.reply_text(f"`{kicked}` is not in the lobby")
|
||||
else:
|
||||
await message.reply_text("Invalid user ID: it must be a positive integer")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
|
||||
|
||||
@Client.on_message(filters.command("broadcast"))
|
||||
async def broadcast(client: Client, message: Message):
|
||||
"""
|
||||
Handles the /broadcast command
|
||||
"""
|
||||
|
||||
try:
|
||||
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
|
||||
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
|
||||
elif len(message.command) <= 1:
|
||||
await message.reply_text("Invalid syntax, the correct one is `/broadcast <message>`", parse_mode=ParseMode.MARKDOWN)
|
||||
else:
|
||||
lobby = data[1]
|
||||
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
|
||||
for user in lobby.game.players:
|
||||
if user.id != message.from_user.id:
|
||||
try:
|
||||
await client.send_message(user.id, f"{name} says: {message.text[11:]}")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
await message.reply_text(f"Done!")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
|
||||
|
||||
@Client.on_message(filters.command("whisper"))
|
||||
async def whisper(client: Client, message: Message):
|
||||
"""
|
||||
Handles the /whisper command
|
||||
"""
|
||||
|
||||
try:
|
||||
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
|
||||
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
|
||||
elif len(message.command) <= 2:
|
||||
await message.reply_text("Invalid syntax, the correct one is `/whisper <id> <message>`", parse_mode=ParseMode.MARKDOWN)
|
||||
else:
|
||||
lobby = data[1]
|
||||
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
|
||||
user_id = None
|
||||
if message.command[1].isnumeric():
|
||||
user_id = int(message.command[1])
|
||||
else:
|
||||
await message.reply_text("Invalid user ID: it must be a positive integer")
|
||||
return
|
||||
found = False
|
||||
for user in lobby.game.players:
|
||||
if user.id == user_id:
|
||||
found = True
|
||||
break
|
||||
if not found:
|
||||
await message.reply_text(f"`{user_id}` is not in this lobby, use /list to see the list of users")
|
||||
return
|
||||
try:
|
||||
await client.send_message(user_id, f"{name} whispers you: {message.text[11 + len(message.command[1]) - 1:]}")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
await message.reply_text(f"Done!")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
|
||||
|
||||
def reshuffle_table_in_deck(game: UNOGame):
|
||||
"""
|
||||
Called when the deck becomes empty and
|
||||
the cards on the table need to be reshuffled
|
||||
into it
|
||||
"""
|
||||
|
||||
game.deck.extend(game.table)
|
||||
game.table.clear()
|
||||
game.table.append(game.deck.popleft())
|
||||
while True:
|
||||
if game.table[0].kind != UNOCardType.NUMBER:
|
||||
game.table.append(game.deck.popleft())
|
||||
game.deck.append(game.table.popleft())
|
||||
else:
|
||||
break
|
||||
# Breaks up the pairs
|
||||
random.shuffle(game.deck)
|
||||
|
||||
|
||||
def pick_first_card(game: UNOGame):
|
||||
"""
|
||||
Picks the first card on the table from the
|
||||
deck, ensuring that it is a number
|
||||
"""
|
||||
|
||||
# Picks the first card (which can't be a special card)
|
||||
game.table.append(game.deck.popleft())
|
||||
while True:
|
||||
if game.table[0].kind != UNOCardType.NUMBER:
|
||||
game.table.append(game.deck.popleft())
|
||||
game.deck.append(game.table.popleft())
|
||||
else:
|
||||
break
|
||||
|
||||
|
||||
def distribute_cards(game: UNOGame, n: int = 1):
|
||||
"""
|
||||
Distributes n cards to all players evenly
|
||||
as if they were being given in real life
|
||||
one by one
|
||||
"""
|
||||
|
||||
# Gives 7 cards to each player, one by one (i.e first one to each player, then the second, etc.)
|
||||
while not all(len(player.cards) == n for player in game.players):
|
||||
for player in game.players:
|
||||
player.cards.append(game.deck.popleft())
|
||||
for player in game.players:
|
||||
for card in player.cards:
|
||||
card.placed_by = player
|
||||
|
||||
|
||||
@Client.on_message(filters.command("begin"))
|
||||
async def begin(client: Client, message: Message):
|
||||
"""
|
||||
Handles the /begin command
|
||||
"""
|
||||
|
||||
try:
|
||||
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
|
||||
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
|
||||
else:
|
||||
lobby = data[1]
|
||||
if lobby.creator.id != message.from_user.id:
|
||||
await message.reply_text("Only the creator can begin the game")
|
||||
elif lobby.started:
|
||||
await message.reply_text("The game has already started")
|
||||
else:
|
||||
if len(lobby.game.players) < 2:
|
||||
await message.reply_text("You need at least 2 players to play UNO!")
|
||||
return
|
||||
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
|
||||
lobby.open = False
|
||||
lobby.started = True
|
||||
pick_first_card(lobby.game)
|
||||
distribute_cards(lobby.game)
|
||||
lobby.game.current_pos = random.randint(0, len(lobby.game.players) - 1)
|
||||
lobby.game.current_player = lobby.game.players[lobby.game.current_pos]
|
||||
await message.reply_text("The game has started and the lobby is now closed to new players")
|
||||
for user in lobby.game.players:
|
||||
user.won = False
|
||||
try:
|
||||
if user.id != message.from_user.id:
|
||||
await client.send_message(user.id, f"{name} has started the game!")
|
||||
msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n"
|
||||
for i, card in enumerate(user.cards):
|
||||
msg += f"{i + 1}. {card_to_string(card)}\n"
|
||||
await client.send_message(user.id, msg)
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
|
||||
|
||||
def card_to_string(card: UNOCard) -> str:
|
||||
"""
|
||||
Converts a UNO card to its corresponding
|
||||
string representation
|
||||
"""
|
||||
|
||||
maps = {
|
||||
1: "1️⃣",
|
||||
2: "2️⃣",
|
||||
3: "3️⃣",
|
||||
4: "4️⃣",
|
||||
5: "5️⃣",
|
||||
6: "6️⃣",
|
||||
7: "7️⃣",
|
||||
8: "8️⃣",
|
||||
9: "9️⃣",
|
||||
UNOCardType.NUMBER: "",
|
||||
UNOCardType.DRAW: "➕",
|
||||
UNOCardType.WILD: "🌈",
|
||||
UNOCardType.WILD_DRAW: "🌈➕4️⃣",
|
||||
UNOCardType.REVERSE: "↩️",
|
||||
UNOCardType.SKIP: "🚫",
|
||||
|
||||
UNOCardColor.BLACK: "",
|
||||
UNOCardColor.BLUE: "🔵",
|
||||
UNOCardColor.RED: "🔴",
|
||||
UNOCardColor.GREEN: "🟢",
|
||||
UNOCardColor.YELLOW: "🟡",
|
||||
}
|
||||
result = ""
|
||||
match card.kind:
|
||||
case UNOCardType.NUMBER:
|
||||
result += maps[card.metadata["value"]]
|
||||
result += maps[card.color]
|
||||
case UNOCardType.DRAW:
|
||||
result += maps[card.kind]
|
||||
result += maps[2]
|
||||
result += maps[card.color]
|
||||
case UNOCardType.REVERSE | UNOCardType.SKIP:
|
||||
result += maps[card.kind]
|
||||
result += maps[card.color]
|
||||
case UNOCardType.WILD | UNOCardType.WILD_DRAW:
|
||||
result += maps[card.kind]
|
||||
return result
|
||||
|
||||
|
||||
@Client.on_message(filters.command("info"))
|
||||
async def info(_: Client, message: Message):
|
||||
"""
|
||||
Handles the /info command
|
||||
"""
|
||||
|
||||
try:
|
||||
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
|
||||
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
|
||||
else:
|
||||
lobby = data[1]
|
||||
if not lobby.started:
|
||||
await message.reply_text("The game hasn't started yet, ask the creator to send /begin!")
|
||||
return
|
||||
msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n"
|
||||
for player in lobby.game.players:
|
||||
if player.id == message.from_user.id:
|
||||
break
|
||||
for i, card in enumerate(player.cards):
|
||||
msg += f"{i + 1}. {card_to_string(card)}\n"
|
||||
await message.reply_text(msg)
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
|
||||
|
||||
async def check_special_cards(client: Client, game: UNOGame):
|
||||
"""
|
||||
Performs some checks on the effects of special cards and
|
||||
sends some messages to users to make them aware of any
|
||||
changes that occur
|
||||
"""
|
||||
|
||||
|
||||
@Client.on_message(filters.command("pass"))
|
||||
async def pass_turn(client: Client, message: Message):
|
||||
"""
|
||||
Handles the /pass command
|
||||
"""
|
||||
|
||||
try:
|
||||
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
|
||||
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
|
||||
else:
|
||||
lobby = data[1]
|
||||
for player in lobby.game.players:
|
||||
if player.id == message.from_user.id:
|
||||
break
|
||||
if not lobby.started:
|
||||
await message.reply_text("The game hasn't started yet, ask the creator to send /begin!")
|
||||
return
|
||||
elif lobby.game.current_player != player:
|
||||
await message.reply_text("It's not your turn! Wait for your opponents to finish first")
|
||||
return
|
||||
elif USERS[message.from_user.id][0] not in {PlayerAction.END_DRAW, PlayerAction.END_THROW, PlayerAction.END_ACTION}:
|
||||
await message.reply_text("You have to draw or throw a card before passing your turn")
|
||||
return
|
||||
else:
|
||||
while True:
|
||||
if lobby.game.direction == UNODirection.CLOCKWISE:
|
||||
lobby.game.current_pos += 1
|
||||
else:
|
||||
lobby.game.current_pos -= 1
|
||||
lobby.game.current_player = lobby.game.players[lobby.game.current_pos % len(lobby.game.players)]
|
||||
if lobby.game.current_player.won:
|
||||
# Skip players who won already
|
||||
continue
|
||||
else:
|
||||
break
|
||||
USERS[message.from_user.id] = (PlayerAction.END_TURN, USERS[message.from_user.id][1])
|
||||
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
|
||||
next_name = lobby.game.current_player.name
|
||||
for user in lobby.game.players:
|
||||
if user.id != message.from_user.id:
|
||||
msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n"
|
||||
for i, card in enumerate(user.cards):
|
||||
msg += f"{i + 1}. {card_to_string(card)}\n"
|
||||
try:
|
||||
await client.send_message(user.id, f"{name} has finished their turn, it is now {next_name}'s turn")
|
||||
await client.send_message(lobby.game.current_player.id, msg)
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
await message.reply_text("Done!")
|
||||
await check_special_cards(client, lobby.game)
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
|
||||
|
||||
@Client.on_message(filters.command("draw"))
|
||||
async def draw(client: Client, message: Message):
|
||||
"""
|
||||
Handles the /draw command
|
||||
"""
|
||||
|
||||
try:
|
||||
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
|
||||
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
|
||||
else:
|
||||
lobby = data[1]
|
||||
if not lobby.started:
|
||||
await message.reply_text("The game hasn't started yet, ask the creator to send /begin!")
|
||||
return
|
||||
for player in lobby.game.players:
|
||||
if player.id == message.from_user.id:
|
||||
break
|
||||
if lobby.game.current_player != player:
|
||||
await message.reply_text("It's not your turn! Wait for your opponents to finish first")
|
||||
return
|
||||
elif USERS[message.from_user.id][0] in {PlayerAction.END_DRAW, PlayerAction.END_THROW, PlayerAction.END_ACTION}:
|
||||
await message.reply_text("You have already performed an action for this turn!")
|
||||
return
|
||||
for user in lobby.game.players:
|
||||
if user.id != message.from_user.id:
|
||||
try:
|
||||
await client.send_message(user.id, f"{player.name} draws a card")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
if not lobby.game.deck:
|
||||
reshuffle_table_in_deck(lobby.game)
|
||||
for user in lobby.game.players:
|
||||
try:
|
||||
await client.send_message(user.id, "The deck pile is empty! Shuffling the table back into the deck")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
player.cards.append(lobby.game.deck.popleft())
|
||||
USERS[message.from_user.id] = (PlayerAction.END_DRAW, USERS[message.from_user.id][1])
|
||||
await message.reply_text(f"You draw a card. It's {card_to_string(player.cards[-1])}")
|
||||
msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n"
|
||||
for i, card in enumerate(player.cards):
|
||||
msg += f"{i + 1}. {card_to_string(card)}\n"
|
||||
await client.send_message(lobby.game.current_player.id, msg)
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
|
||||
|
||||
def check_next_throw(game: UNOGame, card: UNOCard) -> bool:
|
||||
"""
|
||||
Returns if the card to be thrown is valid
|
||||
"""
|
||||
|
||||
top = game.table[0]
|
||||
result = False
|
||||
match card.kind:
|
||||
case UNOCardType.NUMBER as k if top.kind == k:
|
||||
if card.metadata["value"] == top.metadata["value"]:
|
||||
result = True
|
||||
elif card.color == top.color:
|
||||
result = True
|
||||
# Can't stack colors!
|
||||
if top.placed_by and card.color == top.color and card.placed_by is top.placed_by:
|
||||
result = False
|
||||
case UNOCardType.DRAW | UNOCardType.REVERSE | UNOCardType.SKIP as k:
|
||||
result = top.kind == k and top.color == card.color
|
||||
case UNOCardType.WILD | UNOCardType.WILD_DRAW:
|
||||
result = True
|
||||
case _:
|
||||
result = False
|
||||
result = result and top.stackable
|
||||
return result
|
||||
|
||||
|
||||
@Client.on_message(filters.command("throw"))
|
||||
async def throw(client: Client, message: Message):
|
||||
"""
|
||||
Handles the /throw command
|
||||
"""
|
||||
|
||||
try:
|
||||
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
|
||||
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
|
||||
elif len(message.command) != 2:
|
||||
await message.reply_text("Invalid syntax, the correct one is `/throw <card>`", parse_mode=ParseMode.MARKDOWN)
|
||||
else:
|
||||
lobby = data[1]
|
||||
if not lobby.started:
|
||||
await message.reply_text("The game hasn't started yet, ask the creator to send /begin!")
|
||||
return
|
||||
for player in lobby.game.players:
|
||||
if player.id == message.from_user.id:
|
||||
break
|
||||
if lobby.game.current_player != player:
|
||||
await message.reply_text("It's not your turn! Wait for your opponents to finish first")
|
||||
return
|
||||
elif USERS[message.from_user.id][0] == PlayerAction.END_ACTION:
|
||||
await message.reply_text("You have already performed an action for this turn!")
|
||||
return
|
||||
card_no = 0
|
||||
if message.command[1].isnumeric():
|
||||
card_no = int(message.command[1])
|
||||
else:
|
||||
await message.reply_text("Invalid card number: it must be a positive integer")
|
||||
return
|
||||
if card_no > len(player.cards) or card_no == 0:
|
||||
await message.reply_text(f"Invalid card: you can choose between 1 and {len(player.cards)}")
|
||||
return
|
||||
if not check_next_throw(lobby.game, player.cards[card_no - 1]):
|
||||
await message.reply_text("You can't throw that card")
|
||||
return
|
||||
lobby.game.table.appendleft(player.cards.pop(card_no - 1))
|
||||
USERS[message.from_user.id] = (PlayerAction.END_THROW, USERS[message.from_user.id][1])
|
||||
for user in lobby.game.players:
|
||||
if user.id != message.from_user.id:
|
||||
try:
|
||||
await client.send_message(user.id, f"{player.name} throws {card_to_string(lobby.game.table[0])}")
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
if len(player.cards) == 0:
|
||||
if player.won:
|
||||
await message.reply_text("You already won! Please wait for the match to end")
|
||||
return
|
||||
lobby.game.winners.append(player)
|
||||
player.won = True
|
||||
await message.reply_text("You're out of cards, congrats!")
|
||||
if sum(player.won for player in lobby.game.players) == len(lobby.game.players) - 1:
|
||||
msg = "📝 Results:\n\n"
|
||||
for i, user in enumerate(lobby.game.winners):
|
||||
msg += f"- {user.name} "
|
||||
match i:
|
||||
case 0:
|
||||
msg += "🥇"
|
||||
case 1:
|
||||
msg += "🥈"
|
||||
case 2:
|
||||
msg += "🥉"
|
||||
msg += "\n"
|
||||
for player in lobby.game.players:
|
||||
if not player.won:
|
||||
msg += f"- {player.name}\n"
|
||||
# Only one player left
|
||||
for i, user in enumerate(lobby.game.players):
|
||||
user.cards = []
|
||||
USERS[user.id] = (PlayerAction.WAITING_LOBBY, lobby)
|
||||
try:
|
||||
if user.id != message.from_user.id:
|
||||
await client.send_message(user.id, f"Game over!")
|
||||
await client.send_message(user.id, msg)
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
||||
lobby.started = False
|
||||
else:
|
||||
await message.reply_text(f"You throw a card. It's {card_to_string(lobby.game.table[0])}")
|
||||
msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n"
|
||||
for i, card in enumerate(player.cards):
|
||||
msg += f"{i + 1}. {card_to_string(card)}\n"
|
||||
await client.send_message(lobby.game.current_player.id, msg)
|
||||
except RPCError as rpc_error:
|
||||
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
|
|
@ -0,0 +1,15 @@
|
|||
[bot]
|
||||
|
||||
# Bot configuration file. Options that are left empty (with or without a '=' sign)
|
||||
# are automatically set to None. Don't mess shit up, idiot <3
|
||||
|
||||
api_id = 12345
|
||||
api_hash = API HASH HERE
|
||||
bot_token = BOT TOKEN HERE
|
||||
logging_format = [%%(levelname)s %%(asctime)s] Module '%%(module)s', function '%%(funcName)s' at line %%(lineno)d -> %%(message)s
|
||||
logging_date_format = %%d/%%m/%%Y %%T %%p
|
||||
logging_level = 20
|
||||
plugins_dir = UnoVRBot/modules
|
||||
working_directory =
|
||||
session_name = UnoVRBot
|
||||
restart_times = 0
|
|
@ -0,0 +1,161 @@
|
|||
# Copyright (C) 2022 nocturn9x
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from pyrogram import Client, idle
|
||||
from pyrogram.session import Session
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
import uvloop
|
||||
import sys
|
||||
from configparser import ConfigParser
|
||||
from UnoVRBot.misc.config import BotConfig
|
||||
from pydantic import ValidationError
|
||||
|
||||
|
||||
# Main Bot module. This is the entry point to the Bot and it loads all the modules and
|
||||
# performs automatic startup, actually pyrogram does all the job but shut up we are communist
|
||||
|
||||
|
||||
def log(prefix: str = "", message: str = ""):
|
||||
"""
|
||||
Hey I like logging but this is used only when no proper
|
||||
logging configuration is yet in place to actually be useful
|
||||
:param message: The message to log to stderr, default to ""
|
||||
:type message: str
|
||||
:param prefix: Any extra prefix to add to the message, default to ""
|
||||
Use {date} to replace it with the current date in d/m/Y format and
|
||||
{time} to replace it with the current time in %H:%M:%S %p format or
|
||||
just %T %p if you're a lazy sucker. A whitespace is automatically
|
||||
added between the prefix and the suffix just in case you're dumb
|
||||
:type prefix: str
|
||||
:return: Nothing
|
||||
:rtype: None
|
||||
"""
|
||||
|
||||
sys.stderr.write(f"{prefix.format(date=time.strftime('%d/%m/%Y'), time=time.strftime('%T %p'))} {message}\n")
|
||||
|
||||
|
||||
async def start(config: BotConfig):
|
||||
"""
|
||||
Starts the Bot, or at least tries to do so and give
|
||||
up after a while cuz we've got other shit to do than starting shitty
|
||||
Bots all day
|
||||
:param config: The BotConfig class (yeah, thanks, explanation be like)
|
||||
:type config: BotConfig
|
||||
:return: Nothing, fucker
|
||||
:rtype: None
|
||||
"""
|
||||
|
||||
log("[Bot - INFO {date} {time}]",
|
||||
"Loading an actual logging configuration")
|
||||
logging.basicConfig(format=config.logging_format,
|
||||
datefmt=config.logging_date_format,
|
||||
level=config.logging_level)
|
||||
logging.info("Now we're talking, creating the Client instance")
|
||||
Session.notice_displayed = True # Sorry but no one really cares
|
||||
logging.getLogger("pyrogram").setLevel(logging.WARNING) # Because pyrogram logs are fucking verbose
|
||||
client = Client(api_id=config.api_id,
|
||||
api_hash=config.api_hash,
|
||||
bot_token=config.bot_token,
|
||||
plugins={"root": config.plugins_dir},
|
||||
workdir=config.working_directory,
|
||||
name=config.session_name)
|
||||
try:
|
||||
await client.start()
|
||||
logging.info("Client started, goin' asleep baby")
|
||||
await idle()
|
||||
except Exception as oh_no:
|
||||
logging.error(f"A fatal error occurred -> {type(oh_no).__name__}: {oh_no}, exiting")
|
||||
try:
|
||||
logging.debug("Disconnecting from telegram")
|
||||
await client.disconnect()
|
||||
except Exception as disconnect:
|
||||
logging.debug(f"Could not disconnect -> {type(disconnect).__name__}: {disconnect}")
|
||||
finally:
|
||||
if config.restart_times:
|
||||
logging.warning(f"Restarting up to {config.restart_times} more times")
|
||||
config.restart_times -= 1
|
||||
await start(config)
|
||||
|
||||
|
||||
def check_config(parser: ConfigParser):
|
||||
"""
|
||||
Checks if a parsed .INI file actually contains the required settings
|
||||
or if the user is a sack of shit. Starts the Bot if the user
|
||||
is actually a good boy or beat him to death otherwise :)
|
||||
:param parser: A ConfigParser instance (I should really consider to become a teacher, boy)
|
||||
:type parser: ConfigParser
|
||||
:return: Again, not the slightest shit
|
||||
:rtype: None
|
||||
"""
|
||||
|
||||
getters = { # Sort of like a computed goto. Kind of, I just hate a ton of if conditions
|
||||
"api_id": (ConfigParser.getint, "int"),
|
||||
"api_hash": (ConfigParser.get, "str"),
|
||||
"logging_format": (ConfigParser.get, "int"),
|
||||
"logging_level": (ConfigParser.getint, "str"),
|
||||
"logging_date_format": (ConfigParser.get, "str"),
|
||||
"plugins_dir": (ConfigParser.get, "str"),
|
||||
"working_directory": (ConfigParser.get, "str"),
|
||||
"session_name": (ConfigParser.get, "str"),
|
||||
"restart_times": (ConfigParser.getint, "int"),
|
||||
"bot_token": (ConfigParser.get, "str")
|
||||
}
|
||||
options = {}
|
||||
if "bot" not in parser:
|
||||
log("[Bot - ERROR {date} {time}]",
|
||||
"How am I gonna start if you can't even fucking fill a config file? Missing 'bot' section")
|
||||
return
|
||||
for option in parser["bot"]:
|
||||
getter, expected = getters.get(option, (None, None))
|
||||
if not getter:
|
||||
log("[Bot - WARN {date} {time}]",
|
||||
f"Unknown option '{option}', skipping it, idiot!")
|
||||
continue
|
||||
try:
|
||||
options[option] = getter(parser, "bot", option)
|
||||
except Exception as fucked_up:
|
||||
log("[Bot - ERROR {date} {time}]",
|
||||
f"Hey! Got a type mismatch for option '{option}' (expecting '{expected}'), can you please stop being a dumbass? -> {type(fucked_up).__name__}: {fucked_up}")
|
||||
try:
|
||||
asyncio.run(start(BotConfig(**options)))
|
||||
except AttributeError: # Python 3.6
|
||||
asyncio.get_event_loop().run_until_complete(start(BotConfig(**options)))
|
||||
except ValidationError as validation_error:
|
||||
log("[Bot - ERROR {date} {time}]",
|
||||
f"What the heck did you do? Apparently some type mismatch occurred -> {validation_error}")
|
||||
|
||||
|
||||
if __name__ == "__main__": # Start dat shit boi
|
||||
uvloop.install()
|
||||
parser = ConfigParser()
|
||||
if len(sys.argv) < 2:
|
||||
# Yeah yeah say what you want fucker, but how am I gonna log without proper config in place? :\
|
||||
# Take this shitty print instead!
|
||||
log("[Bot - INFO {date} {time}]",
|
||||
"Loading config file at default path ('./config.ini'), were you too lazy to specify a custom one?")
|
||||
file = "./config.ini"
|
||||
else:
|
||||
log("[Bot - INFO {date} {time}]",
|
||||
f"I see you were not lazy! Loading config file at '{sys.argv[1]}'")
|
||||
file = sys.argv[1]
|
||||
try:
|
||||
with open(file) as config:
|
||||
parser.read_file(config)
|
||||
except Exception as fuck: # Cause we're too lazy for anything else
|
||||
log("[Bot - ERROR {date} {time}]",
|
||||
f"Sometimes thing go well, sometimes they don't, fix this shit fucker -> {type(fuck.__name__)}: {fuck}")
|
||||
else:
|
||||
log("[Bot - INFO {date} {time}]",
|
||||
"Config file loaded, let's see if you're retarded or not now, checking options")
|
||||
check_config(parser)
|
|
@ -0,0 +1,4 @@
|
|||
pyrogram
|
||||
tgcrypto
|
||||
uvloop
|
||||
pydantic
|
Loading…
Reference in New Issue