UnoVRBot/UnoVRBot/modules/uno.py

940 lines
40 KiB
Python
Raw Normal View History

# Copyright (C) 2022 nocturn9x
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import logging
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional
2022-06-15 23:08:49 +02:00
from matplotlib.colors import same_color
from pyrogram.types import Message
from pyrogram.errors import RPCError
from pyrogram.enums import ChatType
from pyrogram.enums.parse_mode import ParseMode
from pyrogram import Client, filters
from collections import deque, defaultdict
class UNODirection(Enum):
    """
    Which way the turns travel
    around the table
    """

    CLOCKWISE = auto()
    COUNTER_CLOCKWISE = auto()
class UNOCardType(Enum):
    """
    The kind of a UNO card
    """

    # Plain numbered card
    NUMBER = auto()
    # Colored draw card
    DRAW = auto()
    # Black cards
    WILD = auto()
    WILD_DRAW = auto()
    # Colored action cards
    REVERSE = auto()
    SKIP = auto()
class UNOCardColor(Enum):
    """
    The color of a UNO card
    """

    RED = auto()
    BLUE = auto()
    GREEN = auto()
    YELLOW = auto()
    # Used by the default deck for wild and wild draw cards
    BLACK = auto()
@dataclass
class UNOCard:
    """
    One card of the deck, plus the bookkeeping
    that is attached to it once it gets played
    """

    # What the card is (number, draw, wild, ...)
    kind: UNOCardType
    # The card's color
    color: UNOCardColor
    # Consulted by check_next_throw when a card of the
    # same kind is thrown on top of this one
    stackable: Optional[bool] = True
    # The player who threw the card (None until then)
    placed_by: Optional["UNOPlayer"] = None
    # Extra per-card data: the default deck stores "value"
    # for numbers, "exhausted" for special cards and
    # "color" for wild cards in here
    metadata: Optional[dict[str, int | str | bool]] = None
    # The game round in which the card was thrown
    # (-1 until then)
    placed_when: int = -1
def get_default_deck(n_decks: int = 1) -> deque[UNOCard]:
    """
    Generates the shuffled UNO! default deck. Can merge more
    than one deck if there's more people playing (use n_decks).

    Every deck holds, for each of the four colors, two copies
    of the numbers 1 to 9 plus two skip, two reverse and two
    draw cards; 4 wild and 4 wild draw cards complete it.
    An n_decks lower than 1 yields an empty deck
    """
    colors = (UNOCardColor.GREEN, UNOCardColor.RED,
              UNOCardColor.YELLOW, UNOCardColor.BLUE)
    action_kinds = (UNOCardType.SKIP, UNOCardType.REVERSE, UNOCardType.DRAW)
    wild_kinds = (UNOCardType.WILD, UNOCardType.WILD_DRAW)
    result = []
    for _ in range(n_decks):
        # Numbers: 1 to 9 for each color (twice for each color)
        for color in colors:
            for _copy in range(2):
                for value in range(1, 10):
                    result.append(UNOCard(UNOCardType.NUMBER, color, metadata={"value": value}))
        # 8 skip, 8 reverse and 8 draw cards: 2 per color. The
        # color has to come from *this* loop, otherwise every
        # action card ends up with whatever color the number
        # loop above finished on
        for kind in action_kinds:
            for color in colors:
                for _copy in range(2):
                    # Each card needs its own metadata dictionary
                    # since it's mutated during the game
                    result.append(UNOCard(kind, color, metadata={"exhausted": False}))
        # Wild and wild draw cards
        for kind in wild_kinds:
            for _copy in range(4):
                result.append(UNOCard(kind, UNOCardColor.BLACK, stackable=False,
                                      metadata={"exhausted": False, "color": None}))
    random.shuffle(result)
    return deque(result)
@dataclass
class UNOPlayer:
    """
    Someone sitting at a UNO table
    """

    # Telegram user ID (the handlers use it as the
    # destination chat when messaging the player)
    id: int
    # Display name shown to the other players
    name: str
    # The player's hand
    cards: list[UNOCard] = field(default_factory=list)
    said_uno: bool = False
    # Set once the player runs out of cards
    won: bool = False
