Compare commits

...

5 Commits

Author SHA1 Message Date
Mattia Giambirtone ec9c4cf1c9 Merge remote-tracking branch 'origin/master'
# Conflicts:
#	tests/timeout3.py
2022-05-14 11:22:57 +02:00
Nocturn9x 66d7c51268 Locks stuff + fixes + bugs 2022-05-14 11:19:55 +02:00
Mattia Giambirtone 07b6621796 Merge remote-tracking branch 'origin/master'
# Conflicts:
#	giambio/context.py
#	giambio/core.py
#	giambio/runtime.py
#	giambio/sync.py
#	tests/queue.py
#	tests/timeout.py
2022-05-14 11:16:59 +02:00
Mattia Giambirtone 5c05de495d Fixed some issues with join() not properly rescheduling its caller when appropriate 2022-02-05 16:14:21 +01:00
Mattia Giambirtone 584f762d61 Fixed some edge cases with timeouts. Task.join() now returns the task's return value 2022-02-05 15:47:01 +01:00
22 changed files with 194 additions and 84 deletions

View File

@ -49,5 +49,5 @@ __all__ = [
"skip_after",
"task",
"io",
"socket"
"socket",
]

View File

@ -55,7 +55,7 @@ class TaskManager:
self._proper_init = False
self.enclosed_pool: Optional["giambio.context.TaskManager"] = None
self.raise_on_timeout: bool = raise_on_timeout
self.entry_point: Optional[Task] = None
self.entry_point: Optional[giambio.Task] = None
async def spawn(self, func: types.FunctionType, *args, **kwargs) -> "giambio.task.Task":
"""

View File

@ -17,6 +17,7 @@ limitations under the License.
"""
# Import libraries and internal resources
from lib2to3.pytree import Base
import types
from giambio.task import Task
from collections import deque
@ -298,7 +299,10 @@ class AsyncScheduler:
# We need to make sure we don't try to execute
# exited tasks that are on the running queue
return
if not self.current_pool:
if self.current_pool:
if self.current_task.pool and self.current_task.pool is not self.current_pool:
self.current_task.pool.enclosed_pool = self.current_pool
else:
self.current_pool = self.current_task.pool
pool = self.current_pool
while pool:
@ -462,7 +466,8 @@ class AsyncScheduler:
for task in tasks:
self.paused.discard(task)
self.suspended.remove(task)
if task in self.suspended:
self.suspended.remove(task)
self.run_ready.extend(tasks)
self.reschedule_running()
@ -493,7 +498,7 @@ class AsyncScheduler:
:return: The closest deadline according to our clock
:rtype: float
"""
if not self.deadlines:
# If there are no deadlines just wait until the first task wakeup
timeout = max(0.0, self.paused.get_closest_deadline() - self.clock())
@ -616,8 +621,9 @@ class AsyncScheduler:
If ensure_done equals False, the loop will cancel ALL
running and scheduled tasks and then tear itself down.
If ensure_done equals True, which is the default behavior,
this method will raise a GiambioError if the loop hasn't
finished running.
this method will raise a GiambioError exception if the loop
hasn't finished running. The state of the event loop is reset
so it can be reused with another run() call
"""
if ensure_done:
@ -644,18 +650,11 @@ class AsyncScheduler:
if task.pool and task.pool.enclosed_pool and not task.pool.enclosed_pool.done():
return
for t in task.joiners:
if t not in self.run_ready:
# Since a task can be the parent
# of multiple children, we need to
# make sure we reschedule it only
# once, otherwise a RuntimeError will
# occur
self.run_ready.append(t)
self.run_ready.extend(task.joiners)
def join(self, task: Task):
"""
Joins a task to its callers (implicitly, the parent
Joins a task to its callers (implicitly the parent
task, but also every other task who called await
task.join() on the task object)
"""
@ -668,8 +667,9 @@ class AsyncScheduler:
self.io_release_task(task)
if task in self.suspended:
self.suspended.remove(task)
# If the pool has finished executing or we're at the first parent
# task that kicked the loop, we can safely reschedule the parent(s)
# If the pool (including any enclosing pools) has finished executing
# or we're at the first task that kicked the loop, we can safely
# reschedule the parent(s)
if task.pool is None:
return
if task.pool.done():
@ -680,7 +680,7 @@ class AsyncScheduler:
task.status = "crashed"
if task.exc.__traceback__:
# TODO: We might want to do a bit more complex traceback hacking to remove any extra
# frames from the exception call stack, but for now removing at least the first one
# frames from the exception call stack, but for now removing at least the first few
# seems a sensible approach (it's us catching it so we don't care about that)
for _ in range(5):
if task.exc.__traceback__.tb_next:

