Compare commits

...

4 Commits

10 changed files with 1211 additions and 0 deletions

8
.gitignore vendored
View File

@ -138,3 +138,11 @@ dmypy.json
# Cython debug symbols
cython_debug/
*.session
*.session-journal
config.ini
bin/*
bin
bin/
pyvenv.cfg

12
UnoVRBot/misc/__init__.py Normal file
View File

@ -0,0 +1,12 @@
# Copyright (C) 2022 nocturn9x
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

34
UnoVRBot/misc/config.py Normal file
View File

@ -0,0 +1,34 @@
# Copyright (C) 2022 nocturn9x
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pydantic import BaseModel
from typing import Optional
import os
class BotConfig(BaseModel):
"""
A container class for bot configuration variables.
Because passing around a fuckload of global variables
wasn't bad enough
"""
api_id: int
api_hash: str
bot_token: str
logging_format: str
logging_date_format: str
logging_level: int
restart_times: Optional[int] = 0
plugins_dir: Optional[str] = "modules"
working_directory: Optional[str] = os.getcwd()
session_name: Optional[str] = "userbot"

28
UnoVRBot/misc/shared.py Normal file
View File

@ -0,0 +1,28 @@
# Copyright (C) 2022 nocturn9x
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class SharedStatus:
"""
Simple class to share state across the userbot.
"""
def __init__(self):
self.state = {}
def add(self, key, value):
self.state[key] = value
def __getattr__(self, item):
if item in self.__dict__:
return self.__dict__[item]
return self.state[item]

51
UnoVRBot/modules/help.py Normal file
View File

@ -0,0 +1,51 @@
# Copyright (C) 2022 nocturn9x
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from pyrogram.types import Message
from pyrogram.errors import RPCError
from pyrogram import Client, filters
from pyrogram.enums.parse_mode import ParseMode
HELP_MSG = """List of available commands:
- /help: Opens help menu (you are here)
- /start: Starts the bot
- /play: Create a lobby to play UNO!
- /stop: Stops the current game
- /draw: Draw a card from the deck
- /uno: Say UNO!
- /pass: End your turn
- /join <id>: Join a game lobby
- /close: Close the game lobby to new players
- /open: Reopens a lobby to new players
- /list: List all players in the current lobby
- /leave: Leave the current lobby
- /kick <id>: Kicks a given player from the current game
- /begin: Starts the game and closes the lobby
- /broadcast <message>: Sends a message to all users
- /whisper <id> <message>: Sends a message to a single user
- /throw <card>: Throws a card onto the table
- /info: Shows the current state of the game (your cards and the table)
"""
@Client.on_message(filters.command("help"))
async def help(_: Client, message: Message):
"""
Handles the /help command
"""
try:
await message.reply_text(HELP_MSG, parse_mode=ParseMode.MARKDOWN)
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")

29
UnoVRBot/modules/start.py Normal file
View File

@ -0,0 +1,29 @@
# Copyright (C) 2022 nocturn9x
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from pyrogram.types import Message
from pyrogram.errors import RPCError
from pyrogram import Client, filters
@Client.on_message(filters.command("start"))
async def start(_: Client, message: Message):
"""
Handles the /start command
"""
try:
await message.reply_text("Hi! I'm the UNO! bot. I can let you play with your friends in a group. Send /help to learn more!")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")

869
UnoVRBot/modules/uno.py Normal file
View File

@ -0,0 +1,869 @@
# Copyright (C) 2022 nocturn9x
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import logging
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional
from pyrogram.types import Message
from pyrogram.errors import RPCError
from pyrogram.enums import ChatType
from pyrogram.enums.parse_mode import ParseMode
from pyrogram import Client, filters
from collections import deque, defaultdict
class UNODirection(Enum):
"""
The direction of play in the
current game
"""
CLOCKWISE: int = auto()
COUNTER_CLOCKWISE: int = auto()
class UNOCardType(Enum):
"""
A UNO card type
"""
NUMBER: int = auto()
DRAW: int = auto()
WILD: int = auto()
WILD_DRAW: int = auto()
REVERSE: int = auto()
SKIP: int = auto()
class UNOCardColor(Enum):
"""
A UNO card's color
"""
RED: int = auto()
BLUE: int = auto()
GREEN: int = auto()
YELLOW: int = auto()
BLACK: int = auto()
@dataclass
class UNOCard:
"""
A single UNO card
"""
kind: UNOCardType
color: UNOCardColor
stackable: Optional[bool] = True
placed_by: "UNOPlayer" = None
# Extra metadata
metadata: Optional[dict[str, int | str | bool]] = None
def get_default_deck(n_decks: int = 1) -> deque[UNOCard]:
"""
Generates the UNO! default deck. Can merge more than
one deck if there's more people playing (use n_decks)
"""
result = []
for __ in range(n_decks):
# Numbers: 1 to 9 for each color
for color in [UNOCardColor.GREEN, UNOCardColor.RED,
UNOCardColor.YELLOW, UNOCardColor.BLUE]:
for _ in range(2):
for i in range(1, 10):
result.append(UNOCard(UNOCardType.NUMBER, color, metadata={"value": i}))
# 8 skip cards, 2 per color
for _ in [UNOCardColor.GREEN, UNOCardColor.RED,
UNOCardColor.YELLOW, UNOCardColor.BLUE]:
for _ in range(2):
result.append(UNOCard(UNOCardType.SKIP, color, metadata={"exhausted": False}))
# 8 reverse cards, 2 per color
for _ in [UNOCardColor.GREEN, UNOCardColor.RED,
UNOCardColor.YELLOW, UNOCardColor.BLUE]:
for _ in range(2):
result.append(UNOCard(UNOCardType.REVERSE, color, metadata={"exhausted": False}))
# 8 draw cards, 2 per color
for _ in [UNOCardColor.GREEN, UNOCardColor.RED,
UNOCardColor.YELLOW, UNOCardColor.BLUE]:
for _ in range(2):
result.append(UNOCard(UNOCardType.DRAW, color, metadata={"exhausted": False}))
# Wild and wild draw cards
for _ in range(4):
result.append(UNOCard(UNOCardType.WILD, UNOCardColor.BLACK, stackable=False, metadata={"exhausted": False}))
for _ in range(4):
result.append(UNOCard(UNOCardType.WILD_DRAW, UNOCardColor.BLACK, stackable=False, metadata={"exhausted": False}))
random.shuffle(result)
return deque(result)
@dataclass
class UNOPlayer:
"""
A single UNO player
"""
id: int
name: str
cards: list[UNOCard] = field(default_factory=list)
said_uno: bool = False
won: bool = False
@dataclass
class UNOGame:
"""
A single UNO game
"""
players: list[UNOPlayer] = field(default_factory=list)
# We literally have a stack of cards, so it makes sense
# to use a deque
deck: deque[UNOCard] = field(default_factory=get_default_deck)
table: deque[UNOCard] = field(default_factory=deque)
direction: UNODirection = UNODirection.CLOCKWISE
current_pos: int = 0
current_player: Optional[UNOPlayer] = None
winners: list[UNOPlayer] = field(default_factory=list)
@dataclass
class UNOLobby:
"""
A UNO playing lobby
"""
creator: UNOPlayer
game: UNOGame
open: bool = True
started: bool = False
class PlayerAction(Enum):
"""
An enum of possible
player actions and
statuses
"""
WAITING_LOBBY: int = auto()
WAITING_TURN: int = auto()
PLAYING_TURN: int = auto()
CHOOSING_COLOR: int = auto()
END_TURN: int = auto()
END_ACTION: int = auto()
END_THROW: int = auto()
END_DRAW: int = auto()
NONE: int = auto()
LOBBIES: list[UNOLobby] = []
USERS: dict[int, tuple[PlayerAction, UNOLobby]] = defaultdict(lambda: (PlayerAction.NONE, None))
@Client.on_message(filters.command("play"))
async def play(_: Client, message: Message):
"""
Handles the /play command
"""
try:
if message.chat.type != ChatType.PRIVATE:
await message.reply_text("This command can only be used in private chats. Send /help to learn more!")
return
elif USERS[message.from_user.id][0] != PlayerAction.NONE:
await message.reply_text("You're already in a lobby: type /leave if you want to leave it or /stop if you're the creator")
return
decks = 1
if len(message.command) > 1:
if message.command[1].isnumeric():
decks = int(message.command[1])
else:
await message.reply_text("Invalid deck count: it must be a positive integer")
return
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
creator = UNOPlayer(message.from_user.id, name, [])
deck = get_default_deck(decks)
game = UNOGame(players=[creator], table=deque([deck.popleft()]), direction=UNODirection.CLOCKWISE)
LOBBIES.append(UNOLobby(creator, game))
USERS[creator.id] = (PlayerAction.WAITING_LOBBY, LOBBIES[-1])
await message.reply_text(f"Lobby created (playing with {decks} deck{'s' if decks > 1 else ''}). Tell your friends to send me `/join {message.from_user.id}` to join the game")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("close"))
async def close(_: Client, message: Message):
"""
Handles the /close command
"""
try:
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
else:
lobby = data[1]
if lobby.creator.id != message.from_user.id:
await message.reply_text("Only the creator can close the lobby")
elif not lobby.open:
await message.reply_text("The lobby is already closed")
else:
lobby.open = False
await message.reply_text("Closed the lobby to new players")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("open"))
async def open(_: Client, message: Message):
"""
Handles the /open command
"""
try:
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
else:
lobby = data[1]
if lobby.creator.id != message.from_user.id:
await message.reply_text("Only the creator can open the lobby")
elif lobby.open:
await message.reply_text("The lobby is already open")
else:
lobby.open = True
await message.reply_text("Opened the lobby to new players")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("join"))
async def join(client: Client, message: Message):
"""
Handles the /join command
"""
try:
if len(message.command) != 2:
await message.reply_text(f"Invalid syntax, the correct one is `/join <id>`", parse_mode=ParseMode.MARKDOWN)
return
elif message.command[1].isnumeric():
join_lobby = None
creator_id = int(message.command[1])
for i, lobby in enumerate(LOBBIES):
if creator_id == lobby.creator.id:
join_lobby = lobby
break
if not join_lobby:
await message.reply_text(f"Could not find lobby `{creator_id}`. It may have been deleted or not exist yet. Send /help for more info")
elif not lobby.open:
await message.reply_text(f"Sorry, but this lobby isn't accepting any new players")
else:
for player in lobby.game.players:
if player.id == message.from_user.id:
await message.reply_text("You're already in this lobby")
return
USERS[message.from_user.id] = (PlayerAction.WAITING_LOBBY, lobby)
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
lobby.game.players.append(UNOPlayer(message.from_user.id, name, []))
await message.reply_text(f"Joined lobby `{creator_id}`")
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
for user in lobby.game.players:
if user.id != message.from_user.id:
try:
await client.send_message(user.id, f"{name} has joined!")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
else:
await message.reply_text("Invalid lobby ID: it must be a positive integer")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("list"))
async def list(client: Client, message: Message):
"""
Handles the /list command
"""
try:
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
else:
lobby = data[1]
msg = f"List of players in lobby `{lobby.creator.id}`:\n"
for player in lobby.game.players:
msg += f"- {player.name} (`{player.id}`)"
if player.cards:
msg += f" (`{len(player.cards)}` card{'s' if len(player.cards) > 1 else ''})"
if lobby.started and player == lobby.game.current_player:
msg += " ⬅️"
elif player.won:
msg += " 👑"
msg += "\n"
await message.reply_text(msg)
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("stop"))
async def stop(client: Client, message: Message):
"""
Handles the /stop command
"""
try:
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
else:
lobby = data[1]
if message.from_user.id != lobby.creator.id:
await message.reply_text("Only the creator can stop the game")
return
for user in lobby.game.players:
del USERS[user.id]
if user.id != message.from_user.id:
try:
await client.send_message(user.id, f"The game has been stopped by its creator, thanks for playing!")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
LOBBIES.remove(data[1])
await message.reply_text("Lobby closed, thanks for playing!")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("leave"))
async def leave(client: Client, message: Message):
"""
Handles the /leave command
"""
try:
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
else:
lobby = data[1]
if message.from_user.id == lobby.creator.id:
await message.reply_text("You can't leave a lobby as a creator: use /stop instead")
return
for player in lobby.game.players:
if player.id == message.from_user.id:
break
lobby.game.players.remove(player)
# User's cards are added at the bottom
# of the table
lobby.game.table.extend(player.cards)
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
for user in lobby.game.players:
if user.id != message.from_user.id:
try:
await client.send_message(user.id, f"{name} has left!")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
del USERS[message.from_user.id]
await message.reply_text(f"Left lobby `{lobby.creator.id}`")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("kick"))
async def kick(client: Client, message: Message):
"""
Handles the /kick command
"""
try:
if len(message.command) != 2:
await message.reply_text("Invalid syntax, the correct one is `/kick <id>`", parse_mode=ParseMode.MARKDOWN)
elif (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
else:
lobby = data[1]
if lobby.creator.id != message.from_user.id:
await message.reply_text("Only the creator can kick users")
elif message.command[1].isnumeric():
kicked = int(message.command[1])
if kicked == lobby.creator.id:
await message.reply_text("You can't kick yourself")
return
found = None
for player in lobby.game.players:
if player.id == kicked:
found = player
if found:
lobby.game.players.remove(player)
del USERS[kicked]
await client.send_message(kicked, f"You have been kicked from lobby `{lobby.creator.id}`")
await message.reply_text(f"Kicked `{kicked}`")
else:
await message.reply_text(f"`{kicked}` is not in the lobby")
else:
await message.reply_text("Invalid user ID: it must be a positive integer")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("broadcast"))
async def broadcast(client: Client, message: Message):
"""
Handles the /broadcast command
"""
try:
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
elif len(message.command) <= 1:
await message.reply_text("Invalid syntax, the correct one is `/broadcast <message>`", parse_mode=ParseMode.MARKDOWN)
else:
lobby = data[1]
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
for user in lobby.game.players:
if user.id != message.from_user.id:
try:
await client.send_message(user.id, f"{name} says: {message.text[11:]}")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
await message.reply_text(f"Done!")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("whisper"))
async def whisper(client: Client, message: Message):
"""
Handles the /whisper command
"""
try:
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
elif len(message.command) <= 2:
await message.reply_text("Invalid syntax, the correct one is `/whisper <id> <message>`", parse_mode=ParseMode.MARKDOWN)
else:
lobby = data[1]
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
user_id = None
if message.command[1].isnumeric():
user_id = int(message.command[1])
else:
await message.reply_text("Invalid user ID: it must be a positive integer")
return
found = False
for user in lobby.game.players:
if user.id == user_id:
found = True
break
if not found:
await message.reply_text(f"`{user_id}` is not in this lobby, use /list to see the list of users")
return
try:
await client.send_message(user_id, f"{name} whispers you: {message.text[11 + len(message.command[1]) - 1:]}")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
await message.reply_text(f"Done!")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
def reshuffle_table_in_deck(game: UNOGame):
"""
Called when the deck becomes empty and
the cards on the table need to be reshuffled
into it
"""
game.deck.extend(game.table)
game.table.clear()
game.table.append(game.deck.popleft())
while True:
if game.table[0].kind != UNOCardType.NUMBER:
game.table.append(game.deck.popleft())
game.deck.append(game.table.popleft())
else:
break
# Breaks up the pairs
random.shuffle(game.deck)
def pick_first_card(game: UNOGame):
"""
Picks the first card on the table from the
deck, ensuring that it is a number
"""
# Picks the first card (which can't be a special card)
game.table.append(game.deck.popleft())
while True:
if game.table[0].kind != UNOCardType.NUMBER:
game.table.append(game.deck.popleft())
game.deck.append(game.table.popleft())
else:
break
def distribute_cards(game: UNOGame, n: int = 1):
"""
Distributes n cards to all players evenly
as if they were being given in real life
one by one
"""
# Gives 7 cards to each player, one by one (i.e first one to each player, then the second, etc.)
while not all(len(player.cards) == n for player in game.players):
for player in game.players:
player.cards.append(game.deck.popleft())
for player in game.players:
for card in player.cards:
card.placed_by = player
@Client.on_message(filters.command("begin"))
async def begin(client: Client, message: Message):
"""
Handles the /begin command
"""
try:
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
else:
lobby = data[1]
if lobby.creator.id != message.from_user.id:
await message.reply_text("Only the creator can begin the game")
elif lobby.started:
await message.reply_text("The game has already started")
else:
if len(lobby.game.players) < 2:
await message.reply_text("You need at least 2 players to play UNO!")
return
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
lobby.open = False
lobby.started = True
pick_first_card(lobby.game)
distribute_cards(lobby.game)
lobby.game.current_pos = random.randint(0, len(lobby.game.players) - 1)
lobby.game.current_player = lobby.game.players[lobby.game.current_pos]
await message.reply_text("The game has started and the lobby is now closed to new players")
for user in lobby.game.players:
user.won = False
try:
if user.id != message.from_user.id:
await client.send_message(user.id, f"{name} has started the game!")
msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n"
for i, card in enumerate(user.cards):
msg += f"{i + 1}. {card_to_string(card)}\n"
await client.send_message(user.id, msg)
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
def card_to_string(card: UNOCard) -> str:
"""
Converts a UNO card to its corresponding
string representation
"""
maps = {
1: "1",
2: "2",
3: "3",
4: "4",
5: "5",
6: "6",
7: "7",
8: "8",
9: "9",
UNOCardType.NUMBER: "",
UNOCardType.DRAW: "",
UNOCardType.WILD: "🌈",
UNOCardType.WILD_DRAW: "🌈4",
UNOCardType.REVERSE: "↩️",
UNOCardType.SKIP: "🚫",
UNOCardColor.BLACK: "",
UNOCardColor.BLUE: "🔵",
UNOCardColor.RED: "🔴",
UNOCardColor.GREEN: "🟢",
UNOCardColor.YELLOW: "🟡",
}
result = ""
match card.kind:
case UNOCardType.NUMBER:
result += maps[card.metadata["value"]]
result += maps[card.color]
case UNOCardType.DRAW:
result += maps[card.kind]
result += maps[2]
result += maps[card.color]
case UNOCardType.REVERSE | UNOCardType.SKIP:
result += maps[card.kind]
result += maps[card.color]
case UNOCardType.WILD | UNOCardType.WILD_DRAW:
result += maps[card.kind]
return result
@Client.on_message(filters.command("info"))
async def info(_: Client, message: Message):
"""
Handles the /info command
"""
try:
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
else:
lobby = data[1]
if not lobby.started:
await message.reply_text("The game hasn't started yet, ask the creator to send /begin!")
return
msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n"
for player in lobby.game.players:
if player.id == message.from_user.id:
break
for i, card in enumerate(player.cards):
msg += f"{i + 1}. {card_to_string(card)}\n"
await message.reply_text(msg)
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
async def check_special_cards(client: Client, game: UNOGame):
"""
Performs some checks on the effects of special cards and
sends some messages to users to make them aware of any
changes that occur
"""
@Client.on_message(filters.command("pass"))
async def pass_turn(client: Client, message: Message):
"""
Handles the /pass command
"""
try:
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
else:
lobby = data[1]
for player in lobby.game.players:
if player.id == message.from_user.id:
break
if not lobby.started:
await message.reply_text("The game hasn't started yet, ask the creator to send /begin!")
return
elif lobby.game.current_player != player:
await message.reply_text("It's not your turn! Wait for your opponents to finish first")
return
elif USERS[message.from_user.id][0] not in {PlayerAction.END_DRAW, PlayerAction.END_THROW, PlayerAction.END_ACTION}:
await message.reply_text("You have to draw or throw a card before passing your turn")
return
else:
while True:
if lobby.game.direction == UNODirection.CLOCKWISE:
lobby.game.current_pos += 1
else:
lobby.game.current_pos -= 1
lobby.game.current_player = lobby.game.players[lobby.game.current_pos % len(lobby.game.players)]
if lobby.game.current_player.won:
# Skip players who won already
continue
else:
break
USERS[message.from_user.id] = (PlayerAction.END_TURN, USERS[message.from_user.id][1])
name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
next_name = lobby.game.current_player.name
for user in lobby.game.players:
if user.id != message.from_user.id:
msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n"
for i, card in enumerate(user.cards):
msg += f"{i + 1}. {card_to_string(card)}\n"
try:
await client.send_message(user.id, f"{name} has finished their turn, it is now {next_name}'s turn")
await client.send_message(lobby.game.current_player.id, msg)
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
await message.reply_text("Done!")
await check_special_cards(client, lobby.game)
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("draw"))
async def draw(client: Client, message: Message):
"""
Handles the /draw command
"""
try:
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
else:
lobby = data[1]
if not lobby.started:
await message.reply_text("The game hasn't started yet, ask the creator to send /begin!")
return
for player in lobby.game.players:
if player.id == message.from_user.id:
break
if lobby.game.current_player != player:
await message.reply_text("It's not your turn! Wait for your opponents to finish first")
return
elif USERS[message.from_user.id][0] in {PlayerAction.END_DRAW, PlayerAction.END_THROW, PlayerAction.END_ACTION}:
await message.reply_text("You have already performed an action for this turn!")
return
for user in lobby.game.players:
if user.id != message.from_user.id:
try:
await client.send_message(user.id, f"{player.name} draws a card")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
if not lobby.game.deck:
reshuffle_table_in_deck(lobby.game)
for user in lobby.game.players:
try:
await client.send_message(user.id, "The deck pile is empty! Shuffling the table back into the deck")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
player.cards.append(lobby.game.deck.popleft())
USERS[message.from_user.id] = (PlayerAction.END_DRAW, USERS[message.from_user.id][1])
await message.reply_text(f"You draw a card. It's {card_to_string(player.cards[-1])}")
msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n"
for i, card in enumerate(player.cards):
msg += f"{i + 1}. {card_to_string(card)}\n"
await client.send_message(lobby.game.current_player.id, msg)
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
def check_next_throw(game: UNOGame, card: UNOCard) -> bool:
"""
Returns if the card to be thrown is valid
"""
top = game.table[0]
result = False
match card.kind:
case UNOCardType.NUMBER as k if top.kind == k:
if card.metadata["value"] == top.metadata["value"]:
result = True
elif card.color == top.color:
result = True
# Can't stack colors!
if top.placed_by and card.color == top.color and card.placed_by is top.placed_by:
result = False
case UNOCardType.DRAW | UNOCardType.REVERSE | UNOCardType.SKIP as k:
result = top.kind == k and top.color == card.color
case UNOCardType.WILD | UNOCardType.WILD_DRAW:
result = True
case _:
result = False
result = result and top.stackable
return result
@Client.on_message(filters.command("throw"))
async def throw(client: Client, message: Message):
"""
Handles the /throw command
"""
try:
if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
elif len(message.command) != 2:
await message.reply_text("Invalid syntax, the correct one is `/throw <card>`", parse_mode=ParseMode.MARKDOWN)
else:
lobby = data[1]
if not lobby.started:
await message.reply_text("The game hasn't started yet, ask the creator to send /begin!")
return
for player in lobby.game.players:
if player.id == message.from_user.id:
break
if lobby.game.current_player != player:
await message.reply_text("It's not your turn! Wait for your opponents to finish first")
return
elif USERS[message.from_user.id][0] == PlayerAction.END_ACTION:
await message.reply_text("You have already performed an action for this turn!")
return
card_no = 0
if message.command[1].isnumeric():
card_no = int(message.command[1])
else:
await message.reply_text("Invalid card number: it must be a positive integer")
return
if card_no > len(player.cards) or card_no == 0:
await message.reply_text(f"Invalid card: you can choose between 1 and {len(player.cards)}")
return
if not check_next_throw(lobby.game, player.cards[card_no - 1]):
await message.reply_text("You can't throw that card")
return
lobby.game.table.appendleft(player.cards.pop(card_no - 1))
USERS[message.from_user.id] = (PlayerAction.END_THROW, USERS[message.from_user.id][1])
for user in lobby.game.players:
if user.id != message.from_user.id:
try:
await client.send_message(user.id, f"{player.name} throws {card_to_string(lobby.game.table[0])}")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
if len(player.cards) == 0:
if player.won:
await message.reply_text("You already won! Please wait for the match to end")
return
lobby.game.winners.append(player)
player.won = True
await message.reply_text("You're out of cards, congrats!")
if sum(player.won for player in lobby.game.players) == len(lobby.game.players) - 1:
msg = "📝 Results:\n\n"
for i, user in enumerate(lobby.game.winners):
msg += f"- {user.name} "
match i:
case 0:
msg += "🥇"
case 1:
msg += "🥈"
case 2:
msg += "🥉"
msg += "\n"
for player in lobby.game.players:
if not player.won:
msg += f"- {player.name}\n"
# Only one player left
for i, user in enumerate(lobby.game.players):
user.cards = []
USERS[user.id] = (PlayerAction.WAITING_LOBBY, lobby)
try:
if user.id != message.from_user.id:
await client.send_message(user.id, f"Game over!")
await client.send_message(user.id, msg)
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
lobby.started = False
else:
await message.reply_text(f"You throw a card. It's {card_to_string(lobby.game.table[0])}")
msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n"
for i, card in enumerate(player.cards):
msg += f"{i + 1}. {card_to_string(card)}\n"
await client.send_message(lobby.game.current_player.id, msg)
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")

15
config.ini.example Normal file
View File

@ -0,0 +1,15 @@
[bot]
# Bot configuration file. Options that are left empty (with or without a '=' sign)
# are automatically set to None. Don't mess shit up, idiot <3
api_id = 12345
api_hash = API HASH HERE
bot_token = BOT TOKEN HERE
logging_format = [%%(levelname)s %%(asctime)s] Module '%%(module)s', function '%%(funcName)s' at line %%(lineno)d -> %%(message)s
logging_date_format = %%d/%%m/%%Y %%T %%p
logging_level = 20
plugins_dir = UnoVRBot/modules
working_directory =
session_name = UnoVRBot
restart_times = 0

161
main.py Normal file
View File

@ -0,0 +1,161 @@
# Copyright (C) 2022 nocturn9x
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyrogram import Client, idle
from pyrogram.session import Session
import asyncio
import logging
import time
import uvloop
import sys
from configparser import ConfigParser
from UnoVRBot.misc.config import BotConfig
from pydantic import ValidationError
# Main Bot module. This is the entry point to the Bot and it loads all the modules and
# performs automatic startup, actually pyrogram does all the job but shut up we are communist
def log(prefix: str = "", message: str = ""):
"""
Hey I like logging but this is used only when no proper
logging configuration is yet in place to actually be useful
:param message: The message to log to stderr, default to ""
:type message: str
:param prefix: Any extra prefix to add to the message, default to ""
Use {date} to replace it with the current date in d/m/Y format and
{time} to replace it with the current time in %H:%M:%S %p format or
just %T %p if you're a lazy sucker. A whitespace is automatically
added between the prefix and the suffix just in case you're dumb
:type prefix: str
:return: Nothing
:rtype: None
"""
sys.stderr.write(f"{prefix.format(date=time.strftime('%d/%m/%Y'), time=time.strftime('%T %p'))} {message}\n")
async def start(config: BotConfig):
"""
Starts the Bot, or at least tries to do so and give
up after a while cuz we've got other shit to do than starting shitty
Bots all day
:param config: The BotConfig class (yeah, thanks, explanation be like)
:type config: BotConfig
:return: Nothing, fucker
:rtype: None
"""
log("[Bot - INFO {date} {time}]",
"Loading an actual logging configuration")
logging.basicConfig(format=config.logging_format,
datefmt=config.logging_date_format,
level=config.logging_level)
logging.info("Now we're talking, creating the Client instance")
Session.notice_displayed = True # Sorry but no one really cares
logging.getLogger("pyrogram").setLevel(logging.WARNING) # Because pyrogram logs are fucking verbose
client = Client(api_id=config.api_id,
api_hash=config.api_hash,
bot_token=config.bot_token,
plugins={"root": config.plugins_dir},
workdir=config.working_directory,
name=config.session_name)
try:
await client.start()
logging.info("Client started, goin' asleep baby")
await idle()
except Exception as oh_no:
logging.error(f"A fatal error occurred -> {type(oh_no).__name__}: {oh_no}, exiting")
try:
logging.debug("Disconnecting from telegram")
await client.disconnect()
except Exception as disconnect:
logging.debug(f"Could not disconnect -> {type(disconnect).__name__}: {disconnect}")
finally:
if config.restart_times:
logging.warning(f"Restarting up to {config.restart_times} more times")
config.restart_times -= 1
await start(config)
def check_config(parser: ConfigParser):
"""
Checks if a parsed .INI file actually contains the required settings
or if the user is a sack of shit. Starts the Bot if the user
is actually a good boy or beat him to death otherwise :)
:param parser: A ConfigParser instance (I should really consider to become a teacher, boy)
:type parser: ConfigParser
:return: Again, not the slightest shit
:rtype: None
"""
getters = { # Sort of like a computed goto. Kind of, I just hate a ton of if conditions
"api_id": (ConfigParser.getint, "int"),
"api_hash": (ConfigParser.get, "str"),
"logging_format": (ConfigParser.get, "int"),
"logging_level": (ConfigParser.getint, "str"),
"logging_date_format": (ConfigParser.get, "str"),
"plugins_dir": (ConfigParser.get, "str"),
"working_directory": (ConfigParser.get, "str"),
"session_name": (ConfigParser.get, "str"),
"restart_times": (ConfigParser.getint, "int"),
"bot_token": (ConfigParser.get, "str")
}
options = {}
if "bot" not in parser:
log("[Bot - ERROR {date} {time}]",
"How am I gonna start if you can't even fucking fill a config file? Missing 'bot' section")
return
for option in parser["bot"]:
getter, expected = getters.get(option, (None, None))
if not getter:
log("[Bot - WARN {date} {time}]",
f"Unknown option '{option}', skipping it, idiot!")
continue
try:
options[option] = getter(parser, "bot", option)
except Exception as fucked_up:
log("[Bot - ERROR {date} {time}]",
f"Hey! Got a type mismatch for option '{option}' (expecting '{expected}'), can you please stop being a dumbass? -> {type(fucked_up).__name__}: {fucked_up}")
try:
asyncio.run(start(BotConfig(**options)))
except AttributeError: # Python 3.6
asyncio.get_event_loop().run_until_complete(start(BotConfig(**options)))
except ValidationError as validation_error:
log("[Bot - ERROR {date} {time}]",
f"What the heck did you do? Apparently some type mismatch occurred -> {validation_error}")
if __name__ == "__main__": # Start dat shit boi
uvloop.install()
parser = ConfigParser()
if len(sys.argv) < 2:
# Yeah yeah say what you want fucker, but how am I gonna log without proper config in place? :\
# Take this shitty print instead!
log("[Bot - INFO {date} {time}]",
"Loading config file at default path ('./config.ini'), were you too lazy to specify a custom one?")
file = "./config.ini"
else:
log("[Bot - INFO {date} {time}]",
f"I see you were not lazy! Loading config file at '{sys.argv[1]}'")
file = sys.argv[1]
try:
with open(file) as config:
parser.read_file(config)
except Exception as fuck: # Cause we're too lazy for anything else
log("[Bot - ERROR {date} {time}]",
f"Sometimes thing go well, sometimes they don't, fix this shit fucker -> {type(fuck.__name__)}: {fuck}")
else:
log("[Bot - INFO {date} {time}]",
"Config file loaded, let's see if you're retarded or not now, checking options")
check_config(parser)

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
pyrogram
tgcrypto
uvloop
pydantic