@dataclass
class UNOGame:
    """
    The state of one UNO match
    """

    # Everyone taking part, in seating order
    players: list[UNOPlayer] = field(default_factory=list)
    # Both piles are real stacks of cards, hence the deques:
    # cards are drawn from the left of the deck and thrown
    # onto the left of the table (table[0] is the top card)
    deck: deque[UNOCard] = field(default_factory=get_default_deck)
    table: deque[UNOCard] = field(default_factory=deque)
    direction: UNODirection = UNODirection.CLOCKWISE
    # Turn counter, used modulo the number of players
    # to find whose turn it is
    current_pos: int = 0
    current_player: Optional[UNOPlayer] = None
    # Players who ran out of cards, in finishing order
    winners: list[UNOPlayer] = field(default_factory=list)
    # Incremented every time a turn is passed
    round: int = 0
@dataclass
class UNOLobby:
    """
    A lobby where players gather to play UNO
    """

    # The player who created the lobby: only they can
    # open, close, begin and stop it or kick people
    creator: UNOPlayer
    game: UNOGame
    # Whether new players may still join
    open: bool = True
    # Whether a match is currently in progress
    started: bool = False
class PlayerAction(Enum):
    """
    The actions and statuses
    a player can be in
    """

    WAITING_LOBBY = auto()
    WAITING_TURN = auto()
    PLAYING_TURN = auto()
    CHOOSING_COLOR = auto()
    END_TURN = auto()
    END_ACTION = auto()
    END_THROW = auto()
    END_DRAW = auto()
    # The user isn't in any lobby
    NONE = auto()
# Every lobby that currently exists
LOBBIES: list[UNOLobby] = []
# Maps a user ID to its current status and the lobby it
# belongs to. Unknown users default to (NONE, None); note
# that, being a defaultdict, a mere lookup stores that entry
USERS: dict[int, tuple[PlayerAction, Optional[UNOLobby]]] = defaultdict(
    lambda: (PlayerAction.NONE, None)
)
@Client.on_message(filters.command("play"))
async def play(_: Client, message: Message):
    """
    Handles the /play command: creates a new lobby owned
    by the sender. An optional argument (/play <decks>)
    sets how many decks are merged together
    """
    try:
        if message.chat.type != ChatType.PRIVATE:
            await message.reply_text("This command can only be used in private chats. Send /help to learn more!")
            return
        elif USERS[message.from_user.id][0] != PlayerAction.NONE:
            await message.reply_text("You're already in a lobby: type /leave if you want to leave it or /stop if you're the creator")
            return
        decks = 1
        if len(message.command) > 1:
            raw_decks = message.command[1]
            # isdecimal() only accepts what int() can actually parse
            # (isnumeric() also lets through things like "²"). Zero is
            # refused too: an empty deck has no card to put on the table
            if not raw_decks.isdecimal() or int(raw_decks) < 1:
                await message.reply_text("Invalid deck count: it must be a positive integer")
                return
            decks = int(raw_decks)
        name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
        creator = UNOPlayer(message.from_user.id, name, [])
        deck = get_default_deck(decks)
        # The first card of the deck is laid on the table right away
        game = UNOGame(players=[creator], deck=deck, table=deque([deck.popleft()]), direction=UNODirection.CLOCKWISE)
        lobby = UNOLobby(creator, game)
        LOBBIES.append(lobby)
        USERS[creator.id] = (PlayerAction.WAITING_LOBBY, lobby)
        await message.reply_text(f"Lobby created (playing with {decks} deck{'s' if decks > 1 else ''}). Tell your friends to send me `/join {message.from_user.id}` to join the game")
    except RPCError as rpc_error:
        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("close"))
async def close(_: Client, message: Message):
    """
    Handles the /close command: the creator
    stops new players from joining the lobby
    """
    try:
        status, lobby = USERS[message.from_user.id]
        if status == PlayerAction.NONE:
            await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
            return
        if lobby.creator.id != message.from_user.id:
            answer = "Only the creator can close the lobby"
        elif not lobby.open:
            answer = "The lobby is already closed"
        else:
            lobby.open = False
            answer = "Closed the lobby to new players"
        await message.reply_text(answer)
    except RPCError as rpc_error:
        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("open"))
async def open(_: Client, message: Message):
    """
    Handles the /open command: the creator
    lets new players join the lobby again
    """
    try:
        status, lobby = USERS[message.from_user.id]
        if status == PlayerAction.NONE:
            await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
            return
        if lobby.creator.id != message.from_user.id:
            answer = "Only the creator can open the lobby"
        elif lobby.open:
            answer = "The lobby is already open"
        else:
            lobby.open = True
            answer = "Opened the lobby to new players"
        await message.reply_text(answer)
    except RPCError as rpc_error:
        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("join"))
