2020-03-19 18:24:06 +01:00
import types
from collections import deque , defaultdict
from selectors import DefaultSelector , EVENT_READ , EVENT_WRITE
2020-03-20 11:12:44 +01:00
from heapq import heappush , heappop
2020-03-19 18:24:06 +01:00
import socket
2020-03-20 17:53:30 +01:00
from . exceptions import AlreadyJoinedError , CancelledError
2020-03-19 18:24:06 +01:00
import traceback
2020-03-20 11:12:44 +01:00
from timeit import default_timer
from time import sleep as wait
2020-03-20 18:13:04 +01:00
from . socket import AsyncSocket
2020-03-19 18:24:06 +01:00
2020-03-20 10:26:42 +01:00
2020-03-19 18:24:06 +01:00
class EventLoop :
""" Implementation of an event loop, alternates between execution of coroutines (asynchronous functions)
2020-03-20 11:12:44 +01:00
to allow a concurrency model or ' green threads ' """
2020-03-19 18:24:06 +01:00
def __init__ ( self ) :
2020-03-19 19:48:24 +01:00
""" Object constructor """
2020-03-19 18:24:06 +01:00
self . to_run = deque ( ) # Scheduled tasks
2020-03-20 11:12:44 +01:00
self . paused = [ ] # Sleeping tasks
2020-03-19 18:24:06 +01:00
self . selector = DefaultSelector ( ) # Selector object to perform I/O multiplexing
2020-03-19 22:15:22 +01:00
self . running = None # This will always point to the currently running coroutine (Task object)
2020-03-20 11:12:44 +01:00
self . joined = defaultdict ( list ) # Tasks that want to join
self . clock = default_timer # Monotonic clock to keep track of elapsed time
2020-03-20 18:38:17 +01:00
self . sequence = 0 # To avoid TypeError in the (unlikely) event of two task with the same deadline we use a unique and incremental integer pushed to the queue together with the deadline and the function itself
2020-03-19 18:24:06 +01:00
def loop ( self ) :
""" Main event loop for giambio """
2020-03-19 19:48:24 +01:00
2020-03-19 18:24:06 +01:00
while True :
2020-03-21 10:42:52 +01:00
if not self . selector . get_map ( ) and not any ( ( self . to_run + deque ( self . paused ) ) ) :
2020-03-19 18:24:06 +01:00
break
2020-03-20 17:53:30 +01:00
while self . selector . get_map ( ) : # If there are sockets ready, (re)schedule their associated task
2020-03-20 11:12:44 +01:00
timeout = 0.0 if self . to_run else None
tasks = deque ( self . selector . select ( timeout ) )
2020-03-19 18:24:06 +01:00
for key , _ in tasks :
2020-03-19 19:48:24 +01:00
self . to_run . append ( key . data ) # Socket ready? Schedule the task
2020-03-20 17:53:30 +01:00
self . selector . unregister ( key . fileobj ) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now)
2020-03-20 11:12:44 +01:00
while self . to_run or self . paused :
if not self . to_run :
wait ( max ( 0.0 , self . paused [ 0 ] [ 0 ] - self . clock ( ) ) ) # If there are no tasks ready, just do nothing
2020-03-20 17:53:30 +01:00
while self . paused and self . paused [ 0 ] [ 0 ] < self . clock ( ) : # Reschedules task when their timer has elapsed
2020-03-20 18:38:17 +01:00
_ , __ , coro = heappop ( self . paused )
2020-03-20 11:12:44 +01:00
self . to_run . append ( coro )
2020-03-19 18:24:06 +01:00
self . running = self . to_run . popleft ( ) # Sets the currently running task
try :
2020-03-20 11:12:44 +01:00
method , * args = self . running . run ( ) # Sneaky method call, thanks to David Beazley for this ;)
getattr ( self , method ) ( * args )
2020-03-20 18:13:04 +01:00
except StopIteration as e : # TODO: Fix this return mechanism, it looks like the return value of the child task gets "replaced" by None at some point
2020-03-20 10:26:42 +01:00
self . running . ret_value = e . args [ 0 ] if e . args else None # Saves the return value
2020-03-20 11:12:44 +01:00
self . to_run . extend ( self . joined . pop ( self . running , ( ) ) ) # Reschedules the parent task
2020-03-21 11:29:22 +01:00
except RuntimeError :
self . running . cancelled = True
2020-03-21 13:09:11 +01:00
self . to_run . extend ( self . joined . pop ( self . running , ( ) ) ) # Reschedules the parent task
print ( self . to_run )
2020-03-19 19:48:24 +01:00
except Exception as has_raised :
2020-03-20 17:53:30 +01:00
self . to_run . extend ( self . joined . pop ( self . running , ( ) ) ) # Reschedules the parent task
if self . running . joined : # Let the join function handle the hassle of propagating the error
self . running . exception = has_raised # Save the exception
else : # Let the exception propagate (I'm looking at you asyncIO ;))
2020-03-20 11:12:44 +01:00
raise
except KeyboardInterrupt :
self . running . coroutine . throw ( KeyboardInterrupt )
2020-03-19 18:24:06 +01:00
def spawn ( self , coroutine : types . coroutine ) :
""" Schedules a task for execution, appending it to the call stack """
2020-03-19 19:48:24 +01:00
task = Task ( coroutine )
self . to_run . append ( task )
return task
2020-03-21 10:42:52 +01:00
def schedule ( self , coroutine : types . coroutine , when : int ) :
""" Schedules a task for execution after n seconds """
self . sequence + = 1
task = Task ( coroutine )
heappush ( self . paused , ( self . clock ( ) + when , self . sequence , task ) )
return task
2020-03-19 18:24:06 +01:00
def start ( self , coroutine : types . coroutine , * args , * * kwargs ) :
2020-03-21 10:59:59 +01:00
""" Starts the eventloop """
2020-03-19 18:24:06 +01:00
self . spawn ( coroutine ( * args , * * kwargs ) )
self . loop ( )
def want_read ( self , sock : socket . socket ) :
""" Handler for the ' want_read ' event, performs the needed operations to read from the passed socket
asynchronously """
self . selector . register ( sock , EVENT_READ , self . running )
def want_write ( self , sock : socket . socket ) :
2020-03-19 22:15:22 +01:00
""" Handler for the ' want_write ' event, performs the needed operations to write into the passed socket
2020-03-19 18:24:06 +01:00
asynchronously """
self . selector . register ( sock , EVENT_WRITE , self . running )
def wrap_socket ( self , sock ) :
2020-03-21 10:59:59 +01:00
""" Wraps a standard socket into an AsyncSocket """
2020-03-19 18:24:06 +01:00
return AsyncSocket ( sock , self )
2020-03-20 11:12:44 +01:00
async def read_sock ( self , sock : socket . socket , buffer : int ) :
2020-03-19 18:24:06 +01:00
await want_read ( sock )
return sock . recv ( buffer )
2020-03-20 11:12:44 +01:00
async def accept_sock ( self , sock : socket . socket ) :
2020-03-19 18:24:06 +01:00
await want_read ( sock )
return sock . accept ( )
2020-03-20 11:12:44 +01:00
async def sock_sendall ( self , sock : socket . socket , data : bytes ) :
2020-03-19 18:24:06 +01:00
while data :
await want_write ( sock )
sent_no = sock . send ( data )
data = data [ sent_no : ]
2020-03-20 11:12:44 +01:00
async def close_sock ( self , sock : socket . socket ) :
2020-03-19 18:24:06 +01:00
await want_write ( sock )
return sock . close ( )
2020-03-20 11:12:44 +01:00
def want_join ( self , coro : types . coroutine ) :
if coro not in self . joined :
self . joined [ coro ] . append ( self . running )
2020-03-19 19:48:24 +01:00
else :
raise AlreadyJoinedError ( " Joining the same task multiple times is not allowed! " )
2020-03-20 11:12:44 +01:00
def want_sleep ( self , seconds ) :
2020-03-20 18:38:17 +01:00
self . sequence + = 1 # Make this specific sleeping task unique to avoid error when comparing identical deadlines
heappush ( self . paused , ( self . clock ( ) + seconds , self . sequence , self . running ) )
2020-03-20 11:12:44 +01:00
2020-03-20 11:27:26 +01:00
def want_cancel ( self , task ) :
task . coroutine . throw ( CancelledError )
2020-03-20 16:42:44 +01:00
async def connect_sock ( self , sock : socket . socket , addr : tuple ) :
await want_write ( sock )
return sock . connect ( addr )
2020-03-19 19:48:24 +01:00
2020-03-20 18:06:12 +01:00
class Task :
""" A simple wrapper around a coroutine object """
def __init__ ( self , coroutine : types . coroutine ) :
self . coroutine = coroutine
self . status = False # Not ran yet
self . joined = False
self . ret_val = None # Return value is saved here
self . exception = None # If errored, the exception is saved here
self . cancelled = False # When cancelled, this is True
def run ( self ) :
self . status = True
return self . coroutine . send ( None )
def __repr__ ( self ) :
return f " giambio.core.Task( { self . coroutine } , { self . status } , { self . joined } , { self . ret_val } , { self . exception } , { self . cancelled } ) "
async def cancel ( self ) :
return await cancel ( self )
async def join ( self ) :
return await join ( self )
2020-03-20 11:12:44 +01:00
@types.coroutine
2020-03-19 18:24:06 +01:00
def sleep ( seconds : int ) :
""" Pause the execution of a coroutine for the passed amount of seconds,
without blocking the entire event loop , which keeps watching for other events """
2020-03-19 19:48:24 +01:00
yield " want_sleep " , seconds
2020-03-19 18:24:06 +01:00
@types.coroutine
2020-03-20 18:13:04 +01:00
def want_read ( sock : socket . socket ) : # TODO: Fix this and make it work also when tasks are not joined
2020-03-19 18:24:06 +01:00
""" ' Tells ' the event loop that there is some coroutine that wants to read from the passed socket """
yield " want_read " , sock
@types.coroutine
def want_write ( sock : socket . socket ) :
""" ' Tells ' the event loop that there is some coroutine that wants to write into the passed socket """
yield " want_write " , sock
@types.coroutine
def join ( task : Task ) :
""" ' Tells ' the scheduler that the desired task MUST be awaited for completion """
2020-03-19 19:48:24 +01:00
task . joined = True
2020-03-19 18:24:06 +01:00
yield " want_join " , task
2020-03-20 10:26:42 +01:00
if task . exception :
2020-03-19 18:24:06 +01:00
print ( " Traceback (most recent call last): " )
2020-03-20 10:26:42 +01:00
traceback . print_tb ( task . exception . __traceback__ )
2020-03-20 17:53:30 +01:00
exception_name = type ( task . exception ) . __name__
2020-03-20 10:26:42 +01:00
if str ( task . exception ) :
2020-03-20 17:53:30 +01:00
print ( f " { exception_name } : { task . exception } " )
2020-03-19 18:24:06 +01:00
else :
2020-03-20 10:26:42 +01:00
print ( task . exception )
2020-03-20 17:53:30 +01:00
raise task . exception
2020-03-20 10:26:42 +01:00
return task . ret_val
2020-03-19 18:24:06 +01:00
2020-03-20 11:27:26 +01:00
@types.coroutine
def cancel ( task : Task ) :
yield " want_cancel " , task
2020-03-20 11:38:42 +01:00
assert task . cancelled
2020-03-19 18:24:06 +01:00