diff --git a/UnoVRBot/misc/__init__.py b/UnoVRBot/misc/__init__.py new file mode 100644 index 0000000..8c1e1cc --- /dev/null +++ b/UnoVRBot/misc/__init__.py @@ -0,0 +1,12 @@ +# Copyright (C) 2022 nocturn9x +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/UnoVRBot/misc/config.py b/UnoVRBot/misc/config.py new file mode 100644 index 0000000..7b3d309 --- /dev/null +++ b/UnoVRBot/misc/config.py @@ -0,0 +1,34 @@ +# Copyright (C) 2022 nocturn9x +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from pydantic import BaseModel +from typing import Optional +import os + + +class BotConfig(BaseModel): + """ + A container class for bot configuration variables. + Because passing around a fuckload of global variables + wasn't bad enough + """ + + api_id: int + api_hash: str + bot_token: str + logging_format: str + logging_date_format: str + logging_level: int + restart_times: Optional[int] = 0 + plugins_dir: Optional[str] = "modules" + working_directory: Optional[str] = os.getcwd() + session_name: Optional[str] = "userbot" \ No newline at end of file diff --git a/UnoVRBot/misc/shared.py b/UnoVRBot/misc/shared.py new file mode 100644 index 0000000..be0b624 --- /dev/null +++ b/UnoVRBot/misc/shared.py @@ -0,0 +1,28 @@ +# Copyright (C) 2022 nocturn9x +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +class SharedStatus: + + """ + Simple class to share state across the userbot. + """ + + def __init__(self): + self.state = {} + + def add(self, key, value): + self.state[key] = value + + def __getattr__(self, item): + if item in self.__dict__: + return self.__dict__[item] + return self.state[item] \ No newline at end of file diff --git a/UnoVRBot/modules/help.py b/UnoVRBot/modules/help.py new file mode 100644 index 0000000..8c648e7 --- /dev/null +++ b/UnoVRBot/modules/help.py @@ -0,0 +1,51 @@ +# Copyright (C) 2022 nocturn9x +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from pyrogram.types import Message +from pyrogram.errors import RPCError +from pyrogram import Client, filters +from pyrogram.enums.parse_mode import ParseMode + + +HELP_MSG = """List of available commands: +- /help: Opens help menu (you are here) +- /start: Starts the bot +- /play: Create a lobby to play UNO! +- /stop: Stops the current game +- /draw: Draw a card from the deck +- /uno: Say UNO! +- /pass: End your turn +- /join : Join a game lobby +- /close: Close the game lobby to new players +- /open: Reopens a lobby to new players +- /list: List all players in the current lobby +- /leave: Leave the current lobby +- /kick : Kicks a given player from the current game +- /begin: Starts the game and closes the lobby +- /broadcast : Sends a message to all users +- /whisper : Sends a message to a single user +- /throw : Throws a card onto the table +- /info: Shows the current state of the game (your cards and the table) +""" + + +@Client.on_message(filters.command("help")) +async def help(_: Client, message: Message): + """ + Handles the /help command + """ + + try: + await message.reply_text(HELP_MSG, parse_mode=ParseMode.MARKDOWN) + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") diff --git a/UnoVRBot/modules/start.py b/UnoVRBot/modules/start.py new file mode 100644 index 0000000..526e300 --- /dev/null +++ b/UnoVRBot/modules/start.py @@ -0,0 +1,29 @@ +# Copyright (C) 2022 nocturn9x +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from pyrogram.types import Message +from pyrogram.errors import RPCError +from pyrogram import Client, filters + + + +@Client.on_message(filters.command("start")) +async def start(_: Client, message: Message): + """ + Handles the /start command + """ + + try: + await message.reply_text("Hi! I'm the UNO! bot. I can let you play with your friends in a group. Send /help to learn more!") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") diff --git a/UnoVRBot/modules/uno.py b/UnoVRBot/modules/uno.py new file mode 100644 index 0000000..239f9d5 --- /dev/null +++ b/UnoVRBot/modules/uno.py @@ -0,0 +1,869 @@ +# Copyright (C) 2022 nocturn9x +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +import logging +from dataclasses import dataclass, field +from enum import Enum, auto +from typing import Optional +from pyrogram.types import Message +from pyrogram.errors import RPCError +from pyrogram.enums import ChatType +from pyrogram.enums.parse_mode import ParseMode +from pyrogram import Client, filters +from collections import deque, defaultdict + + +class UNODirection(Enum): + """ + The direction of play in the + current game + """ + + CLOCKWISE: int = auto() + COUNTER_CLOCKWISE: int = auto() + + +class UNOCardType(Enum): + """ + A UNO card type + """ + + NUMBER: int = auto() + DRAW: int = auto() + WILD: int = auto() + WILD_DRAW: int = auto() + REVERSE: int = auto() + SKIP: int = auto() + + +class UNOCardColor(Enum): + """ + A UNO card's color + """ + + RED: int = auto() + BLUE: int = auto() + GREEN: int = auto() + YELLOW: int = auto() + BLACK: int = auto() + + +@dataclass +class UNOCard: + """ + A single UNO card + """ + + kind: UNOCardType + color: UNOCardColor + stackable: Optional[bool] = True + placed_by: "UNOPlayer" = None + # Extra metadata + metadata: Optional[dict[str, int | str | bool]] = None + + +def get_default_deck(n_decks: int = 1) -> deque[UNOCard]: + """ + Generates the UNO! default deck. Can merge more than + one deck if there's more people playing (use n_decks) + """ + + result = [] + for __ in range(n_decks): + # Numbers: 1 to 9 for each color + for color in [UNOCardColor.GREEN, UNOCardColor.RED, + UNOCardColor.YELLOW, UNOCardColor.BLUE]: + for _ in range(2): + for i in range(1, 10): + result.append(UNOCard(UNOCardType.NUMBER, color, metadata={"value": i})) + # 8 skip cards, 2 per color + for _ in [UNOCardColor.GREEN, UNOCardColor.RED, + UNOCardColor.YELLOW, UNOCardColor.BLUE]: + for _ in range(2): + result.append(UNOCard(UNOCardType.SKIP, color, metadata={"exhausted": False})) + # 8 reverse cards, 2 per color + for _ in [UNOCardColor.GREEN, UNOCardColor.RED, + UNOCardColor.YELLOW, UNOCardColor.BLUE]: + for _ in range(2): + result.append(UNOCard(UNOCardType.REVERSE, color, metadata={"exhausted": False})) + # 8 draw cards, 2 per color + for _ in [UNOCardColor.GREEN, UNOCardColor.RED, + UNOCardColor.YELLOW, UNOCardColor.BLUE]: + for _ in range(2): + result.append(UNOCard(UNOCardType.DRAW, color, metadata={"exhausted": False})) + # Wild and wild draw cards + for _ in range(4): + result.append(UNOCard(UNOCardType.WILD, UNOCardColor.BLACK, stackable=False, metadata={"exhausted": False})) + for _ in range(4): + result.append(UNOCard(UNOCardType.WILD_DRAW, UNOCardColor.BLACK, stackable=False, metadata={"exhausted": False})) + random.shuffle(result) + return deque(result) + + +@dataclass +class UNOPlayer: + """ + A single UNO player + """ + + id: int + name: str + cards: list[UNOCard] = field(default_factory=list) + said_uno: bool = False + won: bool = False + + +@dataclass +class UNOGame: + """ + A single UNO game + """ + + players: list[UNOPlayer] = field(default_factory=list) + # We literally have a stack of cards, so it makes sense + # to use a deque + deck: deque[UNOCard] = field(default_factory=get_default_deck) + table: deque[UNOCard] = field(default_factory=deque) + direction: UNODirection = UNODirection.CLOCKWISE + current_pos: int = 0 + current_player: Optional[UNOPlayer] = None + winners: list[UNOPlayer] = field(default_factory=list) + + +@dataclass +class UNOLobby: + """ + A UNO playing lobby + """ + + creator: UNOPlayer + game: UNOGame + open: bool = True + started: bool = False + + + +class PlayerAction(Enum): + """ + An enum of possible + player actions and + statuses + """ + + WAITING_LOBBY: int = auto() + WAITING_TURN: int = auto() + PLAYING_TURN: int = auto() + CHOOSING_COLOR: int = auto() + END_TURN: int = auto() + END_ACTION: int = auto() + END_THROW: int = auto() + END_DRAW: int = auto() + NONE: int = auto() + + +LOBBIES: list[UNOLobby] = [] +USERS: dict[int, tuple[PlayerAction, UNOLobby]] = defaultdict(lambda: (PlayerAction.NONE, None)) + + +@Client.on_message(filters.command("play")) +async def play(_: Client, message: Message): + """ + Handles the /play command + """ + + try: + if message.chat.type != ChatType.PRIVATE: + await message.reply_text("This command can only be used in private chats. Send /help to learn more!") + return + elif USERS[message.from_user.id][0] != PlayerAction.NONE: + await message.reply_text("You're already in a lobby: type /leave if you want to leave it or /stop if you're the creator") + return + decks = 1 + if len(message.command) > 1: + if message.command[1].isnumeric(): + decks = int(message.command[1]) + else: + await message.reply_text("Invalid deck count: it must be a positive integer") + return + name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" + creator = UNOPlayer(message.from_user.id, name, []) + deck = get_default_deck(decks) + game = UNOGame(players=[creator], table=deque([deck.popleft()]), direction=UNODirection.CLOCKWISE) + LOBBIES.append(UNOLobby(creator, game)) + USERS[creator.id] = (PlayerAction.WAITING_LOBBY, LOBBIES[-1]) + await message.reply_text(f"Lobby created (playing with {decks} deck{'s' if decks > 1 else ''}). Tell your friends to send me `/join {message.from_user.id}` to join the game") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + + +@Client.on_message(filters.command("close")) +async def close(_: Client, message: Message): + """ + Handles the /close command + """ + + try: + if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: + await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) + else: + lobby = data[1] + if lobby.creator.id != message.from_user.id: + await message.reply_text("Only the creator can close the lobby") + elif not lobby.open: + await message.reply_text("The lobby is already closed") + else: + lobby.open = False + await message.reply_text("Closed the lobby to new players") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + + +@Client.on_message(filters.command("open")) +async def open(_: Client, message: Message): + """ + Handles the /open command + """ + + try: + if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: + await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) + else: + lobby = data[1] + if lobby.creator.id != message.from_user.id: + await message.reply_text("Only the creator can open the lobby") + elif lobby.open: + await message.reply_text("The lobby is already open") + else: + lobby.open = True + await message.reply_text("Opened the lobby to new players") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + + +@Client.on_message(filters.command("join")) +async def join(client: Client, message: Message): + """ + Handles the /join command + """ + + try: + if len(message.command) != 2: + await message.reply_text(f"Invalid syntax, the correct one is `/join `", parse_mode=ParseMode.MARKDOWN) + return + elif message.command[1].isnumeric(): + join_lobby = None + creator_id = int(message.command[1]) + for i, lobby in enumerate(LOBBIES): + if creator_id == lobby.creator.id: + join_lobby = lobby + break + if not join_lobby: + await message.reply_text(f"Could not find lobby `{creator_id}`. It may have been deleted or not exist yet. Send /help for more info") + elif not lobby.open: + await message.reply_text(f"Sorry, but this lobby isn't accepting any new players") + else: + for player in lobby.game.players: + if player.id == message.from_user.id: + await message.reply_text("You're already in this lobby") + return + USERS[message.from_user.id] = (PlayerAction.WAITING_LOBBY, lobby) + name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" + lobby.game.players.append(UNOPlayer(message.from_user.id, name, [])) + await message.reply_text(f"Joined lobby `{creator_id}`") + name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" + for user in lobby.game.players: + if user.id != message.from_user.id: + try: + await client.send_message(user.id, f"{name} has joined!") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + else: + await message.reply_text("Invalid lobby ID: it must be a positive integer") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + + +@Client.on_message(filters.command("list")) +async def list(client: Client, message: Message): + """ + Handles the /list command + """ + + try: + if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: + await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) + else: + lobby = data[1] + msg = f"List of players in lobby `{lobby.creator.id}`:\n" + for player in lobby.game.players: + msg += f"- {player.name} (`{player.id}`)" + if player.cards: + msg += f" (`{len(player.cards)}` card{'s' if len(player.cards) > 1 else ''})" + if lobby.started and player == lobby.game.current_player: + msg += " ⬅️" + elif player.won: + msg += " 👑" + msg += "\n" + await message.reply_text(msg) + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + + +@Client.on_message(filters.command("stop")) +async def stop(client: Client, message: Message): + """ + Handles the /stop command + """ + + try: + if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: + await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) + else: + lobby = data[1] + if message.from_user.id != lobby.creator.id: + await message.reply_text("Only the creator can stop the game") + return + for user in lobby.game.players: + del USERS[user.id] + if user.id != message.from_user.id: + try: + await client.send_message(user.id, f"The game has been stopped by its creator, thanks for playing!") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + LOBBIES.remove(data[1]) + await message.reply_text("Lobby closed, thanks for playing!") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + + + +@Client.on_message(filters.command("leave")) +async def leave(client: Client, message: Message): + """ + Handles the /leave command + """ + + try: + if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: + await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) + else: + lobby = data[1] + if message.from_user.id == lobby.creator.id: + await message.reply_text("You can't leave a lobby as a creator: use /stop instead") + return + for player in lobby.game.players: + if player.id == message.from_user.id: + break + lobby.game.players.remove(player) + # User's cards are added at the bottom + # of the table + lobby.game.table.extend(player.cards) + name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" + for user in lobby.game.players: + if user.id != message.from_user.id: + try: + await client.send_message(user.id, f"{name} has left!") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + del USERS[message.from_user.id] + await message.reply_text(f"Left lobby `{lobby.creator.id}`") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + + +@Client.on_message(filters.command("kick")) +async def kick(client: Client, message: Message): + """ + Handles the /kick command + """ + + try: + if len(message.command) != 2: + await message.reply_text("Invalid syntax, the correct one is `/kick `", parse_mode=ParseMode.MARKDOWN) + elif (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: + await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) + else: + lobby = data[1] + if lobby.creator.id != message.from_user.id: + await message.reply_text("Only the creator can kick users") + elif message.command[1].isnumeric(): + kicked = int(message.command[1]) + if kicked == lobby.creator.id: + await message.reply_text("You can't kick yourself") + return + found = None + for player in lobby.game.players: + if player.id == kicked: + found = player + if found: + lobby.game.players.remove(player) + del USERS[kicked] + await client.send_message(kicked, f"You have been kicked from lobby `{lobby.creator.id}`") + await message.reply_text(f"Kicked `{kicked}`") + else: + await message.reply_text(f"`{kicked}` is not in the lobby") + else: + await message.reply_text("Invalid user ID: it must be a positive integer") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + + +@Client.on_message(filters.command("broadcast")) +async def broadcast(client: Client, message: Message): + """ + Handles the /broadcast command + """ + + try: + if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: + await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) + elif len(message.command) <= 1: + await message.reply_text("Invalid syntax, the correct one is `/broadcast `", parse_mode=ParseMode.MARKDOWN) + else: + lobby = data[1] + name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" + for user in lobby.game.players: + if user.id != message.from_user.id: + try: + await client.send_message(user.id, f"{name} says: {message.text[11:]}") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + await message.reply_text(f"Done!") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + + +@Client.on_message(filters.command("whisper")) +async def whisper(client: Client, message: Message): + """ + Handles the /whisper command + """ + + try: + if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: + await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) + elif len(message.command) <= 2: + await message.reply_text("Invalid syntax, the correct one is `/whisper `", parse_mode=ParseMode.MARKDOWN) + else: + lobby = data[1] + name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" + user_id = None + if message.command[1].isnumeric(): + user_id = int(message.command[1]) + else: + await message.reply_text("Invalid user ID: it must be a positive integer") + return + found = False + for user in lobby.game.players: + if user.id == user_id: + found = True + break + if not found: + await message.reply_text(f"`{user_id}` is not in this lobby, use /list to see the list of users") + return + try: + await client.send_message(user_id, f"{name} whispers you: {message.text[11 + len(message.command[1]) - 1:]}") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + await message.reply_text(f"Done!") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + + +def reshuffle_table_in_deck(game: UNOGame): + """ + Called when the deck becomes empty and + the cards on the table need to be reshuffled + into it + """ + + game.deck.extend(game.table) + game.table.clear() + game.table.append(game.deck.popleft()) + while True: + if game.table[0].kind != UNOCardType.NUMBER: + game.table.append(game.deck.popleft()) + game.deck.append(game.table.popleft()) + else: + break + # Breaks up the pairs + random.shuffle(game.deck) + + +def pick_first_card(game: UNOGame): + """ + Picks the first card on the table from the + deck, ensuring that it is a number + """ + + # Picks the first card (which can't be a special card) + game.table.append(game.deck.popleft()) + while True: + if game.table[0].kind != UNOCardType.NUMBER: + game.table.append(game.deck.popleft()) + game.deck.append(game.table.popleft()) + else: + break + + +def distribute_cards(game: UNOGame, n: int = 1): + """ + Distributes n cards to all players evenly + as if they were being given in real life + one by one + """ + + # Gives 7 cards to each player, one by one (i.e first one to each player, then the second, etc.) + while not all(len(player.cards) == n for player in game.players): + for player in game.players: + player.cards.append(game.deck.popleft()) + for player in game.players: + for card in player.cards: + card.placed_by = player + + +@Client.on_message(filters.command("begin")) +async def begin(client: Client, message: Message): + """ + Handles the /begin command + """ + + try: + if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: + await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) + else: + lobby = data[1] + if lobby.creator.id != message.from_user.id: + await message.reply_text("Only the creator can begin the game") + elif lobby.started: + await message.reply_text("The game has already started") + else: + if len(lobby.game.players) < 2: + await message.reply_text("You need at least 2 players to play UNO!") + return + name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" + lobby.open = False + lobby.started = True + pick_first_card(lobby.game) + distribute_cards(lobby.game) + lobby.game.current_pos = random.randint(0, len(lobby.game.players) - 1) + lobby.game.current_player = lobby.game.players[lobby.game.current_pos] + await message.reply_text("The game has started and the lobby is now closed to new players") + for user in lobby.game.players: + user.won = False + try: + if user.id != message.from_user.id: + await client.send_message(user.id, f"{name} has started the game!") + msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n" + for i, card in enumerate(user.cards): + msg += f"{i + 1}. {card_to_string(card)}\n" + await client.send_message(user.id, msg) + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + + +def card_to_string(card: UNOCard) -> str: + """ + Converts a UNO card to its corresponding + string representation + """ + + maps = { + 1: "1️⃣", + 2: "2️⃣", + 3: "3️⃣", + 4: "4️⃣", + 5: "5️⃣", + 6: "6️⃣", + 7: "7️⃣", + 8: "8️⃣", + 9: "9️⃣", + UNOCardType.NUMBER: "", + UNOCardType.DRAW: "➕", + UNOCardType.WILD: "🌈", + UNOCardType.WILD_DRAW: "🌈➕4️⃣", + UNOCardType.REVERSE: "↩️", + UNOCardType.SKIP: "🚫", + + UNOCardColor.BLACK: "", + UNOCardColor.BLUE: "🔵", + UNOCardColor.RED: "🔴", + UNOCardColor.GREEN: "🟢", + UNOCardColor.YELLOW: "🟡", + } + result = "" + match card.kind: + case UNOCardType.NUMBER: + result += maps[card.metadata["value"]] + result += maps[card.color] + case UNOCardType.DRAW: + result += maps[card.kind] + result += maps[2] + result += maps[card.color] + case UNOCardType.REVERSE | UNOCardType.SKIP: + result += maps[card.kind] + result += maps[card.color] + case UNOCardType.WILD | UNOCardType.WILD_DRAW: + result += maps[card.kind] + return result + + +@Client.on_message(filters.command("info")) +async def info(_: Client, message: Message): + """ + Handles the /info command + """ + + try: + if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: + await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) + else: + lobby = data[1] + if not lobby.started: + await message.reply_text("The game hasn't started yet, ask the creator to send /begin!") + return + msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n" + for player in lobby.game.players: + if player.id == message.from_user.id: + break + for i, card in enumerate(player.cards): + msg += f"{i + 1}. {card_to_string(card)}\n" + await message.reply_text(msg) + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + + +async def check_special_cards(client: Client, game: UNOGame): + """ + Performs some checks on the effects of special cards and + sends some messages to users to make them aware of any + changes that occur + """ + + +@Client.on_message(filters.command("pass")) +async def pass_turn(client: Client, message: Message): + """ + Handles the /pass command + """ + + try: + if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: + await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) + else: + lobby = data[1] + for player in lobby.game.players: + if player.id == message.from_user.id: + break + if not lobby.started: + await message.reply_text("The game hasn't started yet, ask the creator to send /begin!") + return + elif lobby.game.current_player != player: + await message.reply_text("It's not your turn! Wait for your opponents to finish first") + return + elif USERS[message.from_user.id][0] not in {PlayerAction.END_DRAW, PlayerAction.END_THROW, PlayerAction.END_ACTION}: + await message.reply_text("You have to draw or throw a card before passing your turn") + return + else: + while True: + if lobby.game.direction == UNODirection.CLOCKWISE: + lobby.game.current_pos += 1 + else: + lobby.game.current_pos -= 1 + lobby.game.current_player = lobby.game.players[lobby.game.current_pos % len(lobby.game.players)] + if lobby.game.current_player.won: + # Skip players who won already + continue + else: + break + USERS[message.from_user.id] = (PlayerAction.END_TURN, USERS[message.from_user.id][1]) + name = f"{message.from_user.first_name or ''}{message.from_user.last_name or ''}" + next_name = lobby.game.current_player.name + for user in lobby.game.players: + if user.id != message.from_user.id: + msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n" + for i, card in enumerate(user.cards): + msg += f"{i + 1}. {card_to_string(card)}\n" + try: + await client.send_message(user.id, f"{name} has finished their turn, it is now {next_name}'s turn") + await client.send_message(lobby.game.current_player.id, msg) + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + await message.reply_text("Done!") + await check_special_cards(client, lobby.game) + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + + +@Client.on_message(filters.command("draw")) +async def draw(client: Client, message: Message): + """ + Handles the /draw command + """ + + try: + if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: + await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) + else: + lobby = data[1] + if not lobby.started: + await message.reply_text("The game hasn't started yet, ask the creator to send /begin!") + return + for player in lobby.game.players: + if player.id == message.from_user.id: + break + if lobby.game.current_player != player: + await message.reply_text("It's not your turn! Wait for your opponents to finish first") + return + elif USERS[message.from_user.id][0] in {PlayerAction.END_DRAW, PlayerAction.END_THROW, PlayerAction.END_ACTION}: + await message.reply_text("You have already performed an action for this turn!") + return + for user in lobby.game.players: + if user.id != message.from_user.id: + try: + await client.send_message(user.id, f"{player.name} draws a card") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + if not lobby.game.deck: + reshuffle_table_in_deck(lobby.game) + for user in lobby.game.players: + try: + await client.send_message(user.id, "The deck pile is empty! Shuffling the table back into the deck") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + player.cards.append(lobby.game.deck.popleft()) + USERS[message.from_user.id] = (PlayerAction.END_DRAW, USERS[message.from_user.id][1]) + await message.reply_text(f"You draw a card. It's {card_to_string(player.cards[-1])}") + msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n" + for i, card in enumerate(player.cards): + msg += f"{i + 1}. {card_to_string(card)}\n" + await client.send_message(lobby.game.current_player.id, msg) + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + + +def check_next_throw(game: UNOGame, card: UNOCard) -> bool: + """ + Returns if the card to be thrown is valid + """ + + top = game.table[0] + result = False + match card.kind: + case UNOCardType.NUMBER as k if top.kind == k: + if card.metadata["value"] == top.metadata["value"]: + result = True + elif card.color == top.color: + result = True + # Can't stack colors! + if top.placed_by and card.color == top.color and card.placed_by is top.placed_by: + result = False + case UNOCardType.DRAW | UNOCardType.REVERSE | UNOCardType.SKIP as k: + result = top.kind == k and top.color == card.color + case UNOCardType.WILD | UNOCardType.WILD_DRAW: + result = True + case _: + result = False + result = result and top.stackable + return result + + +@Client.on_message(filters.command("throw")) +async def throw(client: Client, message: Message): + """ + Handles the /throw command + """ + + try: + if (data := USERS[message.from_user.id])[0] == PlayerAction.NONE: + await message.reply_text("You haven't created or joined a game lobby yet. Send /play to create it or /join to join an existing one", parse_mode=ParseMode.MARKDOWN) + elif len(message.command) != 2: + await message.reply_text("Invalid syntax, the correct one is `/throw `", parse_mode=ParseMode.MARKDOWN) + else: + lobby = data[1] + if not lobby.started: + await message.reply_text("The game hasn't started yet, ask the creator to send /begin!") + return + for player in lobby.game.players: + if player.id == message.from_user.id: + break + if lobby.game.current_player != player: + await message.reply_text("It's not your turn! Wait for your opponents to finish first") + return + elif USERS[message.from_user.id][0] == PlayerAction.END_ACTION: + await message.reply_text("You have already performed an action for this turn!") + return + card_no = 0 + if message.command[1].isnumeric(): + card_no = int(message.command[1]) + else: + await message.reply_text("Invalid card number: it must be a positive integer") + return + if card_no > len(player.cards) or card_no == 0: + await message.reply_text(f"Invalid card: you can choose between 1 and {len(player.cards)}") + return + if not check_next_throw(lobby.game, player.cards[card_no - 1]): + await message.reply_text("You can't throw that card") + return + lobby.game.table.appendleft(player.cards.pop(card_no - 1)) + USERS[message.from_user.id] = (PlayerAction.END_THROW, USERS[message.from_user.id][1]) + for user in lobby.game.players: + if user.id != message.from_user.id: + try: + await client.send_message(user.id, f"{player.name} throws {card_to_string(lobby.game.table[0])}") + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + if len(player.cards) == 0: + if player.won: + await message.reply_text("You already won! Please wait for the match to end") + return + lobby.game.winners.append(player) + player.won = True + await message.reply_text("You're out of cards, congrats!") + if sum(player.won for player in lobby.game.players) == len(lobby.game.players) - 1: + msg = "📝 Results:\n\n" + for i, user in enumerate(lobby.game.winners): + msg += f"- {user.name} " + match i: + case 0: + msg += "🥇" + case 1: + msg += "🥈" + case 2: + msg += "🥉" + msg += "\n" + for player in lobby.game.players: + if not player.won: + msg += f"- {player.name}\n" + # Only one player left + for i, user in enumerate(lobby.game.players): + user.cards = [] + USERS[user.id] = (PlayerAction.WAITING_LOBBY, lobby) + try: + if user.id != message.from_user.id: + await client.send_message(user.id, f"Game over!") + await client.send_message(user.id, msg) + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") + lobby.started = False + else: + await message.reply_text(f"You throw a card. It's {card_to_string(lobby.game.table[0])}") + msg = f"Game status: \n- Card on the table: {card_to_string(lobby.game.table[0])}\n\n- Current Player: {lobby.game.current_player.name}\n\n- Your cards:\n" + for i, card in enumerate(player.cards): + msg += f"{i + 1}. {card_to_string(card)}\n" + await client.send_message(lobby.game.current_player.id, msg) + except RPCError as rpc_error: + logging.error(f"An unexpected RPC error occurred: {type(rpc_error).__name__} -> {rpc_error}") diff --git a/bin/activate b/bin/activate new file mode 100644 index 0000000..0fe1df4 --- /dev/null +++ b/bin/activate @@ -0,0 +1,83 @@ +# This file must be used with "source bin/activate" *from bash* +# you cannot run it directly + + +if [ "${BASH_SOURCE-}" = "$0" ]; then + echo "You must source this script: \$ source $0" >&2 + exit 33 +fi + +deactivate () { + unset -f pydoc >/dev/null 2>&1 || true + + # reset old environment variables + # ! [ -z ${VAR+_} ] returns true if VAR is declared at all + if ! [ -z "${_OLD_VIRTUAL_PATH:+_}" ] ; then + PATH="$_OLD_VIRTUAL_PATH" + export PATH + unset _OLD_VIRTUAL_PATH + fi + if ! [ -z "${_OLD_VIRTUAL_PYTHONHOME+_}" ] ; then + PYTHONHOME="$_OLD_VIRTUAL_PYTHONHOME" + export PYTHONHOME + unset _OLD_VIRTUAL_PYTHONHOME + fi + + # The hash command must be called to get it to forget past + # commands. Without forgetting past commands the $PATH changes + # we made may not be respected + hash -r 2>/dev/null + + if ! [ -z "${_OLD_VIRTUAL_PS1+_}" ] ; then + PS1="$_OLD_VIRTUAL_PS1" + export PS1 + unset _OLD_VIRTUAL_PS1 + fi + + unset VIRTUAL_ENV + if [ ! "${1-}" = "nondestructive" ] ; then + # Self destruct! + unset -f deactivate + fi +} + +# unset irrelevant variables +deactivate nondestructive + +VIRTUAL_ENV='/home/nocturn9x/UnoVRBot' +if ([ "$OSTYPE" = "cygwin" ] || [ "$OSTYPE" = "msys" ]) && $(command -v cygpath &> /dev/null) ; then + VIRTUAL_ENV=$(cygpath -u "$VIRTUAL_ENV") +fi +export VIRTUAL_ENV + +_OLD_VIRTUAL_PATH="$PATH" +PATH="$VIRTUAL_ENV/bin:$PATH" +export PATH + +# unset PYTHONHOME if set +if ! [ -z "${PYTHONHOME+_}" ] ; then + _OLD_VIRTUAL_PYTHONHOME="$PYTHONHOME" + unset PYTHONHOME +fi + +if [ -z "${VIRTUAL_ENV_DISABLE_PROMPT-}" ] ; then + _OLD_VIRTUAL_PS1="${PS1-}" + if [ "x" != x ] ; then + PS1="() ${PS1-}" + else + PS1="(`basename \"$VIRTUAL_ENV\"`) ${PS1-}" + fi + export PS1 +fi + +# Make sure to unalias pydoc if it's already there +alias pydoc 2>/dev/null >/dev/null && unalias pydoc || true + +pydoc () { + python -m pydoc "$@" +} + +# The hash command must be called to get it to forget past +# commands. Without forgetting past commands the $PATH changes +# we made may not be respected +hash -r 2>/dev/null diff --git a/bin/activate.csh b/bin/activate.csh new file mode 100644 index 0000000..b48511f --- /dev/null +++ b/bin/activate.csh @@ -0,0 +1,55 @@ +# This file must be used with "source bin/activate.csh" *from csh*. +# You cannot run it directly. +# Created by Davide Di Blasi . + +set newline='\ +' + +alias deactivate 'test $?_OLD_VIRTUAL_PATH != 0 && setenv PATH "$_OLD_VIRTUAL_PATH:q" && unset _OLD_VIRTUAL_PATH; rehash; test $?_OLD_VIRTUAL_PROMPT != 0 && set prompt="$_OLD_VIRTUAL_PROMPT:q" && unset _OLD_VIRTUAL_PROMPT; unsetenv VIRTUAL_ENV; test "\!:*" != "nondestructive" && unalias deactivate && unalias pydoc' + +# Unset irrelevant variables. +deactivate nondestructive + +setenv VIRTUAL_ENV '/home/nocturn9x/UnoVRBot' + +set _OLD_VIRTUAL_PATH="$PATH:q" +setenv PATH "$VIRTUAL_ENV:q/bin:$PATH:q" + + + +if ('' != "") then + set env_name = '() ' +else + set env_name = '('"$VIRTUAL_ENV:t:q"') ' +endif + +if ( $?VIRTUAL_ENV_DISABLE_PROMPT ) then + if ( $VIRTUAL_ENV_DISABLE_PROMPT == "" ) then + set do_prompt = "1" + else + set do_prompt = "0" + endif +else + set do_prompt = "1" +endif + +if ( $do_prompt == "1" ) then + # Could be in a non-interactive environment, + # in which case, $prompt is undefined and we wouldn't + # care about the prompt anyway. + if ( $?prompt ) then + set _OLD_VIRTUAL_PROMPT="$prompt:q" + if ( "$prompt:q" =~ *"$newline:q"* ) then + : + else + set prompt = "$env_name:q$prompt:q" + endif + endif +endif + +unset env_name +unset do_prompt + +alias pydoc python -m pydoc + +rehash diff --git a/bin/activate.fish b/bin/activate.fish new file mode 100644 index 0000000..16dea9f --- /dev/null +++ b/bin/activate.fish @@ -0,0 +1,100 @@ +# This file must be used using `source bin/activate.fish` *within a running fish ( http://fishshell.com ) session*. +# Do not run it directly. + +function _bashify_path -d "Converts a fish path to something bash can recognize" + set fishy_path $argv + set bashy_path $fishy_path[1] + for path_part in $fishy_path[2..-1] + set bashy_path "$bashy_path:$path_part" + end + echo $bashy_path +end + +function _fishify_path -d "Converts a bash path to something fish can recognize" + echo $argv | tr ':' '\n' +end + +function deactivate -d 'Exit virtualenv mode and return to the normal environment.' + # reset old environment variables + if test -n "$_OLD_VIRTUAL_PATH" + # https://github.com/fish-shell/fish-shell/issues/436 altered PATH handling + if test (echo $FISH_VERSION | head -c 1) -lt 3 + set -gx PATH (_fishify_path "$_OLD_VIRTUAL_PATH") + else + set -gx PATH $_OLD_VIRTUAL_PATH + end + set -e _OLD_VIRTUAL_PATH + end + + if test -n "$_OLD_VIRTUAL_PYTHONHOME" + set -gx PYTHONHOME "$_OLD_VIRTUAL_PYTHONHOME" + set -e _OLD_VIRTUAL_PYTHONHOME + end + + if test -n "$_OLD_FISH_PROMPT_OVERRIDE" + and functions -q _old_fish_prompt + # Set an empty local `$fish_function_path` to allow the removal of `fish_prompt` using `functions -e`. + set -l fish_function_path + + # Erase virtualenv's `fish_prompt` and restore the original. + functions -e fish_prompt + functions -c _old_fish_prompt fish_prompt + functions -e _old_fish_prompt + set -e _OLD_FISH_PROMPT_OVERRIDE + end + + set -e VIRTUAL_ENV + + if test "$argv[1]" != 'nondestructive' + # Self-destruct! + functions -e pydoc + functions -e deactivate + functions -e _bashify_path + functions -e _fishify_path + end +end + +# Unset irrelevant variables. +deactivate nondestructive + +set -gx VIRTUAL_ENV '/home/nocturn9x/UnoVRBot' + +# https://github.com/fish-shell/fish-shell/issues/436 altered PATH handling +if test (echo $FISH_VERSION | head -c 1) -lt 3 + set -gx _OLD_VIRTUAL_PATH (_bashify_path $PATH) +else + set -gx _OLD_VIRTUAL_PATH $PATH +end +set -gx PATH "$VIRTUAL_ENV"'/bin' $PATH + +# Unset `$PYTHONHOME` if set. +if set -q PYTHONHOME + set -gx _OLD_VIRTUAL_PYTHONHOME $PYTHONHOME + set -e PYTHONHOME +end + +function pydoc + python -m pydoc $argv +end + +if test -z "$VIRTUAL_ENV_DISABLE_PROMPT" + # Copy the current `fish_prompt` function as `_old_fish_prompt`. + functions -c fish_prompt _old_fish_prompt + + function fish_prompt + # Run the user's prompt first; it might depend on (pipe)status. + set -l prompt (_old_fish_prompt) + + # Prompt override provided? + # If not, just prepend the environment name. + if test -n '' + printf '(%s) ' '' + else + printf '(%s) ' (basename "$VIRTUAL_ENV") + end + + string join -- \n $prompt # handle multi-line prompts + end + + set -gx _OLD_FISH_PROMPT_OVERRIDE "$VIRTUAL_ENV" +end diff --git a/bin/activate.nu b/bin/activate.nu new file mode 100644 index 0000000..61415d7 --- /dev/null +++ b/bin/activate.nu @@ -0,0 +1,41 @@ +# Setting all environment variables for the venv +let path-name = (if ((sys).host.name == "Windows") { "Path" } { "PATH" }) +let virtual-env = "/home/nocturn9x/UnoVRBot" +let bin = "bin" +let path-sep = ":" + +let old-path = ($nu.path | str collect ($path-sep)) + +let venv-path = ([$virtual-env $bin] | path join) +let new-path = ($nu.path | prepend $venv-path | str collect ($path-sep)) + +# environment variables that will be batched loaded to the virtual env +let new-env = ([ + [name, value]; + [$path-name $new-path] + [_OLD_VIRTUAL_PATH $old-path] + [VIRTUAL_ENV $virtual-env] +]) + +load-env $new-env + +# Creating the new prompt for the session +let virtual_prompt = (if ("" != "") { + "() " +} { + (build-string '(' ($virtual-env | path basename) ') ') +} +) + +# If there is no default prompt, then only the env is printed in the prompt +let new_prompt = (if ( config | select prompt | empty? ) { + ($"build-string '($virtual_prompt)'") +} { + ($"build-string '($virtual_prompt)' (config get prompt | str find-replace "build-string" "")") +}) +let-env PROMPT_COMMAND = $new_prompt + +# We are using alias as the function definitions because only aliases can be +# removed from the scope +alias pydoc = python -m pydoc +alias deactivate = source "/home/nocturn9x/UnoVRBot/bin/deactivate.nu" diff --git a/bin/activate.ps1 b/bin/activate.ps1 new file mode 100644 index 0000000..04c2472 --- /dev/null +++ b/bin/activate.ps1 @@ -0,0 +1,60 @@ +$script:THIS_PATH = $myinvocation.mycommand.path +$script:BASE_DIR = Split-Path (Resolve-Path "$THIS_PATH/..") -Parent + +function global:deactivate([switch] $NonDestructive) { + if (Test-Path variable:_OLD_VIRTUAL_PATH) { + $env:PATH = $variable:_OLD_VIRTUAL_PATH + Remove-Variable "_OLD_VIRTUAL_PATH" -Scope global + } + + if (Test-Path function:_old_virtual_prompt) { + $function:prompt = $function:_old_virtual_prompt + Remove-Item function:\_old_virtual_prompt + } + + if ($env:VIRTUAL_ENV) { + Remove-Item env:VIRTUAL_ENV -ErrorAction SilentlyContinue + } + + if (!$NonDestructive) { + # Self destruct! + Remove-Item function:deactivate + Remove-Item function:pydoc + } +} + +function global:pydoc { + python -m pydoc $args +} + +# unset irrelevant variables +deactivate -nondestructive + +$VIRTUAL_ENV = $BASE_DIR +$env:VIRTUAL_ENV = $VIRTUAL_ENV + +New-Variable -Scope global -Name _OLD_VIRTUAL_PATH -Value $env:PATH + +$env:PATH = "$env:VIRTUAL_ENV/bin:" + $env:PATH +if (!$env:VIRTUAL_ENV_DISABLE_PROMPT) { + function global:_old_virtual_prompt { + "" + } + $function:_old_virtual_prompt = $function:prompt + + if ("" -ne "") { + function global:prompt { + # Add the custom prefix to the existing prompt + $previous_prompt_value = & $function:_old_virtual_prompt + ("() " + $previous_prompt_value) + } + } + else { + function global:prompt { + # Add a prefix to the current prompt, but don't discard it. + $previous_prompt_value = & $function:_old_virtual_prompt + $new_prompt_value = "($( Split-Path $env:VIRTUAL_ENV -Leaf )) " + ($new_prompt_value + $previous_prompt_value) + } + } +} diff --git a/bin/activate_this.py b/bin/activate_this.py new file mode 100644 index 0000000..2e57677 --- /dev/null +++ b/bin/activate_this.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +"""Activate virtualenv for current interpreter: + +Use exec(open(this_file).read(), {'__file__': this_file}). + +This can be used when you must use an existing Python interpreter, not the virtualenv bin/python. +""" +import os +import site +import sys + +try: + abs_file = os.path.abspath(__file__) +except NameError: + raise AssertionError("You must use exec(open(this_file).read(), {'__file__': this_file}))") + +bin_dir = os.path.dirname(abs_file) +base = bin_dir[: -len("bin") - 1] # strip away the bin part from the __file__, plus the path separator + +# prepend bin to PATH (this file is inside the bin directory) +os.environ["PATH"] = os.pathsep.join([bin_dir] + os.environ.get("PATH", "").split(os.pathsep)) +os.environ["VIRTUAL_ENV"] = base # virtual env is right above bin directory + +# add the virtual environments libraries to the host python import mechanism +prev_length = len(sys.path) +for lib in "../lib/python3.10/site-packages".split(os.pathsep): + path = os.path.realpath(os.path.join(bin_dir, lib)) + site.addsitedir(path.decode("utf-8") if "" else path) +sys.path[:] = sys.path[prev_length:] + sys.path[0:prev_length] + +sys.real_prefix = sys.prefix +sys.prefix = base diff --git a/bin/deactivate.nu b/bin/deactivate.nu new file mode 100644 index 0000000..4052438 --- /dev/null +++ b/bin/deactivate.nu @@ -0,0 +1,11 @@ +# Setting the old path +let path-name = (if ((sys).host.name == "Windows") { "Path" } { "PATH" }) +let-env $path-name = $nu.env._OLD_VIRTUAL_PATH + +# Unleting the environment variables that were created when activating the env +unlet-env VIRTUAL_ENV +unlet-env _OLD_VIRTUAL_PATH +unlet-env PROMPT_COMMAND + +unalias pydoc +unalias deactivate diff --git a/bin/pip b/bin/pip new file mode 100755 index 0000000..b0851ed --- /dev/null +++ b/bin/pip @@ -0,0 +1,8 @@ +#!/home/nocturn9x/UnoVRBot/bin/python3 +# -*- coding: utf-8 -*- +import re +import sys +from pip._internal.cli.main import main +if __name__ == '__main__': + sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) + sys.exit(main()) diff --git a/bin/pip3 b/bin/pip3 new file mode 100755 index 0000000..b0851ed --- /dev/null +++ b/bin/pip3 @@ -0,0 +1,8 @@ +#!/home/nocturn9x/UnoVRBot/bin/python3 +# -*- coding: utf-8 -*- +import re +import sys +from pip._internal.cli.main import main +if __name__ == '__main__': + sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) + sys.exit(main()) diff --git a/bin/pip3.10 b/bin/pip3.10 new file mode 100755 index 0000000..b0851ed --- /dev/null +++ b/bin/pip3.10 @@ -0,0 +1,8 @@ +#!/home/nocturn9x/UnoVRBot/bin/python3 +# -*- coding: utf-8 -*- +import re +import sys +from pip._internal.cli.main import main +if __name__ == '__main__': + sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) + sys.exit(main()) diff --git a/bin/python b/bin/python new file mode 120000 index 0000000..ae65fda --- /dev/null +++ b/bin/python @@ -0,0 +1 @@ +/usr/bin/python3 \ No newline at end of file diff --git a/bin/python3 b/bin/python3 new file mode 120000 index 0000000..d8654aa --- /dev/null +++ b/bin/python3 @@ -0,0 +1 @@ +python \ No newline at end of file diff --git a/bin/python3.10 b/bin/python3.10 new file mode 120000 index 0000000..d8654aa --- /dev/null +++ b/bin/python3.10 @@ -0,0 +1 @@ +python \ No newline at end of file diff --git a/bin/wheel b/bin/wheel new file mode 100755 index 0000000..47ed8a9 --- /dev/null +++ b/bin/wheel @@ -0,0 +1,8 @@ +#!/home/nocturn9x/UnoVRBot/bin/python +# -*- coding: utf-8 -*- +import re +import sys +from wheel.cli import main +if __name__ == '__main__': + sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) + sys.exit(main()) diff --git a/bin/wheel-3.10 b/bin/wheel-3.10 new file mode 100755 index 0000000..47ed8a9 --- /dev/null +++ b/bin/wheel-3.10 @@ -0,0 +1,8 @@ +#!/home/nocturn9x/UnoVRBot/bin/python +# -*- coding: utf-8 -*- +import re +import sys +from wheel.cli import main +if __name__ == '__main__': + sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) + sys.exit(main()) diff --git a/bin/wheel3 b/bin/wheel3 new file mode 100755 index 0000000..47ed8a9 --- /dev/null +++ b/bin/wheel3 @@ -0,0 +1,8 @@ +#!/home/nocturn9x/UnoVRBot/bin/python +# -*- coding: utf-8 -*- +import re +import sys +from wheel.cli import main +if __name__ == '__main__': + sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) + sys.exit(main()) diff --git a/bin/wheel3.10 b/bin/wheel3.10 new file mode 100755 index 0000000..47ed8a9 --- /dev/null +++ b/bin/wheel3.10 @@ -0,0 +1,8 @@ +#!/home/nocturn9x/UnoVRBot/bin/python +# -*- coding: utf-8 -*- +import re +import sys +from wheel.cli import main +if __name__ == '__main__': + sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) + sys.exit(main()) diff --git a/config.ini.example b/config.ini.example new file mode 100644 index 0000000..1aa000d --- /dev/null +++ b/config.ini.example @@ -0,0 +1,15 @@ +[bot] + +# Bot configuration file. Options that are left empty (with or without a '=' sign) +# are automatically set to None. Don't mess shit up, idiot <3 + +api_id = 12345 +api_hash = API HASH HERE +bot_token = BOT TOKEN HERE +logging_format = [%%(levelname)s %%(asctime)s] Module '%%(module)s', function '%%(funcName)s' at line %%(lineno)d -> %%(message)s +logging_date_format = %%d/%%m/%%Y %%T %%p +logging_level = 20 +plugins_dir = UnoVRBot/modules +working_directory = +session_name = UnoVRBot +restart_times = 0 diff --git a/main.py b/main.py new file mode 100644 index 0000000..407ebba --- /dev/null +++ b/main.py @@ -0,0 +1,161 @@ +# Copyright (C) 2022 nocturn9x +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from pyrogram import Client, idle +from pyrogram.session import Session +import asyncio +import logging +import time +import uvloop +import sys +from configparser import ConfigParser +from UnoVRBot.misc.config import BotConfig +from pydantic import ValidationError + + +# Main Bot module. This is the entry point to the Bot and it loads all the modules and +# performs automatic startup, actually pyrogram does all the job but shut up we are communist + + +def log(prefix: str = "", message: str = ""): + """ + Hey I like logging but this is used only when no proper + logging configuration is yet in place to actually be useful + :param message: The message to log to stderr, default to "" + :type message: str + :param prefix: Any extra prefix to add to the message, default to "" + Use {date} to replace it with the current date in d/m/Y format and + {time} to replace it with the current time in %H:%M:%S %p format or + just %T %p if you're a lazy sucker. A whitespace is automatically + added between the prefix and the suffix just in case you're dumb + :type prefix: str + :return: Nothing + :rtype: None + """ + + sys.stderr.write(f"{prefix.format(date=time.strftime('%d/%m/%Y'), time=time.strftime('%T %p'))} {message}\n") + + +async def start(config: BotConfig): + """ + Starts the Bot, or at least tries to do so and give + up after a while cuz we've got other shit to do than starting shitty + Bots all day + :param config: The BotConfig class (yeah, thanks, explanation be like) + :type config: BotConfig + :return: Nothing, fucker + :rtype: None + """ + + log("[Bot - INFO {date} {time}]", + "Loading an actual logging configuration") + logging.basicConfig(format=config.logging_format, + datefmt=config.logging_date_format, + level=config.logging_level) + logging.info("Now we're talking, creating the Client instance") + Session.notice_displayed = True # Sorry but no one really cares + logging.getLogger("pyrogram").setLevel(logging.WARNING) # Because pyrogram logs are fucking verbose + client = Client(api_id=config.api_id, + api_hash=config.api_hash, + bot_token=config.bot_token, + plugins={"root": config.plugins_dir}, + workdir=config.working_directory, + name=config.session_name) + try: + await client.start() + logging.info("Client started, goin' asleep baby") + await idle() + except Exception as oh_no: + logging.error(f"A fatal error occurred -> {type(oh_no).__name__}: {oh_no}, exiting") + try: + logging.debug("Disconnecting from telegram") + await client.disconnect() + except Exception as disconnect: + logging.debug(f"Could not disconnect -> {type(disconnect).__name__}: {disconnect}") + finally: + if config.restart_times: + logging.warning(f"Restarting up to {config.restart_times} more times") + config.restart_times -= 1 + await start(config) + + +def check_config(parser: ConfigParser): + """ + Checks if a parsed .INI file actually contains the required settings + or if the user is a sack of shit. Starts the Bot if the user + is actually a good boy or beat him to death otherwise :) + :param parser: A ConfigParser instance (I should really consider to become a teacher, boy) + :type parser: ConfigParser + :return: Again, not the slightest shit + :rtype: None + """ + + getters = { # Sort of like a computed goto. Kind of, I just hate a ton of if conditions + "api_id": (ConfigParser.getint, "int"), + "api_hash": (ConfigParser.get, "str"), + "logging_format": (ConfigParser.get, "int"), + "logging_level": (ConfigParser.getint, "str"), + "logging_date_format": (ConfigParser.get, "str"), + "plugins_dir": (ConfigParser.get, "str"), + "working_directory": (ConfigParser.get, "str"), + "session_name": (ConfigParser.get, "str"), + "restart_times": (ConfigParser.getint, "int"), + "bot_token": (ConfigParser.get, "str") + } + options = {} + if "bot" not in parser: + log("[Bot - ERROR {date} {time}]", + "How am I gonna start if you can't even fucking fill a config file? Missing 'bot' section") + return + for option in parser["bot"]: + getter, expected = getters.get(option, (None, None)) + if not getter: + log("[Bot - WARN {date} {time}]", + f"Unknown option '{option}', skipping it, idiot!") + continue + try: + options[option] = getter(parser, "bot", option) + except Exception as fucked_up: + log("[Bot - ERROR {date} {time}]", + f"Hey! Got a type mismatch for option '{option}' (expecting '{expected}'), can you please stop being a dumbass? -> {type(fucked_up).__name__}: {fucked_up}") + try: + asyncio.run(start(BotConfig(**options))) + except AttributeError: # Python 3.6 + asyncio.get_event_loop().run_until_complete(start(BotConfig(**options))) + except ValidationError as validation_error: + log("[Bot - ERROR {date} {time}]", + f"What the heck did you do? Apparently some type mismatch occurred -> {validation_error}") + + +if __name__ == "__main__": # Start dat shit boi + uvloop.install() + parser = ConfigParser() + if len(sys.argv) < 2: + # Yeah yeah say what you want fucker, but how am I gonna log without proper config in place? :\ + # Take this shitty print instead! + log("[Bot - INFO {date} {time}]", + "Loading config file at default path ('./config.ini'), were you too lazy to specify a custom one?") + file = "./config.ini" + else: + log("[Bot - INFO {date} {time}]", + f"I see you were not lazy! Loading config file at '{sys.argv[1]}'") + file = sys.argv[1] + try: + with open(file) as config: + parser.read_file(config) + except Exception as fuck: # Cause we're too lazy for anything else + log("[Bot - ERROR {date} {time}]", + f"Sometimes thing go well, sometimes they don't, fix this shit fucker -> {type(fuck.__name__)}: {fuck}") + else: + log("[Bot - INFO {date} {time}]", + "Config file loaded, let's see if you're retarded or not now, checking options") + check_config(parser) diff --git a/pyvenv.cfg b/pyvenv.cfg new file mode 100644 index 0000000..fa63b6d --- /dev/null +++ b/pyvenv.cfg @@ -0,0 +1,8 @@ +home = /usr +implementation = CPython +version_info = 3.10.4.final.0 +virtualenv = 20.13.0 +include-system-site-packages = false +base-prefix = /usr +base-exec-prefix = /usr +base-executable = /usr/bin/python3 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e312d69 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +pyrogram +tgcrypto +uvloop