async def join(client: Client, message: Message):
    """
    Handles the /join command: adds the sender to the lobby
    whose creator has the given ID (/join <id>) and tells
    the other players about it
    """
    try:
        if len(message.command) != 2:
            await message.reply_text("Invalid syntax, the correct one is `/join <id>`", parse_mode=ParseMode.MARKDOWN)
            return
        # isdecimal() only accepts what int() can actually parse
        # (isnumeric() also lets through things like "²")
        if not message.command[1].isdecimal():
            await message.reply_text("Invalid lobby ID: it must be a positive integer")
            return
        creator_id = int(message.command[1])
        lobby = next((candidate for candidate in LOBBIES if candidate.creator.id == creator_id), None)
        if lobby is None:
            await message.reply_text(f"Could not find lobby `{creator_id}`. It may have been deleted or not exist yet. Send /help for more info")
            return
        if not lobby.open:
            await message.reply_text("Sorry, but this lobby isn't accepting any new players")
            return
        if any(player.id == message.from_user.id for player in lobby.game.players):
            await message.reply_text("You're already in this lobby")
            return
        # USERS only tracks one lobby per user: joining a second one
        # would leave a stale player behind in the first, so refuse
        # just like /play does
        if USERS[message.from_user.id][0] != PlayerAction.NONE:
            await message.reply_text("You're already in a lobby: type /leave if you want to leave it or /stop if you're the creator")
            return
        name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
        USERS[message.from_user.id] = (PlayerAction.WAITING_LOBBY, lobby)
        lobby.game.players.append(UNOPlayer(message.from_user.id, name, []))
        await message.reply_text(f"Joined lobby `{creator_id}`")
        for user in lobby.game.players:
            if user.id != message.from_user.id:
                # A player we can't reach must not prevent
                # the others from being notified
                try:
                    await client.send_message(user.id, f"{name} has joined!")
                except RPCError as rpc_error:
                    logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
    except RPCError as rpc_error:
        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
# NOTE: once defined, this handler hides the builtin list() from
# any code of this module that runs afterwards
@Client.on_message(filters.command("list"))
async def list(client: Client, message: Message):
    """
    Handles the /list command: shows who's in the sender's
    lobby, how many cards they hold, whose turn it is and
    who has already won
    """
    try:
        status, lobby = USERS[message.from_user.id]
        if status == PlayerAction.NONE:
            await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
            return
        rows = [f"List of players in lobby `{lobby.creator.id}`:\n"]
        for member in lobby.game.players:
            row = f"- {member.name} (`{member.id}`)"
            if member.cards:
                row += f" (`{len(member.cards)}` card{'s' if len(member.cards) > 1 else ''})"
            if lobby.started and member == lobby.game.current_player:
                row += " ⬅️"
            elif member.won:
                row += " 👑"
            rows.append(row + "\n")
        await message.reply_text("".join(rows))
    except RPCError as rpc_error:
        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("stop"))
async def stop(client: Client, message: Message):
    """
    Handles the /stop command: the creator tears
    the lobby down and everyone in it is freed
    """
    try:
        status, lobby = USERS[message.from_user.id]
        if status == PlayerAction.NONE:
            await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
            return
        if message.from_user.id != lobby.creator.id:
            await message.reply_text("Only the creator can stop the game")
            return
        for member in lobby.game.players:
            # Forget the player, then let them know
            # (the creator gets a reply at the end instead)
            del USERS[member.id]
            if member.id == message.from_user.id:
                continue
            try:
                await client.send_message(member.id, "The game has been stopped by its creator, thanks for playing!")
            except RPCError as rpc_error:
                logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
        LOBBIES.remove(lobby)
        await message.reply_text("Lobby closed, thanks for playing!")
    except RPCError as rpc_error:
        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("leave"))
