import time
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from threading import Lock, Event, Thread
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count
from timeit import default_timer

import structio


def parse_delay(delta: str) -> timedelta:
    """
    Parse a delay string like '1y 2mo 3w 4d 5h 6m 7s' into a timedelta object.

    Case and whitespace insensitive. A number that is not followed by a unit
    is interpreted in days, so '1h 30' means one hour plus thirty days.
    Months count as 30 days and years as 365 days.

    Raises ValueError for unknown units, or for any character that is not
    an ASCII digit, an ASCII letter or whitespace.
    """
    second = 1
    minute = second * 60
    hour = minute * 60
    day = hour * 24
    week = day * 7
    # 30 days is just the most commonly accepted
    # duration for a month, even though 4 weeks
    # also is, and they don't always match up in
    # duration
    month = day * 30
    # No leap years or stuff like that. Sorry!
    year = day * 365
    units = {"s": second, "m": minute, "h": hour, "d": day, "w": week, "mo": month, "y": year}
    # We need this to handle multi-character units
    # like months
    longest = max(len(k) for k in units)
    seconds = 0
    default_unit = unit = units["d"]  # Defaults to days
    value = ""
    # Clean up the string, we don't need this junk anyway
    delta = delta.translate(str.maketrans({"\n": "", "\t": "", "\r": "", " ": ""})).lower()
    i = 0
    while i < len(delta):
        ch = delta[i]
        i += 1
        # Only accept the ASCII digits 0-9. str.isnumeric() and str.isdigit()
        # both return True for non-ASCII numerals too ('٣', '²', and for
        # isnumeric() even '½'), and int() cannot parse some of them
        if ch.isascii() and ch.isdigit():
            value += ch
        # Only restrict ourselves to ASCII letters. Again, this is specific
        # to my use-case. Can edit as necessary
        elif ch.isalpha() and ch.isascii():
            unit_name = ch
            j = i
            # Try to parse as many characters of the unit as possible,
            # but stop once we either reach the end of the string or
            # the length of the current (potential) unit equals the
            # length of the longest one we know
            while j < len(delta) and len(unit_name) < longest:
                if (char := delta[j]).isalpha() and char.isascii():
                    unit_name += char
                else:
                    break
                j += 1
            # If the unit was more than one character long, we
            # have to skip the next len(unit_name) - 1 characters,
            # so we don't try to parse 'mo' and then 'o' again, for
            # example. If the unit was one character long, this value
            # will just be zero
            i += len(unit_name) - 1
            try:
                unit = units[unit_name]
            except KeyError:
                raise ValueError(f"invalid unit {unit_name!r}") from None
            if value:
                seconds += unit * int(value)
                value = ""
                unit = default_unit
        else:
            # Unknown character
            raise ValueError(f"Invalid character {ch!r} in time delta: {delta!r}")
    if value:
        # Trailing number with no unit after it: add it to the running
        # total like every other component (it used to overwrite it)
        seconds += unit * int(value)
    return timedelta(seconds=seconds)


@dataclass(order=True)
class ScheduledTask:
    """
    A scheduled task: a command to be run every `when` interval.
    `last` holds the time of the most recent run (None if it never ran)
    """

    command: str = field(compare=True)
    when: timedelta = field(compare=True)
    last: datetime | None = field(compare=False, default=None)

    def should_run(self):
        """
        Returns whether the task is due, i.e. it never ran or at least
        `when` has elapsed since its last run
        """
        if self.last is None:
            return True
        return (datetime.now() - self.last) >= self.when

    def time_until_due(self, now: datetime | None = None) -> float:
        """
        Returns how many seconds remain until this task is due (relative to
        `now`, which defaults to datetime.now()). A value <= 0 means the task
        is due already; a task that never ran is always due (0.0)
        """
        if self.last is None:
            return 0.0
        if now is None:
            now = datetime.now()
        return (self.last + self.when - now).total_seconds()


def _next_deadline(tasks: list[tuple[ScheduledTask, int]]) -> float | None:
    """
    Returns how many seconds a scheduler may sleep before the earliest of
    the given tasks becomes due (0.0 if one is due already), or None when
    there are no tasks at all, meaning "sleep until woken up"
    """
    if not tasks:
        return None
    now = datetime.now()
    return max(0.0, min(task.time_until_due(now) for task, _ in tasks))


