# Copyright (C) 2022 nocturn9x # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import random import logging from dataclasses import dataclass, field from enum import Enum, auto from typing import Optional from pyrogram.types import Message from pyrogram.errors import RPCError from pyrogram.enums import ChatType from pyrogram.enums.parse_mode import ParseMode from pyrogram import Client, filters from collections import deque, defaultdict class UNODirection(Enum): """ The direction of play in the current game """ CLOCKWISE: int = auto() COUNTER_CLOCKWISE: int = auto() class UNOCardType(Enum): """ A UNO card type """ NUMBER: int = auto() DRAW: int = auto() WILD: int = auto() WILD_DRAW: int = auto() REVERSE: int = auto() SKIP: int = auto() class UNOCardColor(Enum): """ A UNO card's color """ RED: int = auto() BLUE: int = auto() GREEN: int = auto() YELLOW: int = auto() BLACK: int = auto() @dataclass class UNOCard: """ A single UNO card """ kind: UNOCardType color: UNOCardColor stackable: Optional[bool] = True placed_by: "UNOPlayer" = None # Extra metadata metadata: Optional[dict[str, int | str | bool]] = None def get_default_deck(n_decks: int = 1) -> deque[UNOCard]: """ Generates the UNO! default deck. Can merge more than one deck if there's more people playing (use n_decks) """ result = [] for __ in range(n_decks): # Numbers: 1 to 9 for each color for color in [UNOCardColor.GREEN, UNOCardColor.RED, UNOCardColor.YELLOW, UNOCardColor.BLUE]: for _ in range(2): for i in range(1, 10): result.append(UNOCard(UNOCardType.NUMBER, color, metadata={"value": i})) # 8 skip cards, 2 per color for _ in [UNOCardColor.GREEN, UNOCardColor.RED, UNOCardColor.YELLOW, UNOCardColor.BLUE]: for _ in range(2): result.append(UNOCard(UNOCardType.SKIP, color, metadata={"exhausted": False})) # 8 reverse cards, 2 per color for _ in [UNOCardColor.GREEN, UNOCardColor.RED, UNOCardColor.YELLOW, UNOCardColor.BLUE]: for _ in range(2): result.append(UNOCard(UNOCardType.REVERSE, color, metadata={"exhausted": False})) # 8 draw cards, 2 per color for _ in [UNOCardColor.GREEN, UNOCardColor.RED, UNOCardColor.YELLOW, UNOCardColor.BLUE]: for _ in range(2): result.append(UNOCard(UNOCardType.DRAW, color, metadata={"exhausted": False})) # Wild and wild draw cards for _ in range(4): result.append(UNOCard(UNOCardType.WILD, UNOCardColor.BLACK, stackable=False, metadata={"exhausted": False})) for _ in range(4): result.append(UNOCard(UNOCardType.WILD_DRAW, UNOCardColor.BLACK, stackable=False, metadata={"exhausted": False})) random.shuffle(result) return deque(result) @dataclass class UNOPlayer: """ A single UNO player """ id: int name: str cards: list[UNOCard] = field(default_factory=list) said_uno: bool = False won: bool = False @dataclass class UNOGame: """ A single UNO game """ players: list[UNOPlayer] = field(default_factory=list) # We literally have a stack of cards, so it makes sense # to use a deque deck: deque[UNOCard] = field(default_factory=get_default_deck) table: deque[UNOCard] = field(default_factory=deque) direction: UNODirection = UNODirection.CLOCKWISE current_pos: int = 0 current_player: Optional[UNOPlayer] = None winners: list[UNOPlayer] = field(default_factory=list) @dataclass class UNOLobby: """ A UNO playing lobby """ creator: UNOPlayer game: UNOGame open: bool = True started: bool = False class PlayerAction(Enum): """ An enum of possible player actions and statuses """ WAITING_LOBBY: int = auto() WAITING_TURN: int = auto() PLAYING_TURN: int = auto() CHOOSING_COLOR: int = auto() END_TURN: int = auto() END_ACTION: int = auto() END_THROW: int = auto() END_DRAW: int = auto() NONE: int = auto() LOBBIES: list[UNOLobby] = [] USERS: dict[int, tuple[PlayerAction, UNOLobby]] = defaultdict(lambda: (PlayerAction.NONE, None)) @Client.on_message(filters.command("play")) async def play(_: Client, message: Message): """ Handles the /play command """ try: if message.chat.type != ChatType.PRIVATE: await message.reply_text("This command can only be used in private chats. Send /help to learn more!") return elif USERS[message.from_user.id][0] != PlayerAction.NONE: await message.reply_text("You're already in a lobby: type /leave if you want to leave it or /stop if you're the creator") return decks = 1 if len(message.command) > 1: if message.command[1].isnumeric(): decks = int(message.command[1]) else: await message.reply_text("Invalid deck count: it must be a positive integer") return name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" creator = UNOPlayer(message.from_user.id, name, []) deck = get_default_deck(decks) game = UNOGame(players=[creator], table=deque([deck.popleft()]), direction=UNODirection.CLOCKWISE) LOBBIES.append(UNOLobby(creator, game)) USERS[creator.id] = (PlayerAction.WAITING_LOBBY, LOBBIES[-1]) await message.reply_text(f"Lobby created (playing with {decks} deck{'s' if decks > 1 else ''}). Tell your friends to send me `/join {message.from_user.id}` to join the game") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") @Client.on_message(filters.command("close")) async def close(_: Client, message: Message): """ Handles the /close command """ try: if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) else: lobby = data[1] if lobby.creator.id != message.from_user.id: await message.reply_text("Only the creator can close the lobby") elif not lobby.open: await message.reply_text("The lobby is already closed") else: lobby.open = False await message.reply_text("Closed the lobby to new players") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") @Client.on_message(filters.command("open")) async def open(_: Client, message: Message): """ Handles the /open command """ try: if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) else: lobby = data[1] if lobby.creator.id != message.from_user.id: await message.reply_text("Only the creator can open the lobby") elif lobby.open: await message.reply_text("The lobby is already open") else: lobby.open = True await message.reply_text("Opened the lobby to new players") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") @Client.on_message(filters.command("join")) async def join(client: Client, message: Message): """ Handles the /join command """ try: if len(message.command) != 2: await message.reply_text(f"Invalid syntax, the correct one is `/join `", parse_mode=ParseMode.MARKDOWN) return elif message.command[1].isnumeric(): join_lobby = None creator_id = int(message.command[1]) for i, lobby in enumerate(LOBBIES): if creator_id == lobby.creator.id: join_lobby = lobby break if not join_lobby: await message.reply_text(f"Could not find lobby `{creator_id}`. It may have been deleted or not exist yet. Send /help for more info") elif not lobby.open: await message.reply_text(f"Sorry, but this lobby isn't accepting any new players") else: for player in lobby.game.players: if player.id == message.from_user.id: await message.reply_text("You're already in this lobby") return USERS[message.from_user.id] = (PlayerAction.WAITING_LOBBY, lobby) name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" lobby.game.players.append(UNOPlayer(message.from_user.id, name, [])) await message.reply_text(f"Joined lobby `{creator_id}`") name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" for user in lobby.game.players: if user.id != message.from_user.id: try: await client.send_message(user.id, f"{name} has joined!") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") else: await message.reply_text("Invalid lobby ID: it must be a positive integer") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") @Client.on_message(filters.command("list")) async def list(client: Client, message: Message): """ Handles the /list command """ try: if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) else: lobby = data[1] msg = f"List of players in lobby `{lobby.creator.id}`:\n" for player in lobby.game.players: msg += f"- {player.name} (`{player.id}`)" if player.cards: msg += f" (`{len(player.cards)}` card{'s' if len(player.cards) > 1 else ''})" if lobby.started and player == lobby.game.current_player: msg += " ⬅️" elif player.won: msg += " 👑" msg += "\n" await message.reply_text(msg) except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") @Client.on_message(filters.command("stop")) async def stop(client: Client, message: Message): """ Handles the /stop command """ try: if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) else: lobby = data[1] if message.from_user.id != lobby.creator.id: await message.reply_text("Only the creator can stop the game") return for user in lobby.game.players: del USERS[user.id] if user.id != message.from_user.id: try: await client.send_message(user.id, f"The game has been stopped by its creator, thanks for playing!") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") LOBBIES.remove(data[1]) await message.reply_text("Lobby closed, thanks for playing!") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") @Client.on_message(filters.command("leave")) async def leave(client: Client, message: Message): """ Handles the /leave command """ try: if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) else: lobby = data[1] if message.from_user.id == lobby.creator.id: await message.reply_text("You can't leave a lobby as a creator: use /stop instead") return for player in lobby.game.players: if player.id == message.from_user.id: break lobby.game.players.remove(player) # User's cards are added at the bottom # of the table lobby.game.table.extend(player.cards) name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" for user in lobby.game.players: if user.id != message.from_user.id: try: await client.send_message(user.id, f"{name} has left!") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") del USERS[message.from_user.id] await message.reply_text(f"Left lobby `{lobby.creator.id}`") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") @Client.on_message(filters.command("kick")) async def kick(client: Client, message: Message): """ Handles the /kick command """ try: if len(message.command) != 2: await message.reply_text("Invalid syntax, the correct one is `/kick `", parse_mode=ParseMode.MARKDOWN) elif (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) else: lobby = data[1] if lobby.creator.id != message.from_user.id: await message.reply_text("Only the creator can kick users") elif message.command[1].isnumeric(): kicked = int(message.command[1]) if kicked == lobby.creator.id: await message.reply_text("You can't kick yourself") return found = None for player in lobby.game.players: if player.id == kicked: found = player if found: lobby.game.players.remove(player) del USERS[kicked] await client.send_message(kicked, f"You have been kicked from lobby `{lobby.creator.id}`") await message.reply_text(f"Kicked `{kicked}`") else: await message.reply_text(f"`{kicked}` is not in the lobby") else: await message.reply_text("Invalid user ID: it must be a positive integer") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") @Client.on_message(filters.command("broadcast")) async def broadcast(client: Client, message: Message): """ Handles the /broadcast command """ try: if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) elif len(message.command) <= 1: await message.reply_text("Invalid syntax, the correct one is `/broadcast `", parse_mode=ParseMode.MARKDOWN) else: lobby = data[1] name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" for user in lobby.game.players: if user.id != message.from_user.id: try: await client.send_message(user.id, f"{name} says: {message.text[11:]}") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") await message.reply_text(f"Done!") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") @Client.on_message(filters.command("whisper")) async def whisper(client: Client, message: Message): """ Handles the /whisper command """ try: if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) elif len(message.command) <= 2: await message.reply_text("Invalid syntax, the correct one is `/whisper `", parse_mode=ParseMode.MARKDOWN) else: lobby = data[1] name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" user_id = None if message.command[1].isnumeric(): user_id = int(message.command[1]) else: await message.reply_text("Invalid user ID: it must be a positive integer") return found = False for user in lobby.game.players: if user.id == user_id: found = True break if not found: await message.reply_text(f"`{user_id}` is not in this lobby, use /list to see the list of users") return try: await client.send_message(user_id, f"{name} whispers you: {message.text[11 + len(message.command[1]) - 1:]}") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") await message.reply_text(f"Done!") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") def reshuffle_table_in_deck(game: UNOGame): """ Called when the deck becomes empty and the cards on the table need to be reshuffled into it """ game.deck.extend(game.table) game.table.clear() game.table.append(game.deck.popleft()) while True: if game.table[0].kind != UNOCardType.NUMBER: game.table.append(game.deck.popleft()) game.deck.append(game.table.popleft()) else: break # Breaks up the pairs random.shuffle(game.deck) def pick_first_card(game: UNOGame): """ Picks the first card on the table from the deck, ensuring that it is a number """ # Picks the first card (which can't be a special card) game.table.append(game.deck.popleft()) while True: if game.table[0].kind != UNOCardType.NUMBER: game.table.append(game.deck.popleft()) game.deck.append(game.table.popleft()) else: break def distribute_cards(game: UNOGame, n: int = 1): """ Distributes n cards to all players evenly as if they were being given in real life one by one """ # Gives 7 cards to each player, one by one (i.e first one to each player, then the second, etc.) while not all(len(player.cards) == n for player in game.players): for player in game.players: player.cards.append(game.deck.popleft()) for player in game.players: for card in player.cards: card.placed_by = player @Client.on_message(filters.command("begin")) async def begin(client: Client, message: Message): """ Handles the /begin command """ try: if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) else: lobby = data[1] if lobby.creator.id != message.from_user.id: await message.reply_text("Only the creator can begin the game") elif lobby.started: await message.reply_text("The game has already started") else: if len(lobby.game.players) < 2: await message.reply_text("You need at least 2 players to play UNO!") return name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" lobby.open = False lobby.started = True pick_first_card(lobby.game) distribute_cards(lobby.game) lobby.game.current_pos = random.randint(0, len(lobby.game.players) - 1) lobby.game.current_player = lobby.game.players[lobby.game.current_pos] await message.reply_text("The game has started and the lobby is now closed to new players") for user in lobby.game.players: user.won = False try: if user.id != message.from_user.id: await client.send_message(user.id, f"{name} has started the game!") msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n" for i, card in enumerate(user.cards): msg += f"{i + 1}. {card_to_string(card)}\n" await client.send_message(user.id, msg) except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") def card_to_string(card: UNOCard) -> str: """ Converts a UNO card to its corresponding string representation """ maps = { 1: "1️⃣", 2: "2️⃣", 3: "3️⃣", 4: "4️⃣", 5: "5️⃣", 6: "6️⃣", 7: "7️⃣", 8: "8️⃣", 9: "9️⃣", UNOCardType.NUMBER: "", UNOCardType.DRAW: "➕", UNOCardType.WILD: "🌈", UNOCardType.WILD_DRAW: "🌈➕4️⃣", UNOCardType.REVERSE: "↩️", UNOCardType.SKIP: "🚫", UNOCardColor.BLACK: "", UNOCardColor.BLUE: "🔵", UNOCardColor.RED: "🔴", UNOCardColor.GREEN: "🟢", UNOCardColor.YELLOW: "🟡", } result = "" match card.kind: case UNOCardType.NUMBER: result += maps[card.metadata["value"]] result += maps[card.color] case UNOCardType.DRAW: result += maps[card.kind] result += maps[2] result += maps[card.color] case UNOCardType.REVERSE | UNOCardType.SKIP: result += maps[card.kind] result += maps[card.color] case UNOCardType.WILD | UNOCardType.WILD_DRAW: result += maps[card.kind] return result @Client.on_message(filters.command("info")) async def info(_: Client, message: Message): """ Handles the /info command """ try: if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) else: lobby = data[1] if not lobby.started: await message.reply_text("The game hasn't started yet, ask the creator to send /begin!") return msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n" for player in lobby.game.players: if player.id == message.from_user.id: break for i, card in enumerate(player.cards): msg += f"{i + 1}. {card_to_string(card)}\n" await message.reply_text(msg) except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") async def check_special_cards(client: Client, game: UNOGame): """ Performs some checks on the effects of special cards and sends some messages to users to make them aware of any changes that occur """ @Client.on_message(filters.command("pass")) async def pass_turn(client: Client, message: Message): """ Handles the /pass command """ try: if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) else: lobby = data[1] for player in lobby.game.players: if player.id == message.from_user.id: break if not lobby.started: await message.reply_text("The game hasn't started yet, ask the creator to send /begin!") return elif lobby.game.current_player != player: await message.reply_text("It's not your turn! Wait for your opponents to finish first") return elif USERS[message.from_user.id][0] not in {PlayerAction.END_DRAW, PlayerAction.END_THROW, PlayerAction.END_ACTION}: await message.reply_text("You have to draw or throw a card before passing your turn") return else: while True: if lobby.game.direction == UNODirection.CLOCKWISE: lobby.game.current_pos += 1 else: lobby.game.current_pos -= 1 lobby.game.current_player = lobby.game.players[lobby.game.current_pos % len(lobby.game.players)] if lobby.game.current_player.won: # Skip players who won already continue else: break USERS[message.from_user.id] = (PlayerAction.END_TURN, USERS[message.from_user.id][1]) name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" next_name = lobby.game.current_player.name for user in lobby.game.players: if user.id != message.from_user.id: msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n" for i, card in enumerate(user.cards): msg += f"{i + 1}. {card_to_string(card)}\n" try: await client.send_message(user.id, f"{name} has finished their turn, it is now {next_name}'s turn") await client.send_message(lobby.game.current_player.id, msg) except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") await message.reply_text("Done!") await check_special_cards(client, lobby.game) except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") @Client.on_message(filters.command("draw")) async def draw(client: Client, message: Message): """ Handles the /draw command """ try: if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) else: lobby = data[1] if not lobby.started: await message.reply_text("The game hasn't started yet, ask the creator to send /begin!") return for player in lobby.game.players: if player.id == message.from_user.id: break if lobby.game.current_player != player: await message.reply_text("It's not your turn! Wait for your opponents to finish first") return elif USERS[message.from_user.id][0] in {PlayerAction.END_DRAW, PlayerAction.END_THROW, PlayerAction.END_ACTION}: await message.reply_text("You have already performed an action for this turn!") return for user in lobby.game.players: if user.id != message.from_user.id: try: await client.send_message(user.id, f"{player.name} draws a card") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") if not lobby.game.deck: reshuffle_table_in_deck(lobby.game) for user in lobby.game.players: try: await client.send_message(user.id, "The deck pile is empty! Shuffling the table back into the deck") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") player.cards.append(lobby.game.deck.popleft()) USERS[message.from_user.id] = (PlayerAction.END_DRAW, USERS[message.from_user.id][1]) await message.reply_text(f"You draw a card. It's {card_to_string(player.cards[-1])}") msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n" for i, card in enumerate(player.cards): msg += f"{i + 1}. {card_to_string(card)}\n" await client.send_message(lobby.game.current_player.id, msg) except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") def check_next_throw(game: UNOGame, card: UNOCard) -> bool: """ Returns if the card to be thrown is valid """ top = game.table[0] result = False match card.kind: case UNOCardType.NUMBER as k if top.kind == k: if card.metadata["value"] == top.metadata["value"]: result = True elif card.color == top.color: result = True # Can't stack colors! if top.placed_by and card.color == top.color and card.placed_by is top.placed_by: result = False case UNOCardType.DRAW | UNOCardType.REVERSE | UNOCardType.SKIP as k: result = top.kind == k and top.color == card.color case UNOCardType.WILD | UNOCardType.WILD_DRAW: result = True case _: result = False result = result and top.stackable return result @Client.on_message(filters.command("throw")) async def throw(client: Client, message: Message): """ Handles the /throw command """ try: if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) elif len(message.command) != 2: await message.reply_text("Invalid syntax, the correct one is `/throw `", parse_mode=ParseMode.MARKDOWN) else: lobby = data[1] if not lobby.started: await message.reply_text("The game hasn't started yet, ask the creator to send /begin!") return for player in lobby.game.players: if player.id == message.from_user.id: break if lobby.game.current_player != player: await message.reply_text("It's not your turn! Wait for your opponents to finish first") return elif USERS[message.from_user.id][0] == PlayerAction.END_ACTION: await message.reply_text("You have already performed an action for this turn!") return card_no = 0 if message.command[1].isnumeric(): card_no = int(message.command[1]) else: await message.reply_text("Invalid card number: it must be a positive integer") return if card_no > len(player.cards) or card_no == 0: await message.reply_text(f"Invalid card: you can choose between 1 and {len(player.cards)}") return if not check_next_throw(lobby.game, player.cards[card_no - 1]): await message.reply_text("You can't throw that card") return lobby.game.table.appendleft(player.cards.pop(card_no - 1)) USERS[message.from_user.id] = (PlayerAction.END_THROW, USERS[message.from_user.id][1]) for user in lobby.game.players: if user.id != message.from_user.id: try: await client.send_message(user.id, f"{player.name} throws {card_to_string(lobby.game.table[0])}") except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") if len(player.cards) == 0: if player.won: await message.reply_text("You already won! Please wait for the match to end") return lobby.game.winners.append(player) player.won = True await message.reply_text("You're out of cards, congrats!") if sum(player.won for player in lobby.game.players) == len(lobby.game.players) - 1: msg = "📝 Results:\n\n" for i, user in enumerate(lobby.game.winners): msg += f"- {user.name} " match i: case 0: msg += "🥇" case 1: msg += "🥈" case 2: msg += "🥉" msg += "\n" for player in lobby.game.players: if not player.won: msg += f"- {player.name}\n" # Only one player left for i, user in enumerate(lobby.game.players): user.cards = [] USERS[user.id] = (PlayerAction.WAITING_LOBBY, lobby) try: if user.id != message.from_user.id: await client.send_message(user.id, f"Game over!") await client.send_message(user.id, msg) except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") lobby.started = False else: await message.reply_text(f"You throw a card. It's {card_to_string(lobby.game.table[0])}") msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n" for i, card in enumerate(player.cards): msg += f"{i + 1}. {card_to_string(card)}\n" await client.send_message(lobby.game.current_player.id, msg) except RPCError as rpc_error: logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")