mirror of https://github.com/nocturn9x/giambio.git
Compare commits
5 Commits
ad34be8754
...
ec9c4cf1c9
Author | SHA1 | Date |
---|---|---|
Mattia Giambirtone | ec9c4cf1c9 | |
Nocturn9x | 66d7c51268 | |
Mattia Giambirtone | 07b6621796 | |
Mattia Giambirtone | 5c05de495d | |
Mattia Giambirtone | 584f762d61 |
|
@ -49,5 +49,5 @@ __all__ = [
|
|||
"skip_after",
|
||||
"task",
|
||||
"io",
|
||||
"socket"
|
||||
"socket",
|
||||
]
|
||||
|
|
|
@ -55,7 +55,7 @@ class TaskManager:
|
|||
self._proper_init = False
|
||||
self.enclosed_pool: Optional["giambio.context.TaskManager"] = None
|
||||
self.raise_on_timeout: bool = raise_on_timeout
|
||||
self.entry_point: Optional[Task] = None
|
||||
self.entry_point: Optional[giambio.Task] = None
|
||||
|
||||
async def spawn(self, func: types.FunctionType, *args, **kwargs) -> "giambio.task.Task":
|
||||
"""
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
"""
|
||||
|
||||
# Import libraries and internal resources
|
||||
from lib2to3.pytree import Base
|
||||
import types
|
||||
from giambio.task import Task
|
||||
from collections import deque
|
||||
|
@ -298,7 +299,10 @@ class AsyncScheduler:
|
|||
# We need to make sure we don't try to execute
|
||||
# exited tasks that are on the running queue
|
||||
return
|
||||
if not self.current_pool:
|
||||
if self.current_pool:
|
||||
if self.current_task.pool and self.current_task.pool is not self.current_pool:
|
||||
self.current_task.pool.enclosed_pool = self.current_pool
|
||||
else:
|
||||
self.current_pool = self.current_task.pool
|
||||
pool = self.current_pool
|
||||
while pool:
|
||||
|
@ -462,7 +466,8 @@ class AsyncScheduler:
|
|||
|
||||
for task in tasks:
|
||||
self.paused.discard(task)
|
||||
self.suspended.remove(task)
|
||||
if task in self.suspended:
|
||||
self.suspended.remove(task)
|
||||
self.run_ready.extend(tasks)
|
||||
self.reschedule_running()
|
||||
|
||||
|
@ -493,7 +498,7 @@ class AsyncScheduler:
|
|||
:return: The closest deadline according to our clock
|
||||
:rtype: float
|
||||
"""
|
||||
|
||||
|
||||
if not self.deadlines:
|
||||
# If there are no deadlines just wait until the first task wakeup
|
||||
timeout = max(0.0, self.paused.get_closest_deadline() - self.clock())
|
||||
|
@ -616,8 +621,9 @@ class AsyncScheduler:
|
|||
If ensure_done equals False, the loop will cancel ALL
|
||||
running and scheduled tasks and then tear itself down.
|
||||
If ensure_done equals True, which is the default behavior,
|
||||
this method will raise a GiambioError if the loop hasn't
|
||||
finished running.
|
||||
this method will raise a GiambioError exception if the loop
|
||||
hasn't finished running. The state of the event loop is reset
|
||||
so it can be reused with another run() call
|
||||
"""
|
||||
|
||||
if ensure_done:
|
||||
|
@ -644,18 +650,11 @@ class AsyncScheduler:
|
|||
|
||||
if task.pool and task.pool.enclosed_pool and not task.pool.enclosed_pool.done():
|
||||
return
|
||||
for t in task.joiners:
|
||||
if t not in self.run_ready:
|
||||
# Since a task can be the parent
|
||||
# of multiple children, we need to
|
||||
# make sure we reschedule it only
|
||||
# once, otherwise a RuntimeError will
|
||||
# occur
|
||||
self.run_ready.append(t)
|
||||
self.run_ready.extend(task.joiners)
|
||||
|
||||
def join(self, task: Task):
|
||||
"""
|
||||
Joins a task to its callers (implicitly, the parent
|
||||
Joins a task to its callers (implicitly the parent
|
||||
task, but also every other task who called await
|
||||
task.join() on the task object)
|
||||
"""
|
||||
|
@ -668,8 +667,9 @@ class AsyncScheduler:
|
|||
self.io_release_task(task)
|
||||
if task in self.suspended:
|
||||
self.suspended.remove(task)
|
||||
# If the pool has finished executing or we're at the first parent
|
||||
# task that kicked the loop, we can safely reschedule the parent(s)
|
||||
# If the pool (including any enclosing pools) has finished executing
|
||||
# or we're at the first task that kicked the loop, we can safely
|
||||
# reschedule the parent(s)
|
||||
if task.pool is None:
|
||||
return
|
||||
if task.pool.done():
|
||||
|
@ -680,7 +680,7 @@ class AsyncScheduler:
|
|||
task.status = "crashed"
|
||||
if task.exc.__traceback__:
|
||||
# TODO: We might want to do a bit more complex traceback hacking to remove any extra
|
||||
# frames from the exception call stack, but for now removing at least the first one
|
||||
# frames from the exception call stack, but for now removing at least the first few
|
||||
# seems a sensible approach (it's us catching it so we don't care about that)
|
||||
for _ in range(5):
|
||||
if task.exc.__traceback__.tb_next:
|
||||
|
|
|
@ -44,6 +44,13 @@ class TimeQueue:
|
|||
self.sequence = 0
|
||||
self.container: List[Tuple[float, int, Task]] = []
|
||||
|
||||
def __len__(self):
|
||||
"""
|
||||
Returns len(self)
|
||||
"""
|
||||
|
||||
return len(self.container)
|
||||
|
||||
def __contains__(self, item: Task):
|
||||
"""
|
||||
Implements item in self. This method behaves
|
||||
|
@ -263,6 +270,13 @@ class DeadlinesQueue:
|
|||
|
||||
return f"DeadlinesQueue({self.container})"
|
||||
|
||||
def __len__(self):
|
||||
"""
|
||||
Returns len(self)
|
||||
"""
|
||||
|
||||
return len(self.container)
|
||||
|
||||
def put(self, pool: "giambio.context.TaskManager"):
|
||||
"""
|
||||
Pushes a pool with its deadline onto the queue. The
|
||||
|
@ -273,7 +287,7 @@ class DeadlinesQueue:
|
|||
:param pool: The pool object to store
|
||||
"""
|
||||
|
||||
if pool and pool not in self.pools and not pool.done() and not pool.timed_out and pool.timeout:
|
||||
if pool and pool not in self.pools and not pool.timed_out and pool.timeout:
|
||||
self.pools.add(pool)
|
||||
heappush(self.container, (pool.timeout, self.sequence, pool))
|
||||
self.sequence += 1
|
||||
|
|
|
@ -246,7 +246,7 @@ class AsyncSocket:
|
|||
await want_write(self.sock)
|
||||
except WantRead:
|
||||
await want_read(self.sock)
|
||||
|
||||
|
||||
async def getpeername(self):
|
||||
"""
|
||||
Wrapper socket method
|
||||
|
|
|
@ -95,19 +95,16 @@ def create_pool():
|
|||
return TaskManager()
|
||||
|
||||
|
||||
|
||||
def with_timeout(timeout: int or float):
|
||||
"""
|
||||
Creates an async pool with an associated timeout
|
||||
"""
|
||||
|
||||
assert timeout > 0, "The timeout must be greater than 0"
|
||||
mgr = TaskManager(timeout)
|
||||
loop = get_event_loop()
|
||||
mgr = TaskManager(timeout)
|
||||
if loop.current_task is not loop.entry_point:
|
||||
mgr.tasks.append(loop.current_task)
|
||||
if loop.current_pool and loop.current_pool is not mgr:
|
||||
loop.current_pool.enclosed_pool = mgr
|
||||
if loop.current_task is loop.entry_point:
|
||||
loop.current_pool = mgr
|
||||
return mgr
|
||||
|
||||
|
||||
|
@ -119,11 +116,8 @@ def skip_after(timeout: int or float):
|
|||
"""
|
||||
|
||||
assert timeout > 0, "The timeout must be greater than 0"
|
||||
mgr = TaskManager(timeout, False)
|
||||
loop = get_event_loop()
|
||||
mgr = TaskManager(timeout, False)
|
||||
if loop.current_task is not loop.entry_point:
|
||||
mgr.tasks.append(loop.current_task)
|
||||
if loop.current_pool and loop.current_pool is not mgr:
|
||||
loop.current_pool.enclosed_pool = mgr
|
||||
if loop.current_task is loop.entry_point:
|
||||
loop.current_pool = mgr
|
||||
return mgr
|
||||
|
||||
|
|
|
@ -54,8 +54,8 @@ class Task:
|
|||
# when the task has been created but not started running yet--, "run"-- when
|
||||
# the task is running synchronous code--, "io"-- when the task is waiting on
|
||||
# an I/O resource--, "sleep"-- when the task is either asleep, waiting on
|
||||
# an event or otherwise suspended, "crashed"-- when the task has exited because
|
||||
# of an exception and "cancelled" when-- when the task has been explicitly cancelled
|
||||
# an event or otherwise suspended, "crashed"-- when the task has exited because
|
||||
# of an exception and "cancelled" when-- when the task has been explicitly cancelled
|
||||
# with its cancel() method or as a result of an exception
|
||||
status: str = "init"
|
||||
# This attribute counts how many times the task's run() method has been called
|
||||
|
@ -112,7 +112,6 @@ class Task:
|
|||
self.joiners.add(task)
|
||||
return await giambio.traps.join(self)
|
||||
|
||||
|
||||
async def cancel(self):
|
||||
"""
|
||||
Cancels the task
|
||||
|
|
|
@ -24,8 +24,7 @@ limitations under the License.
|
|||
import types
|
||||
import inspect
|
||||
from giambio.task import Task
|
||||
from types import FunctionType
|
||||
from typing import Any, Union, Iterable
|
||||
from typing import Any, Union, Iterable, Coroutine, Callable
|
||||
from giambio.exceptions import GiambioError
|
||||
|
||||
|
||||
|
@ -49,7 +48,7 @@ async def suspend() -> Any:
|
|||
return await create_trap("suspend")
|
||||
|
||||
|
||||
async def create_task(coro: FunctionType, pool, *args):
|
||||
async def create_task(coro: Callable[..., Coroutine[Any, Any, Any]], pool, *args):
|
||||
"""
|
||||
Spawns a new task in the current event loop from a bare coroutine
|
||||
function. All extra positional arguments are passed to the function
|
||||
|
|
|
@ -11,10 +11,11 @@ async def child(name: int):
|
|||
async def main():
|
||||
start = giambio.clock()
|
||||
async with giambio.create_pool() as pool:
|
||||
await pool.spawn(child, 1) # If you comment this line, the pool will exit immediately!
|
||||
# await pool.spawn(child, 1) # If you comment this line, the pool will exit immediately!
|
||||
task = await pool.spawn(child, 2)
|
||||
await task.cancel()
|
||||
print("[main] Children spawned, awaiting completion")
|
||||
await task.cancel()
|
||||
print("[main] Second child cancelled")
|
||||
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
|
||||
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ async def receiver(sock: giambio.socket.AsyncSocket, q: giambio.Queue):
|
|||
data, rest = data.split(b"\n", maxsplit=2)
|
||||
buffer = b"".join(rest)
|
||||
await q.put((1, data.decode()))
|
||||
data = buffer
|
||||
data = buffer
|
||||
|
||||
|
||||
async def main(host: Tuple[str, int]):
|
||||
|
|
|
@ -23,15 +23,15 @@ async def serve(bind_address: tuple):
|
|||
logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}")
|
||||
async with giambio.create_pool() as pool:
|
||||
async with sock:
|
||||
while True:
|
||||
try:
|
||||
conn, address_tuple = await sock.accept()
|
||||
clients.append(conn)
|
||||
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
|
||||
await pool.spawn(handler, conn, address_tuple)
|
||||
except Exception as err:
|
||||
# Because exceptions just *work*
|
||||
logging.info(f"{address_tuple[0]}:{address_tuple[1]} has raised {type(err).__name__}: {err}")
|
||||
while True:
|
||||
try:
|
||||
conn, address_tuple = await sock.accept()
|
||||
clients.append(conn)
|
||||
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
|
||||
await pool.spawn(handler, conn, address_tuple)
|
||||
except Exception as err:
|
||||
# Because exceptions just *work*
|
||||
logging.info(f"{address_tuple[0]}:{address_tuple[1]} has raised {type(err).__name__}: {err}")
|
||||
|
||||
|
||||
async def handler(sock: AsyncSocket, client_address: tuple):
|
||||
|
|
|
@ -20,14 +20,14 @@ async def serve(bind_address: tuple):
|
|||
logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}")
|
||||
async with giambio.create_pool() as pool:
|
||||
async with sock:
|
||||
while True:
|
||||
try:
|
||||
conn, address_tuple = await sock.accept()
|
||||
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
|
||||
await pool.spawn(handler, conn, address_tuple)
|
||||
except Exception as err:
|
||||
# Because exceptions just *work*
|
||||
logging.info(f"{address_tuple[0]}:{address_tuple[1]} has raised {type(err).__name__}: {err}")
|
||||
while True:
|
||||
try:
|
||||
conn, address_tuple = await sock.accept()
|
||||
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
|
||||
await pool.spawn(handler, conn, address_tuple)
|
||||
except Exception as err:
|
||||
# Because exceptions just *work*
|
||||
logging.info(f"{address_tuple[0]}:{address_tuple[1]} has raised {type(err).__name__}: {err}")
|
||||
|
||||
|
||||
async def handler(sock: AsyncSocket, client_address: tuple):
|
||||
|
|
|
@ -15,7 +15,9 @@ async def child(ev: giambio.Event, pause: int):
|
|||
await giambio.sleep(pause)
|
||||
end_sleep = giambio.clock() - start_sleep
|
||||
end_total = giambio.clock() - start_total
|
||||
print(f"[child] Done! Slept for {end_total:.2f} seconds total ({end_pause:.2f} waiting, {end_sleep:.2f} sleeping), nice nap!")
|
||||
print(
|
||||
f"[child] Done! Slept for {end_total:.2f} seconds total ({end_pause:.2f} waiting, {end_sleep:.2f} sleeping), nice nap!"
|
||||
)
|
||||
|
||||
|
||||
async def parent(pause: int = 1):
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
from debugger import Debugger
|
||||
import giambio
|
||||
|
||||
|
||||
async def child(start: int, limit: int, q: giambio.Queue, l: giambio.Lock):
|
||||
async with l: # If you comment this line, the resulting queue will
|
||||
# be all mixed up!
|
||||
# Locked! If any other task
|
||||
# tries to acquire the lock,
|
||||
# they'll wait for us to finish
|
||||
while start <= limit:
|
||||
print(f"[locked ({limit})] {start}")
|
||||
await q.put(start)
|
||||
start += 1
|
||||
await giambio.sleep(1)
|
||||
|
||||
|
||||
async def other(stop: int):
|
||||
# Other tasks are unaffected
|
||||
# by the lock if they don't
|
||||
# acquire it
|
||||
n = stop
|
||||
while n:
|
||||
print(f"[unlocked ({stop})] {n}")
|
||||
n -= 1
|
||||
await giambio.sleep(1)
|
||||
|
||||
|
||||
async def main():
|
||||
queue = giambio.Queue()
|
||||
lock = giambio.Lock()
|
||||
async with giambio.create_pool() as pool:
|
||||
await pool.spawn(child, 1, 5, queue, lock)
|
||||
await pool.spawn(child, 6, 10, queue, lock)
|
||||
await pool.spawn(other, 10)
|
||||
await pool.spawn(other, 5)
|
||||
print(f"Result: {queue}") # Queue is ordered!
|
||||
print("[main] Done")
|
||||
|
||||
|
||||
giambio.run(main, debugger=())
|
|
@ -36,7 +36,7 @@ async def main():
|
|||
print("[main] First 2 children spawned, awaiting completion")
|
||||
async with giambio.create_pool() as new_pool:
|
||||
# This pool will be cancelled by the exception
|
||||
# in the other pool
|
||||
# in the outer pool
|
||||
await new_pool.spawn(child2)
|
||||
await new_pool.spawn(child3)
|
||||
print("[main] Third and fourth children spawned")
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
## Example for nested task pools
|
||||
import giambio
|
||||
from debugger import Debugger
|
||||
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
## Two-way proxy example stolen from njsmith's talk about trio
|
||||
|
||||
from debugger import Debugger
|
||||
import giambio
|
||||
import socket
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
## Producer-consumer code using giambio's async queue
|
||||
import giambio
|
||||
from debugger import Debugger
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import time
|
|||
|
||||
_print = print
|
||||
|
||||
|
||||
def print(*args, **kwargs):
|
||||
sys.stdout.write(f"[{time.strftime('%H:%M:%S')}] ")
|
||||
_print(*args, **kwargs)
|
||||
|
@ -14,18 +15,19 @@ def print(*args, **kwargs):
|
|||
|
||||
async def test(host: str, port: int, bufsize: int = 4096):
|
||||
socket = giambio.socket.wrap_socket(
|
||||
ssl.create_default_context().wrap_socket(
|
||||
sock=sock.socket(),
|
||||
# Note: do_handshake_on_connect MUST
|
||||
# be set to False on the synchronous socket!
|
||||
# Giambio handles the TLS handshake asynchronously
|
||||
# and making the SSL library handle it blocks
|
||||
# the entire event loop. To perform the TLS
|
||||
# handshake upon connection, set the this
|
||||
# parameter in the AsyncSocket class instead
|
||||
do_handshake_on_connect=False,
|
||||
server_hostname=host)
|
||||
)
|
||||
ssl.create_default_context().wrap_socket(
|
||||
sock=sock.socket(),
|
||||
# Note: do_handshake_on_connect MUST
|
||||
# be set to False on the synchronous socket!
|
||||
# Giambio handles the TLS handshake asynchronously
|
||||
# and making the SSL library handle it blocks
|
||||
# the entire event loop. To perform the TLS
|
||||
# handshake upon connection, set the this
|
||||
# parameter in the AsyncSocket class instead
|
||||
do_handshake_on_connect=False,
|
||||
server_hostname=host,
|
||||
)
|
||||
)
|
||||
print(f"Attempting a connection to {host}:{port}")
|
||||
await socket.connect((host, port))
|
||||
print("Connected")
|
||||
|
@ -34,18 +36,20 @@ async def test(host: str, port: int, bufsize: int = 4096):
|
|||
async with socket:
|
||||
# Closes the socket automatically
|
||||
print("Entered socket context manager, sending request data")
|
||||
await socket.send_all(b"""GET / HTTP/1.1\r\nHost: google.com\r\nUser-Agent: owo\r\nAccept: text/html\r\nConnection: keep-alive\r\nAccept: */*\r\n\r\n""")
|
||||
await socket.send_all(
|
||||
b"""GET / HTTP/1.1\r\nHost: google.com\r\nUser-Agent: owo\r\nAccept: text/html\r\nConnection: keep-alive\r\nAccept: */*\r\n\r\n"""
|
||||
)
|
||||
print("Data sent")
|
||||
buffer = b""
|
||||
while not buffer.endswith(b"\r\n\r\n"):
|
||||
print(f"Requesting up to {bufsize} bytes (current response size: {len(buffer)})")
|
||||
data = await socket.receive(bufsize)
|
||||
print(f"Received {len(data)} bytes")
|
||||
if data:
|
||||
buffer += data
|
||||
else:
|
||||
print("Received empty stream, closing connection")
|
||||
break
|
||||
print(f"Requesting up to {bufsize} bytes (current response size: {len(buffer)})")
|
||||
data = await socket.receive(bufsize)
|
||||
print(f"Received {len(data)} bytes")
|
||||
if data:
|
||||
buffer += data
|
||||
else:
|
||||
print("Received empty stream, closing connection")
|
||||
break
|
||||
print(f"Request has{' not' if not p.timed_out else ''} timed out!")
|
||||
if buffer:
|
||||
data = buffer.decode().split("\r\n")
|
||||
|
@ -70,4 +74,3 @@ async def test(host: str, port: int, bufsize: int = 4096):
|
|||
|
||||
|
||||
giambio.run(test, "google.com", 443, 256, debugger=())
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
## SImple task IPC using giambio's MemoryChannel class
|
||||
## Simple task IPC using giambio's MemoryChannel class
|
||||
import random
|
||||
import string
|
||||
import giambio
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
## Simple task IPC using giambio's NetworkChannel class
|
||||
import random
|
||||
import string
|
||||
import giambio
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
import giambio
|
||||
from debugger import Debugger
|
||||
|
||||
|
||||
async def child(name: int):
|
||||
print(f"[child {name}] Child spawned!! Sleeping for {name} seconds")
|
||||
await giambio.sleep(name)
|
||||
print(f"[child {name}] Had a nice nap!")
|
||||
<<<<<<< HEAD
|
||||
return name
|
||||
=======
|
||||
|
||||
|
||||
async def worker(coro, *args):
|
||||
try:
|
||||
async with giambio.with_timeout(10):
|
||||
await coro(*args)
|
||||
except giambio.exceptions.TooSlowError:
|
||||
return True
|
||||
return False
|
||||
>>>>>>> origin/master
|
||||
|
||||
|
||||
async def main():
|
||||
start = giambio.clock()
|
||||
<<<<<<< HEAD
|
||||
try:
|
||||
async with giambio.with_timeout(5) as pool:
|
||||
task = await pool.spawn(child, 2)
|
||||
print(f"Child has returned: {await task.join()}")
|
||||
await giambio.sleep(5) # This will trigger the timeout
|
||||
except giambio.exceptions.TooSlowError:
|
||||
print("[main] One or more children have timed out!")
|
||||
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
|
||||
return 12
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
giambio.run(main, debugger=())
|
||||
=======
|
||||
async with giambio.skip_after(10) as pool:
|
||||
t = await pool.spawn(worker, child, 7)
|
||||
await giambio.sleep(2)
|
||||
t2 = await pool.spawn(worker, child, 15)
|
||||
if any([await t.join(), await t2.join()]):
|
||||
print("[main] One or more children have timed out!")
|
||||
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
giambio.run(main, debugger=())
|
||||
>>>>>>> origin/master
|
Loading…
Reference in New Issue