async def leave(client: Client, message: Message):
    """
    Handles the /leave command: removes the sender from
    their lobby and tells the remaining players
    """
    try:
        status, lobby = USERS[message.from_user.id]
        if status == PlayerAction.NONE:
            await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
            return
        if message.from_user.id == lobby.creator.id:
            await message.reply_text("You can't leave a lobby as a creator: use /stop instead")
            return
        game = lobby.game
        # Find the sender among the players: the loop
        # variable keeps pointing at them after the break
        for leaver in game.players:
            if leaver.id == message.from_user.id:
                break
        # NOTE(review): nothing here advances the turn if the
        # leaver happens to be game.current_player
        game.players.remove(leaver)
        # Their hand goes to the bottom of the table
        game.table.extend(leaver.cards)
        name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
        for member in game.players:
            if member.id == message.from_user.id:
                continue
            try:
                await client.send_message(member.id, f"{name} has left!")
            except RPCError as rpc_error:
                logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
        del USERS[message.from_user.id]
        await message.reply_text(f"Left lobby `{lobby.creator.id}`")
    except RPCError as rpc_error:
        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("kick"))
async def kick(client: Client, message: Message):
    """
    Handles the /kick command: the creator removes the
    player with the given ID (/kick <id>) from the lobby
    """
    try:
        if len(message.command) != 2:
            await message.reply_text("Invalid syntax, the correct one is `/kick <id>`", parse_mode=ParseMode.MARKDOWN)
            return
        if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
            await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
            return
        lobby = data[1]
        if lobby.creator.id != message.from_user.id:
            await message.reply_text("Only the creator can kick users")
            return
        # isdecimal() only accepts what int() can actually parse
        # (isnumeric() also lets through things like "²")
        if not message.command[1].isdecimal():
            await message.reply_text("Invalid user ID: it must be a positive integer")
            return
        kicked = int(message.command[1])
        if kicked == lobby.creator.id:
            await message.reply_text("You can't kick yourself")
            return
        found = next((player for player in lobby.game.players if player.id == kicked), None)
        if found is None:
            await message.reply_text(f"`{kicked}` is not in the lobby")
            return
        # Remove the player that was actually looked up (and not
        # whoever the search loop happened to end on)
        lobby.game.players.remove(found)
        # Like with /leave, their hand goes to the
        # bottom of the table instead of vanishing
        lobby.game.table.extend(found.cards)
        USERS.pop(kicked, None)
        # Failing to reach the kicked user must not
        # swallow the confirmation to the creator
        try:
            await client.send_message(kicked, f"You have been kicked from lobby `{lobby.creator.id}`")
        except RPCError as rpc_error:
            logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
        await message.reply_text(f"Kicked `{kicked}`")
    except RPCError as rpc_error:
        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("broadcast"))
async def broadcast(client: Client, message: Message):
    """
    Handles the /broadcast command: relays the sender's
    message to everyone else in their lobby
    """
    try:
        status, lobby = USERS[message.from_user.id]
        if status == PlayerAction.NONE:
            await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
            return
        if len(message.command) <= 1:
            await message.reply_text("Invalid syntax, the correct one is `/broadcast <message>`", parse_mode=ParseMode.MARKDOWN)
            return
        name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
        for recipient in lobby.game.players:
            if recipient.id == message.from_user.id:
                continue
            try:
                # The first 11 characters are "/broadcast "
                await client.send_message(recipient.id, f"{name} says: {message.text[11:]}")
            except RPCError as rpc_error:
                logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
        await message.reply_text("Done!")
    except RPCError as rpc_error:
        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("whisper"))
async def whisper(client: Client, message: Message):
    """
    Handles the /whisper command: privately relays a message
    to a single player of the sender's lobby
    (/whisper <id> <message>)
    """
    try:
        if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
            await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
            return
        if len(message.command) <= 2:
            await message.reply_text("Invalid syntax, the correct one is `/whisper <id> <message>`", parse_mode=ParseMode.MARKDOWN)
            return
        lobby = data[1]
        name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
        # isdecimal() only accepts what int() can actually parse
        # (isnumeric() also lets through things like "²")
        if not message.command[1].isdecimal():
            await message.reply_text("Invalid user ID: it must be a positive integer")
            return
        user_id = int(message.command[1])
        if not any(user.id == user_id for user in lobby.game.players):
            await message.reply_text(f"`{user_id}` is not in this lobby, use /list to see the list of users")
            return
        try:
            # Skips "/whisper ", the ID and the space after it
            await client.send_message(user_id, f"{name} whispers you: {message.text[11 + len(message.command[1]) - 1:]}")
        except RPCError as rpc_error:
            logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
        await message.reply_text("Done!")
    except RPCError as rpc_error:
        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