class TaskScheduler(ABC):
    """
    A task scheduler. The concurrency_limit argument
    limits how many tasks can be run at the same time
    """

    def __init__(self, concurrency_limit: int | None = None):
        """
        Public object constructor
        """
        self._running = False
        self._stop = False
        self._timer = default_timer
        self.concurrency_limit = concurrency_limit

    @property
    def running(self) -> bool:
        """
        Returns whether the event scheduler is running
        """
        return self._running

    @property
    def time(self) -> float:
        """
        Return the scheduler's internal wall clock time, in seconds
        """
        return self._timer()

    @property
    def stopping(self) -> bool:
        """
        Returns whether the scheduler is in the process of shutting down
        """
        return self.running and self._stop

    @property
    def stopped(self) -> bool:
        """
        Returns whether the scheduler has stopped
        """
        return not self.running and self._stop

    @abstractmethod
    def schedule(self, command: str, when: timedelta) -> ScheduledTask:
        """
        Schedules the given command to run with the chosen interval.
        Duplicates are allowed. A ScheduledTask object is returned
        """
        raise NotImplementedError

    @abstractmethod
    def unschedule(self, task: ScheduledTask):
        """
        Unschedules the command associated with the given ScheduledTask
        object. Note that this only removes the command associated with
        this exact object, not any of its duplicates (for that, use
        unschedule_all). If the given task is not scheduled, this method
        does nothing
        """
        raise NotImplementedError

    @abstractmethod
    def unschedule_all(self, task: ScheduledTask) -> int:
        """
        Unschedules all commands associated with the given ScheduledTask
        object. Unlike unschedule(), this method checks for object equality
        rather than identity, so it removes all instances of the given command
        that match the given one (meaning it can be instantiated 'ex-novo'
        without needing the specific object returned by schedule()). Returns
        the number of tasks that were unscheduled
        """
        raise NotImplementedError

    @abstractmethod
    def start(self):
        """
        Start the event scheduler. Unless explicitly stated elsewhere, tasks
        can be scheduled both before and during the call to start(), and they
        will be executed as expected. Tasks scheduled after calling stop() will
        be executed at the next call to start(). If the scheduler is already
        running, RuntimeError is raised. Note that, depending on the underlying
        implementation, this method call may block until the event loop is stopped
        by calling stop() (usually by another thread or task)
        """
        raise NotImplementedError

    @abstractmethod
    def stop(self):
        """
        Stop the event scheduler. It can be restarted by calling start()
        again. Note that the scheduler will first terminate running active
        tasks before stopping, but stop() does not block to reflect that
        """
        raise NotImplementedError


class SyncTaskScheduler(TaskScheduler):
    """
    A synchronous, thread-safe task scheduler. Calling start()
    blocks the calling thread until the scheduler is stopped by
    calling stop() in another thread. Commands are executed in
    their own process
    """

    def __init__(self, concurrency_limit: int | None = None):
        super().__init__(concurrency_limit)
        self._sem = Lock()
        # Wakes up the thread blocked in start() (set by schedule() and stop())
        self._evt = Event()
        self._executor = ProcessPoolExecutor(cpu_count() * 2 - 1 if concurrency_limit is None else concurrency_limit)
        # Insertion counter used as a tie-breaker when sorting duplicates
        self._tie = 0
        self._tasks: list[tuple[ScheduledTask, int]] = []

    @staticmethod
    def _run_command(task: ScheduledTask):
        print(f"[{datetime.now().strftime('%d/%m/%Y %H:%M:%S %p')}] Running command {task.command!r}")
        # TODO: os.exec(...)

    def schedule(self, command: str, when: timedelta) -> ScheduledTask:
        task = ScheduledTask(command, when)
        with self._sem:
            self._tasks.append((task, self._tie))
            self._tie += 1
            self._tasks.sort()
            # Wake up the thread blocked in start(). The event is deliberately
            # not cleared here: start() clears it itself, under the lock, before
            # inspecting the task list, so this wakeup can never be lost
            self._evt.set()
        return task

    def unschedule(self, task: ScheduledTask):
        with self._sem:
            for index, (candidate, _) in enumerate(self._tasks):
                if candidate is task:
                    # Removing an element keeps the list sorted
                    del self._tasks[index]
                    break

    def unschedule_all(self, task: ScheduledTask) -> int:
        with self._sem:
            # Rebuild the list rather than removing while iterating over it,
            # which would skip the element following each removed one
            kept = [entry for entry in self._tasks if entry[0] != task]
            removed = len(self._tasks) - len(kept)
            self._tasks[:] = kept
        return removed

    def start(self):
        with self._sem:
            if self.running:
                raise RuntimeError("scheduler is already running")
            self._stop = False
            self._running = True
        try:
            while True:
                with self._sem:
                    if self._stop:
                        break
                    # Clear under the lock: any schedule()/stop() call from now
                    # on sets the event again, so the wait() below returns
                    # immediately instead of missing the notification
                    self._evt.clear()
                    due: list[ScheduledTask] = []
                    for task, _ in self._tasks:
                        if task.should_run():
                            due.append(task)
                            task.last = datetime.now()
                    # None (no tasks) means: sleep until schedule()/stop()
                    timeout = _next_deadline(self._tasks)
                if due:
                    self._executor.map(self._run_command, due)
                # Wait until either the earliest task is due or another thread
                # called schedule()/stop() (this keeps the scheduler responsive)
                self._evt.wait(timeout=timeout)
        finally:
            with self._sem:
                self._running = False

    def stop(self):
        with self._sem:
            self._stop = True
        # Wake up start() so it notices the request right away
        self._evt.set()