View File

@ -44,6 +44,13 @@ class TimeQueue:
self.sequence = 0
self.container: List[Tuple[float, int, Task]] = []
def __len__(self):
"""
Returns len(self)
"""
return len(self.container)
def __contains__(self, item: Task):
"""
Implements item in self. This method behaves
@ -263,6 +270,13 @@ class DeadlinesQueue:
return f"DeadlinesQueue({self.container})"
def __len__(self):
"""
Returns len(self)
"""
return len(self.container)
def put(self, pool: "giambio.context.TaskManager"):
"""
Pushes a pool with its deadline onto the queue. The
@ -273,7 +287,7 @@ class DeadlinesQueue:
:param pool: The pool object to store
"""
if pool and pool not in self.pools and not pool.done() and not pool.timed_out and pool.timeout:
if pool and pool not in self.pools and not pool.timed_out and pool.timeout:
self.pools.add(pool)
heappush(self.container, (pool.timeout, self.sequence, pool))
self.sequence += 1

View File

@ -246,7 +246,7 @@ class AsyncSocket:
await want_write(self.sock)
except WantRead:
await want_read(self.sock)
async def getpeername(self):
"""
Wrapper socket method

View File

@ -95,19 +95,16 @@ def create_pool():
return TaskManager()
def with_timeout(timeout: int or float):
"""
Creates an async pool with an associated timeout
"""
assert timeout > 0, "The timeout must be greater than 0"
mgr = TaskManager(timeout)
loop = get_event_loop()
mgr = TaskManager(timeout)
if loop.current_task is not loop.entry_point:
mgr.tasks.append(loop.current_task)
if loop.current_pool and loop.current_pool is not mgr:
loop.current_pool.enclosed_pool = mgr
if loop.current_task is loop.entry_point:
loop.current_pool = mgr
return mgr
@ -119,11 +116,8 @@ def skip_after(timeout: int or float):
"""
assert timeout > 0, "The timeout must be greater than 0"
mgr = TaskManager(timeout, False)
loop = get_event_loop()
mgr = TaskManager(timeout, False)
if loop.current_task is not loop.entry_point:
mgr.tasks.append(loop.current_task)
if loop.current_pool and loop.current_pool is not mgr:
loop.current_pool.enclosed_pool = mgr
if loop.current_task is loop.entry_point:
loop.current_pool = mgr
return mgr

View File

@ -54,8 +54,8 @@ class Task:
# when the task has been created but not started running yet--, "run"-- when
# the task is running synchronous code--, "io"-- when the task is waiting on
# an I/O resource--, "sleep"-- when the task is either asleep, waiting on
# an event or otherwise suspended, "crashed"-- when the task has exited because
# of an exception and "cancelled" when-- when the task has been explicitly cancelled
# an event or otherwise suspended, "crashed"-- when the task has exited because
# of an exception and "cancelled" when-- when the task has been explicitly cancelled
# with its cancel() method or as a result of an exception
status: str = "init"
# This attribute counts how many times the task's run() method has been called
@ -112,7 +112,6 @@ class Task:
self.joiners.add(task)
return await giambio.traps.join(self)
async def cancel(self):
"""
Cancels the task

View File