def reshuffle_table_in_deck(game: UNOGame):
    """
    Called when the deck becomes empty and
    the cards on the table need to be reshuffled
    into it.

    The table is left with a single card on it: the topmost
    number card that can be found (the old top card, if it
    is a number). Raises IndexError if both the deck and
    the table are empty
    """
    game.deck.extend(game.table)
    game.table.clear()
    game.table.append(game.deck.popleft())
    # Swap the top card with the next one from the deck until a
    # number shows up. The amount of swaps is bounded so that a
    # pile without any number card can't make us spin forever
    for _ in range(len(game.deck)):
        if game.table[0].kind == UNOCardType.NUMBER:
            break
        game.table.append(game.deck.popleft())
        game.deck.append(game.table.popleft())
    # Cards going back into the deck start over: without this,
    # special cards whose effect was already applied would stay
    # "exhausted" and do nothing the next time they're played
    for card in game.deck:
        if card.metadata is None:
            continue
        if "exhausted" in card.metadata:
            card.metadata["exhausted"] = False
        if "color" in card.metadata:
            card.metadata["color"] = None
    # Breaks up the pairs
    random.shuffle(game.deck)
def pick_first_card(game: UNOGame):
    """
    Moves a card from the deck onto the table and
    makes sure the card at the top of the table
    ends up being a number
    """
    game.table.append(game.deck.popleft())
    # As long as the top card is a special one, bring in a new
    # card from the deck and send the top one back to the
    # bottom of the deck
    while game.table[0].kind != UNOCardType.NUMBER:
        game.table.append(game.deck.popleft())
        game.deck.append(game.table.popleft())
def distribute_cards(game: UNOGame, n: int = 7):
    """
    Deals cards from the deck the way it's done in real
    life: one card to each player per round (the first
    one to everybody, then the second, and so on) until
    every player holds n cards
    """
    while any(len(participant.cards) != n for participant in game.players):
        for participant in game.players:
            participant.cards.append(game.deck.popleft())
@Client.on_message(filters.command("begin"))
async def begin(client: Client, message: Message):
    """
    Handles the /begin command: the creator starts the match.
    The lobby gets closed, cards are dealt, a random player is
    picked to go first and everyone receives the game status
    """
    try:
        if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
            await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
            return
        lobby = data[1]
        game = lobby.game
        if lobby.creator.id != message.from_user.id:
            await message.reply_text("Only the creator can begin the game")
            return
        if lobby.started:
            await message.reply_text("The game has already started")
            return
        if len(game.players) < 2:
            await message.reply_text("You need at least 2 players to play UNO!")
            return
        hand_size = 7
        # Dealing needs hand_size cards per player, plus the one that
        # pick_first_card moves to the table: refuse to start (rather
        # than crash halfway through, with the lobby already marked as
        # started) when the deck can't cover that
        if len(game.deck) < len(game.players) * hand_size + 1:
            await message.reply_text("There aren't enough cards in the deck for this many players")
            return
        name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
        lobby.open = False
        lobby.started = True
        # A lobby can host several matches in a row: forget the
        # winners of the previous one, or they'd show up again in
        # the results of this one
        game.winners.clear()
        pick_first_card(game)
        distribute_cards(game, hand_size)
        game.current_pos = random.randint(0, len(game.players) - 1)
        game.current_player = game.players[game.current_pos]
        await message.reply_text("The game has started and the lobby is now closed to new players")
        for user in game.players:
            user.won = False
            try:
                if user.id != message.from_user.id:
                    await client.send_message(user.id, f"{name} has started the game!")
                msg = f"Game status: \n- Card on the table: {card_to_string(game.table[0])}\n\n- Current Player: {game.current_player.name}\n\n- Your cards:\n"
                for i, card in enumerate(user.cards):
                    msg += f"{i + 1}. {card_to_string(card)}\n"
                await client.send_message(user.id, msg)
            except RPCError as rpc_error:
                logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
    except RPCError as rpc_error:
        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
