diff --git a/experiment.py b/experiment.py index 9ef74ed..edf2c6e 100644 --- a/experiment.py +++ b/experiment.py @@ -26,7 +26,7 @@ async def countdown(n): await giambio.sleep(1) print("Countdown over") return "Count DOWN over" - except giambio.exceptions.CancelledError: + except giambio.CancelledError: print("countdown cancelled!") async def count(stop, step=1): @@ -38,14 +38,14 @@ async def count(stop, step=1): await giambio.sleep(step) print("Countup over") return "Count UP over" - except giambio.exceptions.CancelledError: + except giambio.CancelledError: print("count cancelled!") async def main(): print("Spawning countdown immediately, scheduling count for 4 secs from now") task = loop.spawn(countdown(8)) - task1 = loop.schedule(count(8, 2), 4) - await giambio.sleep(0) # Beware! Cancelling a task straight away will propagate the error in the parent + task1 = loop.schedule(count(8, 2), 4) # Schedules the task, it will be ran 4 seconds from now + await giambio.sleep(0) # Act as a checkpoint to switch tasks. Beware! Cancelling a task straight away will propagate the error in the parent # await task.cancel() # TODO: Fix this to reschedule the parent task properly result = await task.join() result1 = await task1.join() diff --git a/giambio/__init__.py b/giambio/__init__.py index 439a463..40fadc0 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -1,5 +1,6 @@ __author__ = "Nocturn9x aka Isgiambyy" __version__ = (0, 0, 1) -from .core import EventLoop, join, sleep +from .core import EventLoop, sleep +from .exceptions import GiambioError, AlreadyJoinedError, CancelledError -__all__ = ["EventLoop", "join", "sleep"] \ No newline at end of file +__all__ = ["EventLoop", "sleep", "GiambioError", "AlreadyJoinedError", "CancelledError"] \ No newline at end of file diff --git a/giambio/core.py b/giambio/core.py index 266411c..4b9d80c 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -45,8 +45,8 @@ class EventLoop: self.to_run.append(coro) self.running = self.to_run.popleft() # Sets the currently running task try: - method, *args = self.running.run() # Sneaky method call, thanks to David Beazley for this ;) - getattr(self, method)(*args) + method, *args = self.running.run() + getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;) except StopIteration as e: self.running.result = Result(e.args[0] if e.args else None, None) # Saves the return value self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task @@ -69,7 +69,7 @@ class EventLoop: self.to_run.append(task) return task - def schedule(self, coroutine: types.coroutine, when: int): # TODO: Fix this + def schedule(self, coroutine: types.coroutine, when: int): """Schedules a task for execution after n seconds""" self.sequence += 1 @@ -78,55 +78,73 @@ class EventLoop: return task def start(self, coroutine: types.coroutine, *args, **kwargs): - """Starts the eventloop""" + """Starts the event loop""" self.spawn(coroutine(*args, **kwargs)) self.loop() def want_read(self, sock: socket.socket): - """Handler for the 'want_read' event, performs the needed operations to read from the passed socket - asynchronously""" + """Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing""" self.selector.register(sock, EVENT_READ, self.running) def want_write(self, sock: socket.socket): - """Handler for the 'want_write' event, performs the needed operations to write into the passed socket - asynchronously""" + """Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing""" self.selector.register(sock, EVENT_WRITE, self.running) def wrap_socket(self, sock): - """Wraps a standard socket into an AsyncSocket""" + """Wraps a standard socket into an AsyncSocket object""" return AsyncSocket(sock, self) async def read_sock(self, sock: socket.socket, buffer: int): + """Reads from a socket asynchronously, waiting until the resource is available and returning up to buffer bytes + from the socket + """ + await want_read(sock) return sock.recv(buffer) async def accept_sock(self, sock: socket.socket): + """Accepts a socket connection asynchronously, waiting until the resource is available and returning the + result of the accept() call + """ + await want_read(sock) return sock.accept() async def sock_sendall(self, sock: socket.socket, data: bytes): + """Sends all the passed data, as bytes, trough the socket asynchronously""" + while data: await want_write(sock) sent_no = sock.send(data) data = data[sent_no:] async def close_sock(self, sock: socket.socket): + """Closes the socket asynchronously""" + await want_write(sock) return sock.close() def want_join(self, coro: types.coroutine): + """Handler for the 'want_join' event, does some magic to tell the scheduler + to wait until the passed coroutine ends. The result of this call equals whatever the + coroutine returns or, if an exception gets raised, the exception will get propagated inside the + parent task""" + if coro not in self.joined: self.joined[coro].append(self.running) else: self.running.throw(AlreadyJoinedError("Joining the same task multiple times is not allowed!")) def want_sleep(self, seconds): - self.sequence += 1 # Make this specific sleeping task unique to avoid error when comparing identical deadlines - heappush(self.paused, (self.clock() + seconds, self.sequence, self.running)) + if seconds > 0: # If seconds <= 0 this function just acts as a checkpoint + self.sequence += 1 # Make this specific sleeping task unique to avoid error when comparing identical deadlines + heappush(self.paused, (self.clock() + seconds, self.sequence, self.running)) + else: + self.to_run.append(self.running) # Reschedule the task that called sleep def want_cancel(self, task): self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task @@ -190,13 +208,20 @@ class Task: @types.coroutine def sleep(seconds: int): """Pause the execution of a coroutine for the passed amount of seconds, - without blocking the entire event loop, which keeps watching for other events""" + without blocking the entire event loop, which keeps watching for other events + + This function is also useful as a sort of checkpoint, because it returns the execution + control to the scheduler, which can then switch to another task. If a coroutine does not have + enough calls to async methods (or 'checkpoints'), e.g one that needs the 'await' keyword before it, this might + affect performance as it would prevent the scheduler from switching tasks properly. If you feel + like this happens in your code, try adding a call to giambio.sleep(0); this will act as a checkpoint without + actually pausing the execution of your coroutine""" yield "want_sleep", seconds @types.coroutine -def want_read(sock: socket.socket): # TODO: Fix this and make it work also when tasks are not joined +def want_read(sock: socket.socket): """'Tells' the event loop that there is some coroutine that wants to read from the passed socket""" yield "want_read", sock @@ -221,4 +246,6 @@ def join(task: Task): @types.coroutine def cancel(task: Task): + """'Tells' the scheduler that the passed task must be cancelled""" + yield "want_cancel", task diff --git a/giambio/socket.py b/giambio/socket.py index 52dc821..cd4634a 100644 --- a/giambio/socket.py +++ b/giambio/socket.py @@ -42,7 +42,6 @@ class AsyncSocket(object): await self.loop.connect_sock(self.sock, addr) - def __enter__(self): return self.sock.__enter__() @@ -52,5 +51,5 @@ class AsyncSocket(object): def __repr__(self): return f"giambio.socket.AsyncSocket({self.sock}, {self.loop})" - def __getitem__(self, item): - return self.sock.__getitem__(item) + def __getattribute__(self, item): + return self.sock.__getattribute__(item)