Fixed I/O, TODO fix cancellation and add context manager

This commit is contained in:
nocturn9x 2020-03-23 22:26:09 +00:00
parent 74863525fd
commit 3fee3890e6
3 changed files with 7 additions and 11 deletions

View File

@ -4,16 +4,17 @@ loop = giambio.EventLoop()
""" """
What works and what does not (23rd March 2020 22:38 PM) What works and what does not (23rd March 2020 23:24 PM)
- Run tasks concurrently: V - Run tasks concurrently: V
- Join mechanism: V - Join mechanism: V
- Sleep mechanism: V - Sleep mechanism: V
- Cancellation mechanism: X Note: Figure out how to rescheule parent task - Cancellation mechanism: X Note: Figure out how to rescheule parent task
- Exception propagation: V - Exception propagation: V
- Concurrent I/O: X Note: I/O would work only when a task is joined (weird) - Concurrent I/O: V
- Return values of coroutines: V - Return values of coroutines: V
- Scheduling tasks for future execution: V - Scheduling tasks for future execution: V
- Task Spawner (context manager): X Note: Not Implemented
""" """

View File

@ -4,7 +4,6 @@ from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from heapq import heappush, heappop from heapq import heappush, heappop
import socket import socket
from .exceptions import AlreadyJoinedError, CancelledError from .exceptions import AlreadyJoinedError, CancelledError
import traceback
from timeit import default_timer from timeit import default_timer
from time import sleep as wait from time import sleep as wait
from .socket import AsyncSocket from .socket import AsyncSocket
@ -32,9 +31,9 @@ class EventLoop:
while True: while True:
if not self.selector.get_map() and not any((self.to_run + deque(self.paused))): if not self.selector.get_map() and not any((self.to_run + deque(self.paused))):
break break
while self.selector.get_map(): # If there are sockets ready, (re)schedule their associated task while not self.to_run: # If there are sockets ready, (re)schedule their associated task
timeout = 0.0 if self.to_run else None timeout = 0.0 if self.to_run else None
tasks = deque(self.selector.select(timeout)) tasks = self.selector.select(timeout)
for key, _ in tasks: for key, _ in tasks:
self.to_run.append(key.data) # Socket ready? Schedule the task self.to_run.append(key.data) # Socket ready? Schedule the task
self.selector.unregister(key.fileobj) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now) self.selector.unregister(key.fileobj) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now)

View File

@ -3,6 +3,7 @@ from giambio.socket import AsyncSocket
import socket import socket
import logging import logging
loop = giambio.core.EventLoop() loop = giambio.core.EventLoop()
logging.basicConfig(level=20, logging.basicConfig(level=20,
@ -17,19 +18,14 @@ async def make_srv(address: tuple):
sock.listen(5) sock.listen(5)
asock = loop.wrap_socket(sock) asock = loop.wrap_socket(sock)
logging.info(f"Echo server serving asynchronously at {address}") logging.info(f"Echo server serving asynchronously at {address}")
logging.info("Sleeping for 2 secs")
await giambio.sleep(2)
logging.info("Done!")
while True: while True:
conn, addr = await asock.accept() # TODO: Figure out why this I/O operation actually works while other don't conn, addr = await asock.accept()
logging.info(f"{addr} connected") logging.info(f"{addr} connected")
task = loop.spawn(echo_server(conn, addr)) task = loop.spawn(echo_server(conn, addr))
# await task.cancel() # Cancel task!
async def echo_server(sock: AsyncSocket, addr: tuple): async def echo_server(sock: AsyncSocket, addr: tuple):
with sock: with sock:
# Without the try/except block and the call to giambio.join(), the task would block here
await sock.send_all(b"Welcome to the server pal!\n") await sock.send_all(b"Welcome to the server pal!\n")
while True: while True:
data = await sock.receive(1000) data = await sock.receive(1000)