def card_to_string(card: UNOCard) -> str:
"""
Converts a UNO card to its corresponding
string representation
"""
maps = {
1: "1",
2: "2",
3: "3",
4: "4",
5: "5",
6: "6",
7: "7",
8: "8",
9: "9",
UNOCardType.NUMBER: "",
UNOCardType.DRAW: "",
UNOCardType.WILD: "🌈",
UNOCardType.WILD_DRAW: "🌈4",
UNOCardType.REVERSE: "↩️",
UNOCardType.SKIP: "🚫",
UNOCardColor.BLACK: "",
UNOCardColor.BLUE: "🔵",
UNOCardColor.RED: "🔴",
UNOCardColor.GREEN: "🟢",
UNOCardColor.YELLOW: "🟡",
}
2022-06-15 23:08:49 +02:00
print(maps)
result = ""
match card.kind:
case UNOCardType.NUMBER:
result += maps[card.metadata["value"]]
result += maps[card.color]
case UNOCardType.DRAW:
result += maps[card.kind]
result += maps[2]
result += maps[card.color]
case UNOCardType.REVERSE | UNOCardType.SKIP:
result += maps[card.kind]
result += maps[card.color]
case UNOCardType.WILD | UNOCardType.WILD_DRAW:
result += maps[card.kind]
return result
@Client.on_message(filters.command("info"))
async def info(_: Client, message: Message):
    """
    Handles the /info command: shows the sender the card
    on the table, whose turn it is and their own hand
    """
    try:
        status, lobby = USERS[message.from_user.id]
        if status == PlayerAction.NONE:
            await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
            return
        if not lobby.started:
            await message.reply_text("The game hasn't started yet, ask the creator to send /begin!")
            return
        game = lobby.game
        header = f"Game status: \n- Card on the table: {card_to_string(game.table[0])}\n\n- Current Player: {game.current_player.name}\n\n- Your cards:\n"
        # Find the sender among the players: the loop
        # variable keeps pointing at them after the break
        for sender in game.players:
            if sender.id == message.from_user.id:
                break
        hand = "".join(f"{position}. {card_to_string(card)}\n" for position, card in enumerate(sender.cards, start=1))
        await message.reply_text(header + hand)
    except RPCError as rpc_error:
        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
def pick_next_player(game: UNOGame):
    """
    Advances the turn to the next player in
    line who hasn't won yet
    """
    while True:
        game.current_pos += 1
        # The position only ever grows, so wrap
        # it around the number of players
        candidate = game.players[game.current_pos % len(game.players)]
        game.current_player = candidate
        if not candidate.won:
            # Players who won already are skipped
            return
async def check_special_cards(client: Client, game: UNOGame):
"""
Performs some checks on the effects of special cards and
sends some messages to users to make them aware of any
changes that occur
"""
2022-06-15 23:08:49 +02:00
try:
draw = 0
cards: list[UNOCard] = []
i = 0
while game.table[i].kind in {UNOCardType.WILD, UNOCardType.WILD_DRAW,
UNOCardType.REVERSE, UNOCardType.SKIP,
UNOCardType.DRAW} and not game.table[i].metadata["exhausted"]:
cards.append(game.table[i])
i += 1
for card in cards:
card.metadata["exhausted"] = True
match card.kind:
case UNOCardType.WILD | UNOCardType.WILD_DRAW as k:
await client.send_message(game.current_player.id, "Choose the color for the new top card between red, green and blue")
USERS[game.table[0].placed_by][0] = PlayerAction.CHOOSING_COLOR
found = False
if k == UNOCardType.WILD_DRAW:
for next_card in game.current_player.cards:
if next_card.kind == UNOCardType.WILD_DRAW:
found = True
break
if not found:
draw += 4
case UNOCardType.REVERSE:
for user in game.players:
if user != card.placed_by:
await client.send_message(user.id, "The direction of play reverses!")
game.players = list(reversed(game.players))
case UNOCardType.DRAW:
found = False
for next_card in game.current_player.cards:
if next_card.kind == UNOCardType.DRAW:
found = True
break
if not found:
draw += 2
case UNOCardType.SKIP:
for user in game.players:
if user != card.placed_by:
await client.send_message(user.id, "A turn has been skipped!")
pick_next_player(game)
if draw:
await client.send_message(game.current_player.id, f"You are foced to draw {draw} cards!")
for _ in range(draw):
if not game.deck:
reshuffle_table_in_deck(game)
game.current_player.cards.append(game.deck.popleft())
await client.send_message(game.current_player.id, f"You draw a card. It's {card_to_string(game.current_player.cards[-1])}")
except RPCError as rpc_error:
logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("pass"))
async def pass_turn(client: Client, message: Message):
    """
    Handles the /pass command: ends the sender's turn (which
    requires having drawn or thrown a card first), moves on to
    the next player, updates everybody else and finally applies
    the effects of any special card on the table
    """
    try:
        if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
            await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
            return
        if data[0] == PlayerAction.CHOOSING_COLOR:
            await message.reply_text("You need to choose a color between red, green, blue and yellow before passing your turn")
            return
        lobby = data[1]
        game = lobby.game
        player = next((candidate for candidate in game.players if candidate.id == message.from_user.id), None)
        if not lobby.started:
            await message.reply_text("The game hasn't started yet, ask the creator to send /begin!")
            return
        if game.current_player != player:
            await message.reply_text("It's not your turn! Wait for your opponents to finish first")
            return
        if USERS[message.from_user.id][0] not in {PlayerAction.END_DRAW, PlayerAction.END_THROW, PlayerAction.END_ACTION}:
            await message.reply_text("You have to draw or throw a card before passing your turn")
            return
        pick_next_player(game)
        game.round += 1
        USERS[message.from_user.id] = (PlayerAction.END_TURN, USERS[message.from_user.id][1])
        name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}"
        next_name = game.current_player.name
        for user in game.players:
            if user.id == message.from_user.id:
                continue
            msg = f"Game status: \n- Card on the table: {card_to_string(game.table[0])}\n\n- Current Player: {game.current_player.name}\n\n- Your cards:\n"
            for i, card in enumerate(user.cards):
                msg += f"{i + 1}. {card_to_string(card)}\n"
            try:
                await client.send_message(user.id, f"{name} has finished their turn, it is now {next_name}'s turn")
                # The status lists this user's own hand, so it must only
                # ever go to them: sending it to the current player would
                # reveal every opponent's cards to whoever plays next
                await client.send_message(user.id, msg)
            except RPCError as rpc_error:
                logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
        await message.reply_text("Done!")
        await check_special_cards(client, game)
    except RPCError as rpc_error:
        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
