2020-04-04 18:43:07 +02:00
2020-03-19 18:24:06 +01:00
import types
from collections import deque , defaultdict
from selectors import DefaultSelector , EVENT_READ , EVENT_WRITE
2020-03-20 11:12:44 +01:00
from heapq import heappush , heappop
2020-03-19 18:24:06 +01:00
import socket
2020-04-04 19:14:04 +02:00
from . exceptions import AlreadyJoinedError , CancelledError , GiambioError
2020-03-20 11:12:44 +01:00
from timeit import default_timer
from time import sleep as wait
2020-03-25 08:40:01 +01:00
from . socket import AsyncSocket , WantRead , WantWrite
2020-03-24 14:19:10 +01:00
from . abstractions import Task , Result
2020-03-25 08:40:01 +01:00
from socket import SOL_SOCKET , SO_ERROR
2020-03-25 18:37:57 +01:00
from . traps import _join , _sleep , _want_read , _want_write , _cancel
2020-03-25 23:36:17 +01:00
from . util import TaskManager
2020-03-20 10:26:42 +01:00
2020-03-25 14:39:32 +01:00
2020-03-19 18:24:06 +01:00
class EventLoop :
""" Implementation of an event loop, alternates between execution of coroutines (asynchronous functions)
2020-03-20 11:12:44 +01:00
to allow a concurrency model or ' green threads ' """
2020-03-19 18:24:06 +01:00
def __init__ ( self ) :
2020-03-19 19:48:24 +01:00
""" Object constructor """
2020-03-19 18:24:06 +01:00
self . to_run = deque ( ) # Scheduled tasks
2020-03-20 11:12:44 +01:00
self . paused = [ ] # Sleeping tasks
2020-03-19 18:24:06 +01:00
self . selector = DefaultSelector ( ) # Selector object to perform I/O multiplexing
2020-03-19 22:15:22 +01:00
self . running = None # This will always point to the currently running coroutine (Task object)
2020-03-20 11:12:44 +01:00
self . joined = defaultdict ( list ) # Tasks that want to join
self . clock = default_timer # Monotonic clock to keep track of elapsed time
2020-03-20 18:38:17 +01:00
self . sequence = 0 # To avoid TypeError in the (unlikely) event of two task with the same deadline we use a unique and incremental integer pushed to the queue together with the deadline and the function itself
2020-04-04 19:14:04 +02:00
self . _exiting = False
2020-03-19 18:24:06 +01:00
def loop ( self ) :
""" Main event loop for giambio """
2020-03-19 19:48:24 +01:00
2020-03-19 18:24:06 +01:00
while True :
2020-03-21 10:42:52 +01:00
if not self . selector . get_map ( ) and not any ( ( self . to_run + deque ( self . paused ) ) ) :
2020-03-19 18:24:06 +01:00
break
2020-03-23 23:26:09 +01:00
while not self . to_run : # If there are sockets ready, (re)schedule their associated task
2020-03-20 11:12:44 +01:00
timeout = 0.0 if self . to_run else None
2020-03-23 23:26:09 +01:00
tasks = self . selector . select ( timeout )
2020-03-19 18:24:06 +01:00
for key , _ in tasks :
2020-03-19 19:48:24 +01:00
self . to_run . append ( key . data ) # Socket ready? Schedule the task
2020-03-20 17:53:30 +01:00
self . selector . unregister ( key . fileobj ) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now)
2020-03-20 11:12:44 +01:00
while self . to_run or self . paused :
if not self . to_run :
wait ( max ( 0.0 , self . paused [ 0 ] [ 0 ] - self . clock ( ) ) ) # If there are no tasks ready, just do nothing
2020-03-20 17:53:30 +01:00
while self . paused and self . paused [ 0 ] [ 0 ] < self . clock ( ) : # Reschedules task when their timer has elapsed
2020-03-20 18:38:17 +01:00
_ , __ , coro = heappop ( self . paused )
2020-03-20 11:12:44 +01:00
self . to_run . append ( coro )
2020-03-19 18:24:06 +01:00
self . running = self . to_run . popleft ( ) # Sets the currently running task
try :
2020-03-24 08:42:29 +01:00
method , * args = self . running . run ( )
getattr ( self , method ) ( * args ) # Sneaky method call, thanks to David Beazley for this ;)
2020-03-25 15:09:39 +01:00
self . running . steps + = 1
2020-03-23 19:36:58 +01:00
except StopIteration as e :
2020-03-25 14:39:32 +01:00
self . running . execution = " FINISH "
2020-03-23 18:52:17 +01:00
self . running . result = Result ( e . args [ 0 ] if e . args else None , None ) # Saves the return value
2020-03-20 11:12:44 +01:00
self . to_run . extend ( self . joined . pop ( self . running , ( ) ) ) # Reschedules the parent task
2020-03-23 20:49:12 +01:00
except RuntimeError :
2020-03-25 11:27:29 +01:00
self . to_run . extend ( self . joined . pop ( self . running , ( ) ) ) # Reschedules the parent task
2020-04-04 18:43:07 +02:00
except CancelledError :
self . running . execution = " CANCELLED "
self . to_run . extend ( self . joined . pop ( self . running , ( ) ) )
except Exception as err :
2020-04-04 19:14:04 +02:00
if not self . _exiting :
self . running . execution = " ERRORED "
self . running . result = Result ( None , err )
self . to_run . extend ( self . joined . pop ( self . running , ( ) ) ) # Reschedules the parent task
else :
raise
2020-03-20 11:12:44 +01:00
except KeyboardInterrupt :
2020-03-23 19:36:58 +01:00
self . running . throw ( KeyboardInterrupt )
2020-03-19 18:24:06 +01:00
def start ( self , coroutine : types . coroutine , * args , * * kwargs ) :
2020-03-24 08:42:29 +01:00
""" Starts the event loop """
2020-03-21 10:59:59 +01:00
2020-03-25 23:36:17 +01:00
TaskManager ( self ) . spawn ( coroutine ( * args , * * kwargs ) )
2020-03-19 18:24:06 +01:00
self . loop ( )
def want_read ( self , sock : socket . socket ) :
2020-03-24 08:42:29 +01:00
""" Handler for the ' want_read ' event, registers the socket inside the selector to perform I/0 multiplexing """
2020-03-19 18:24:06 +01:00
self . selector . register ( sock , EVENT_READ , self . running )
def want_write ( self , sock : socket . socket ) :
2020-03-24 08:42:29 +01:00
""" Handler for the ' want_write ' event, registers the socket inside the selector to perform I/0 multiplexing """
2020-03-19 18:24:06 +01:00
self . selector . register ( sock , EVENT_WRITE , self . running )
def wrap_socket ( self , sock ) :
2020-03-24 08:42:29 +01:00
""" Wraps a standard socket into an AsyncSocket object """
2020-03-21 10:59:59 +01:00
2020-03-19 18:24:06 +01:00
return AsyncSocket ( sock , self )
2020-03-20 11:12:44 +01:00
async def read_sock ( self , sock : socket . socket , buffer : int ) :
2020-03-24 08:42:29 +01:00
""" Reads from a socket asynchronously, waiting until the resource is available and returning up to buffer bytes
from the socket
"""
2020-03-25 18:37:57 +01:00
await _want_read ( sock )
2020-03-19 18:24:06 +01:00
return sock . recv ( buffer )
2020-03-20 11:12:44 +01:00
async def accept_sock ( self , sock : socket . socket ) :
2020-03-24 08:42:29 +01:00
""" Accepts a socket connection asynchronously, waiting until the resource is available and returning the
result of the accept ( ) call
"""
2020-03-25 18:37:57 +01:00
await _want_read ( sock )
2020-03-19 18:24:06 +01:00
return sock . accept ( )
2020-03-20 11:12:44 +01:00
async def sock_sendall ( self , sock : socket . socket , data : bytes ) :
2020-03-24 08:42:29 +01:00
""" Sends all the passed data, as bytes, trough the socket asynchronously """
2020-03-19 18:24:06 +01:00
while data :
2020-03-25 18:37:57 +01:00
await _want_write ( sock )
2020-03-19 18:24:06 +01:00
sent_no = sock . send ( data )
data = data [ sent_no : ]
2020-03-20 11:12:44 +01:00
async def close_sock ( self , sock : socket . socket ) :
2020-03-24 08:42:29 +01:00
""" Closes the socket asynchronously """
2020-03-25 18:37:57 +01:00
await _want_write ( sock )
2020-03-19 18:24:06 +01:00
return sock . close ( )
2020-03-20 11:12:44 +01:00
def want_join ( self , coro : types . coroutine ) :
2020-03-24 08:42:29 +01:00
""" Handler for the ' want_join ' event, does some magic to tell the scheduler
to wait until the passed coroutine ends . The result of this call equals whatever the
coroutine returns or , if an exception gets raised , the exception will get propagated inside the
parent task """
2020-03-25 23:36:17 +01:00
2020-03-20 11:12:44 +01:00
if coro not in self . joined :
self . joined [ coro ] . append ( self . running )
2020-03-19 19:48:24 +01:00
else :
2020-03-23 19:09:18 +01:00
self . running . throw ( AlreadyJoinedError ( " Joining the same task multiple times is not allowed! " ) )
2020-03-19 19:48:24 +01:00
2020-03-20 11:12:44 +01:00
def want_sleep ( self , seconds ) :
2020-03-24 08:42:29 +01:00
if seconds > 0 : # If seconds <= 0 this function just acts as a checkpoint
self . sequence + = 1 # Make this specific sleeping task unique to avoid error when comparing identical deadlines
heappush ( self . paused , ( self . clock ( ) + seconds , self . sequence , self . running ) )
else :
self . to_run . append ( self . running ) # Reschedule the task that called sleep
2020-03-20 11:12:44 +01:00
2020-03-20 11:27:26 +01:00
def want_cancel ( self , task ) :
2020-03-25 11:27:29 +01:00
self . to_run . extend ( self . joined . pop ( self . running , ( ) ) )
self . to_run . append ( self . running ) # Reschedules the parent task
2020-03-23 18:52:17 +01:00
task . throw ( CancelledError ( ) )
2020-03-25 23:36:17 +01:00
2020-03-20 16:42:44 +01:00
async def connect_sock ( self , sock : socket . socket , addr : tuple ) :
2020-03-25 08:40:01 +01:00
try : # "Borrowed" from curio
result = sock . connect ( addr )
return result
except WantWrite :
2020-03-25 18:37:57 +01:00
await _want_write ( sock )
2020-03-25 08:40:01 +01:00
err = sock . getsockopt ( SOL_SOCKET , SO_ERROR )
if err != 0 :
2020-04-04 18:43:07 +02:00
raise OSError ( err , f ' Connect call failed: { addr } ' )