@ -24,8 +24,7 @@ limitations under the License.
import types
import inspect
from giambio.task import Task
from types import FunctionType
from typing import Any, Union, Iterable
from typing import Any, Union, Iterable, Coroutine, Callable
from giambio.exceptions import GiambioError
@ -49,7 +48,7 @@ async def suspend() -> Any:
return await create_trap("suspend")
async def create_task(coro: FunctionType, pool, *args):
async def create_task(coro: Callable[..., Coroutine[Any, Any, Any]], pool, *args):
"""
Spawns a new task in the current event loop from a bare coroutine
function. All extra positional arguments are passed to the function

View File

@ -11,10 +11,11 @@ async def child(name: int):
async def main():
start = giambio.clock()
async with giambio.create_pool() as pool:
await pool.spawn(child, 1) # If you comment this line, the pool will exit immediately!
# await pool.spawn(child, 1) # If you comment this line, the pool will exit immediately!
task = await pool.spawn(child, 2)
await task.cancel()
print("[main] Children spawned, awaiting completion")
await task.cancel()
print("[main] Second child cancelled")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")

View File

@ -20,7 +20,7 @@ async def receiver(sock: giambio.socket.AsyncSocket, q: giambio.Queue):
data, rest = data.split(b"\n", maxsplit=2)
buffer = b"".join(rest)
await q.put((1, data.decode()))
data = buffer
data = buffer
async def main(host: Tuple[str, int]):

View File

@ -23,15 +23,15 @@ async def serve(bind_address: tuple):
logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}")
async with giambio.create_pool() as pool:
async with sock:
while True:
try:
conn, address_tuple = await sock.accept()
clients.append(conn)
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
await pool.spawn(handler, conn, address_tuple)
except Exception as err:
# Because exceptions just *work*
logging.info(f"{address_tuple[0]}:{address_tuple[1]} has raised {type(err).__name__}: {err}")
while True:
try:
conn, address_tuple = await sock.accept()
clients.append(conn)
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
await pool.spawn(handler, conn, address_tuple)
except Exception as err:
# Because exceptions just *work*
logging.info(f"{address_tuple[0]}:{address_tuple[1]} has raised {type(err).__name__}: {err}")
async def handler(sock: AsyncSocket, client_address: tuple):

View File

@ -20,14 +20,14 @@ async def serve(bind_address: tuple):
logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}")
async with giambio.create_pool() as pool:
async with sock:
while True:
try:
conn, address_tuple = await sock.accept()
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
await pool.spawn(handler, conn, address_tuple)
except Exception as err:
# Because exceptions just *work*
logging.info(f"{address_tuple[0]}:{address_tuple[1]} has raised {type(err).__name__}: {err}")
while True:
try:
conn, address_tuple = await sock.accept()
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
await pool.spawn(handler, conn, address_tuple)
except Exception as err:
# Because exceptions just *work*
logging.info(f"{address_tuple[0]}:{address_tuple[1]} has raised {type(err).__name__}: {err}")
async def handler(sock: AsyncSocket, client_address: tuple):

View File

@ -15,7 +15,9 @@ async def child(ev: giambio.Event, pause: int):
await giambio.sleep(pause)
end_sleep = giambio.clock() - start_sleep
end_total = giambio.clock() - start_total
print(f"[child] Done! Slept for {end_total:.2f} seconds total ({end_pause:.2f} waiting, {end_sleep:.2f} sleeping), nice nap!")
print(
f"[child] Done! Slept for {end_total:.2f} seconds total ({end_pause:.2f} waiting, {end_sleep:.2f} sleeping), nice nap!"
)
async def parent(pause: int = 1):

41
tests/lock.py Normal file
View File

@ -0,0 +1,41 @@
from debugger import Debugger
import giambio
async def child(start: int, limit: int, q: giambio.Queue, l: giambio.Lock):
async with l: # If you comment this line, the resulting queue will
# be all mixed up!
# Locked! If any other task
# tries to acquire the lock,
# they'll wait for us to finish
while start <= limit:
print(f"[locked ({limit})] {start}")
await q.put(start)
start += 1
await giambio.sleep(1)
async def other(stop: int):
# Other tasks are unaffected
# by the lock if they don't
# acquire it
n = stop
while n:
print(f"[unlocked ({stop})] {n}")
n -= 1
await giambio.sleep(1)
async def main():
queue = giambio.Queue()
lock = giambio.Lock()
async with giambio.create_pool() as pool:
await pool.spawn(child, 1, 5, queue, lock)
await pool.spawn(child, 6, 10, queue, lock)
await pool.spawn(other, 10)
await pool.spawn(other, 5)
print(f"Result: {queue}") # Queue is ordered!
print("[main] Done")
giambio.run(main, debugger=())

View File

@ -36,7 +36,7 @@ async def main():
print("[main] First 2 children spawned, awaiting completion")
async with giambio.create_pool() as new_pool:
# This pool will be cancelled by the exception
# in the other pool
# in the outer pool
await new_pool.spawn(child2)
await new_pool.spawn(child3)
print("[main] Third and fourth children spawned")

View File

@ -1,3 +1,4 @@
## Example for nested task pools
import giambio
from debugger import Debugger

View File

@ -1,3 +1,5 @@
## Two-way proxy example stolen from njsmith's talk about trio
from debugger import Debugger
import giambio
import socket

View File

@ -1,3 +1,4 @@
## Producer-consumer code using giambio's async queue
import giambio
from debugger import Debugger

View File

@ -7,6 +7,7 @@ import time
_print = print
def print(*args, **kwargs):
sys.stdout.write(f"[{time.strftime('%H:%M:%S')}] ")
_print(*args, **kwargs)
@ -14,18 +15,19 @@ def print(*args, **kwargs):
async def test(host: str, port: int, bufsize: int = 4096):
socket = giambio.socket.wrap_socket(
ssl.create_default_context().wrap_socket(
sock=sock.socket(),
# Note: do_handshake_on_connect MUST
# be set to False on the synchronous socket!
# Giambio handles the TLS handshake asynchronously
# and making the SSL library handle it blocks
# the entire event loop. To perform the TLS
# handshake upon connection, set the this
# parameter in the AsyncSocket class instead
do_handshake_on_connect=False,
server_hostname=host)
)
ssl.create_default_context().wrap_socket(
sock=sock.socket(),
# Note: do_handshake_on_connect MUST
# be set to False on the synchronous socket!
# Giambio handles the TLS handshake asynchronously
# and making the SSL library handle it blocks
# the entire event loop. To perform the TLS
# handshake upon connection, set the this
# parameter in the AsyncSocket class instead
do_handshake_on_connect=False,
server_hostname=host,
)
)
print(f"Attempting a connection to {host}:{port}")
await socket.connect((host, port))
print("Connected")
@ -34,18 +36,20 @@ async def test(host: str, port: int, bufsize: int = 4096):
async with socket:
# Closes the socket automatically
print("Entered socket context manager, sending request data")
await socket.send_all(b"""GET / HTTP/1.1\r\nHost: google.com\r\nUser-Agent: owo\r\nAccept: text/html\r\nConnection: keep-alive\r\nAccept: */*\r\n\r\n""")
await socket.send_all(
b"""GET / HTTP/1.1\r\nHost: google.com\r\nUser-Agent: owo\r\nAccept: text/html\r\nConnection: keep-alive\r\nAccept: */*\r\n\r\n"""
)
print("Data sent")
buffer = b""
while not buffer.endswith(b"\r\n\r\n"):
print(f"Requesting up to {bufsize} bytes (current response size: {len(buffer)})")
data = await socket.receive(bufsize)
print(f"Received {len(data)} bytes")
if data:
buffer += data
else:
print("Received empty stream, closing connection")
break
print(f"Requesting up to {bufsize} bytes (current response size: {len(buffer)})")
data = await socket.receive(bufsize)
print(f"Received {len(data)} bytes")
if data:
buffer += data
else:
print("Received empty stream, closing connection")
break
print(f"Request has{' not' if not p.timed_out else ''} timed out!")
if buffer:
data = buffer.decode().split("\r\n")
@ -70,4 +74,3 @@ async def test(host: str, port: int, bufsize: int = 4096):
giambio.run(test, "google.com", 443, 256, debugger=())

View File

@ -1,4 +1,4 @@
## SImple task IPC using giambio's MemoryChannel class
## Simple task IPC using giambio's MemoryChannel class
import random
import string
import giambio

View File

@ -1,3 +1,4 @@
## Simple task IPC using giambio's NetworkChannel class
import random
import string
import giambio

52
tests/timeout3.py Normal file
View File

@ -0,0 +1,52 @@
import giambio
from debugger import Debugger
async def child(name: int):
print(f"[child {name}] Child spawned!! Sleeping for {name} seconds")
await giambio.sleep(name)
print(f"[child {name}] Had a nice nap!")
<<<<<<< HEAD
return name
=======
async def worker(coro, *args):
try:
async with giambio.with_timeout(10):
await coro(*args)
except giambio.exceptions.TooSlowError:
return True
return False
>>>>>>> origin/master
async def main():
start = giambio.clock()
<<<<<<< HEAD
try:
async with giambio.with_timeout(5) as pool:
task = await pool.spawn(child, 2)
print(f"Child has returned: {await task.join()}")
await giambio.sleep(5) # This will trigger the timeout
except giambio.exceptions.TooSlowError:
print("[main] One or more children have timed out!")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
return 12
if __name__ == "__main__":
giambio.run(main, debugger=())
=======
async with giambio.skip_after(10) as pool:
t = await pool.spawn(worker, child, 7)
await giambio.sleep(2)
t2 = await pool.spawn(worker, child, 15)
if any([await t.join(), await t2.join()]):
print("[main] One or more children have timed out!")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
if __name__ == "__main__":
giambio.run(main, debugger=())
>>>>>>> origin/master