class AsyncTaskScheduler(TaskScheduler):
    """
    An asynchronous task scheduler. Not thread safe.
    """

    def __init__(self, concurrency_limit: int | None = None):
        super().__init__(concurrency_limit)
        if self.concurrency_limit is None:
            self.concurrency_limit = 20
        self._sem = structio.Lock()
        # Task concurrency limiter
        self._cap = structio.Semaphore(self.concurrency_limit)
        # Wakes up start(); replaced with a fresh event after every set()
        self._evt = structio.Event()
        self._tie = 0
        self._tasks: list[tuple[ScheduledTask, int]] = []

    async def _run_command(self, task: ScheduledTask):
        try:
            print(f"[{datetime.now().strftime('%d/%m/%Y %H:%M:%S %p')}] Running command {task.command!r}")
            # TODO: await structio.parallel.run(...)
        finally:
            # Allow more commands to be executed by releasing
            # a slot, even if the command failed
            await self._cap.release()

    def _wakeup(self):
        """
        Wake up start(). Events are cheap, so they don't have a
        clear method: we set the current one and make a new one
        """
        self._evt.set()
        self._evt = structio.Event()

    def schedule(self, command: str, when: timedelta) -> ScheduledTask:
        task = ScheduledTask(command, when)
        self._tasks.append((task, self._tie))
        self._tie += 1
        self._tasks.sort()
        # Wakeup other task blocked in start()
        self._wakeup()
        return task

    def unschedule(self, task: ScheduledTask):
        for index, (candidate, _) in enumerate(self._tasks):
            if candidate is task:
                # Removing an element keeps the list sorted
                del self._tasks[index]
                break

    def unschedule_all(self, task: ScheduledTask) -> int:
        # Rebuild the list rather than removing while iterating over it,
        # which would skip the element following each removed one
        kept = [entry for entry in self._tasks if entry[0] != task]
        removed = len(self._tasks) - len(kept)
        self._tasks[:] = kept
        return removed

    async def start(self):
        if self.running:
            raise RuntimeError("scheduler is already running")
        self._stop = False
        self._running = True
        try:
            async with structio.create_pool() as pool:
                while not self._stop:
                    # Grab the current event before anything can await: a
                    # schedule()/stop() call made while we are suspended below
                    # sets this very object, so the wait at the end returns
                    evt = self._evt
                    tasks: list[ScheduledTask] = []
                    for task, _ in self._tasks:
                        if task.should_run():
                            tasks.append(task)
                            task.last = datetime.now()
                    for task in tasks:
                        await self._cap.acquire()
                        pool.spawn(self._run_command, task)
                    deadline = _next_deadline(self._tasks)
                    # Wait again until either the earliest task is due
                    # or a new task was added/stop() was called (this keeps
                    # the scheduler responsive to calls made by other tasks
                    # during its runtime)
                    if deadline is None:
                        await evt.wait()
                    elif deadline <= 0:
                        # Something is due already: just yield to the loop
                        await structio.sleep(0)
                    else:
                        with structio.skip_after(deadline):
                            await evt.wait()
        finally:
            self._running = False

    def stop(self):
        self._stop = True
        # Wake up start() so it notices the request right away
        self._wakeup()


def main_threaded():
    print("Using synchronous scheduler")
    scheduler: TaskScheduler = SyncTaskScheduler()
    scheduler.schedule("test", parse_delay("1s"))
    scheduler.schedule("test 2", parse_delay("2s"))
    scheduler_thread = Thread(target=scheduler.start)
    scheduler_thread.start()
    time.sleep(5)
    print("Adding 3rd task")
    scheduler.schedule("test 3", parse_delay("3s"))
    time.sleep(10)
    print("Removing third task")
    scheduler.unschedule_all(ScheduledTask("test 3", parse_delay("3s")))
    time.sleep(5)
    print("Stopping")
    scheduler.stop()
    scheduler_thread.join()
    print("Stopped")


async def main_async():
    print("Using asynchronous scheduler")
    scheduler: TaskScheduler = AsyncTaskScheduler()
    scheduler.schedule("test", parse_delay("1s"))
    scheduler.schedule("test 2", parse_delay("2s"))
    async with structio.create_pool() as pool:
        pool.spawn(scheduler.start)
        await structio.sleep(5)
        print("Adding 3rd task")
        scheduler.schedule("test 3", parse_delay("3s"))
        await structio.sleep(10)
        print("Removing third task")
        scheduler.unschedule_all(ScheduledTask("test 3", parse_delay("3s")))
        await structio.sleep(5)
        print("Stopping")
        scheduler.stop()
    print("Stopped")


# Guarded so that ProcessPoolExecutor workers started with the "spawn"
# method, which re-import the main module, do not re-run the demo
if __name__ == "__main__":
    structio.run(main_async)
    main_threaded()