@Client.on_message(filters.command("draw"))
async def draw(client: Client, message: Message):
    """
    Handles the /draw command: the current player takes
    one card from the deck as their action for the turn
    """
    try:
        status, lobby = USERS[message.from_user.id]
        if status == PlayerAction.NONE:
            await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
            return
        if not lobby.started:
            await message.reply_text("The game hasn't started yet, ask the creator to send /begin!")
            return
        game = lobby.game
        # Find the sender among the players: the loop
        # variable keeps pointing at them after the break
        for drawer in game.players:
            if drawer.id == message.from_user.id:
                break
        if game.current_player != drawer:
            await message.reply_text("It's not your turn! Wait for your opponents to finish first")
            return
        if USERS[message.from_user.id][0] in {PlayerAction.END_DRAW, PlayerAction.END_THROW, PlayerAction.END_ACTION}:
            await message.reply_text("You have already performed an action for this turn!")
            return
        for opponent in game.players:
            if opponent.id == message.from_user.id:
                continue
            try:
                await client.send_message(opponent.id, f"{drawer.name} draws a card")
            except RPCError as rpc_error:
                logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
        if not game.deck:
            # Nothing left to draw: recycle the table first
            reshuffle_table_in_deck(game)
            for member in game.players:
                try:
                    await client.send_message(member.id, "The deck pile is empty! Shuffling the table back into the deck")
                except RPCError as rpc_error:
                    logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
        drawer.cards.append(game.deck.popleft())
        USERS[message.from_user.id] = (PlayerAction.END_DRAW, USERS[message.from_user.id][1])
        await message.reply_text(f"You draw a card. It's {card_to_string(drawer.cards[-1])}")
        header = f"Game status: \n- Card on the table: {card_to_string(game.table[0])}\n\n- Current Player: {game.current_player.name}\n\n- Your cards:\n"
        hand = "".join(f"{position}. {card_to_string(card)}\n" for position, card in enumerate(drawer.cards, start=1))
        await client.send_message(game.current_player.id, header + hand)
    except RPCError as rpc_error:
        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
def check_next_throw(game: UNOGame, card: UNOCard) -> bool:
    """
    Tells whether card can be thrown on top
    of the card currently on the table
    """
    wild_kinds = {UNOCardType.WILD, UNOCardType.WILD_DRAW}
    top = game.table[0]
    # Wild cards carry their color in the metadata
    top_color = top.metadata["color"] if top.kind in wild_kinds else top.color
    card_color = card.metadata["color"] if card.kind in wild_kinds else card.color
    colors_match = card_color == top_color
    placed_this_round = top.placed_when == game.round
    # A matching color only counts when the top
    # card was not placed during this very round
    by_color = colors_match and not placed_this_round
    if top.kind == UNOCardType.NUMBER:
        if card.kind == UNOCardType.NUMBER:
            # Numbers can also go on top of an equal number
            return top.metadata["value"] == card.metadata["value"] or by_color
        return by_color
    if top.kind in {UNOCardType.SKIP, UNOCardType.REVERSE, UNOCardType.DRAW}:
        if top.kind == card.kind:
            # Same special card twice: the top one has to allow stacking
            return by_color and top.stackable
        return by_color
    # Wild cards at the top aren't handled yet (TODO)
    return False
@Client.on_message(filters.command("throw"))
async def throw(client: Client, message: Message):
    """
    Handles the /throw command: the current player throws the
    card at the given position of their hand (/throw <card>,
    counting from 1) if the rules allow it. Running out of
    cards makes the player a winner; once only one player is
    left with cards the results are sent out and the match ends
    """
    try:
        if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE:
            await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join <id> to join an existing one", parse_mode=ParseMode.MARKDOWN)
            return
        if len(message.command) != 2:
            await message.reply_text("Invalid syntax, the correct one is `/throw <card>`", parse_mode=ParseMode.MARKDOWN)
            return
        lobby = data[1]
        game = lobby.game
        if not lobby.started:
            await message.reply_text("The game hasn't started yet, ask the creator to send /begin!")
            return
        player = next((candidate for candidate in game.players if candidate.id == message.from_user.id), None)
        if game.current_player != player:
            await message.reply_text("It's not your turn! Wait for your opponents to finish first")
            return
        if USERS[message.from_user.id][0] == PlayerAction.END_ACTION:
            await message.reply_text("You have already performed an action for this turn!")
            return
        # isdecimal() only accepts what int() can actually parse
        # (isnumeric() also lets through things like "²")
        if not message.command[1].isdecimal():
            await message.reply_text("Invalid card number: it must be a positive integer")
            return
        card_no = int(message.command[1])
        if card_no > len(player.cards) or card_no == 0:
            await message.reply_text(f"Invalid card: you can choose between 1 and {len(player.cards)}")
            return
        chosen = player.cards[card_no - 1]
        if not check_next_throw(game, chosen):
            await message.reply_text("You can't throw that card")
            return
        # Only a card that is actually thrown gets stamped
        # with who placed it and when
        chosen.placed_when = game.round
        chosen.placed_by = player
        game.table.appendleft(player.cards.pop(card_no - 1))
        USERS[message.from_user.id] = (PlayerAction.END_THROW, USERS[message.from_user.id][1])
        for user in game.players:
            if user.id != message.from_user.id:
                try:
                    await client.send_message(user.id, f"{player.name} throws {card_to_string(game.table[0])}")
                except RPCError as rpc_error:
                    logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
        if len(player.cards) == 0:
            if player.won:
                await message.reply_text("You already won! Please wait for the match to end")
                return
            game.winners.append(player)
            player.won = True
            await message.reply_text("You're out of cards, congrats!")
            if sum(participant.won for participant in game.players) == len(game.players) - 1:
                # Only one player left: the match is over
                medals = ("🥇", "🥈", "🥉")
                msg = "📝 Results:\n\n"
                for i, user in enumerate(game.winners):
                    msg += f"- {user.name} "
                    if i < len(medals):
                        msg += medals[i]
                    msg += "\n"
                for loser in game.players:
                    if not loser.won:
                        msg += f"- {loser.name}\n"
                for user in game.players:
                    # Everyone goes back to waiting in the lobby
                    user.cards = []
                    USERS[user.id] = (PlayerAction.WAITING_LOBBY, lobby)
                    try:
                        if user.id != message.from_user.id:
                            await client.send_message(user.id, "Game over!")
                        await client.send_message(user.id, msg)
                    except RPCError as rpc_error:
                        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")
                lobby.started = False
        else:
            await message.reply_text(f"You throw a card. It's {card_to_string(game.table[0])}")
            msg = f"Game status: \n- Card on the table: {card_to_string(game.table[0])}\n\n- Current Player: {game.current_player.name}\n\n- Your cards:\n"
            for i, card in enumerate(player.cards):
                msg += f"{i + 1}. {card_to_string(card)}\n"
            await client.send_message(game.current_player.id, msg)
    except RPCError as rpc_error:
        logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}")