Fixups and minor changes. Added (failing) two-way proxy test. Needs debugging
This commit is contained in:
parent
73a8f3b8e3
commit
90e182c28b
|
@ -51,14 +51,18 @@ class DefaultKernel(BaseKernel):
|
|||
tools,
|
||||
restrict_ki_to_checkpoints,
|
||||
)
|
||||
# Tasks that are run when the entry point
|
||||
# exits correctly
|
||||
self.shutdown_tasks: deque[
|
||||
tuple[Any, Callable[[Any, Any], Coroutine[Any, Any, Any]], tuple[Any, ...]]
|
||||
] = deque()
|
||||
# Internal identifier generator for shutdown tasks
|
||||
self._shutdown_task_ident = count(0)
|
||||
# Data to send back to tasks
|
||||
self.data: dict[Task, Any] = {}
|
||||
# Have we handled SIGINT?
|
||||
self._sigint_handled: bool = False
|
||||
# Pool where the entry point and system tasks run
|
||||
self.pool = TaskPool()
|
||||
self.current_scope = self.pool.scope
|
||||
self._shutting_down = False
|
||||
|
@ -408,19 +412,20 @@ class DefaultKernel(BaseKernel):
|
|||
self.io_manager.release_task(task)
|
||||
self.policy.discard(task)
|
||||
|
||||
def _reschedule_scope_tree(self, scope: TaskScope):
|
||||
# Walk up the scope tree and reschedule all necessary
|
||||
# tasks
|
||||
while scope and scope.done() and scope is not self.pool.scope:
|
||||
self.reschedule(scope.owner)
|
||||
scope = scope.outer
|
||||
|
||||
def on_success(self, task: Task):
|
||||
"""
|
||||
The given task has exited gracefully: hooray!
|
||||
"""
|
||||
|
||||
assert task.state == TaskState.FINISHED
|
||||
# Walk up the scope tree and reschedule all necessary
|
||||
# tasks
|
||||
scope = task.pool.scope
|
||||
while scope and scope.done() and scope is not self.pool.scope:
|
||||
if scope.done():
|
||||
self.reschedule(scope.owner)
|
||||
scope = scope.outer
|
||||
self._reschedule_scope_tree(task.scope)
|
||||
self.event("on_task_exit", task)
|
||||
self.release(task)
|
||||
|
||||
|
@ -440,11 +445,12 @@ class DefaultKernel(BaseKernel):
|
|||
self.close(force=True)
|
||||
raise task.exc from StructIOException(f"system task {task} crashed")
|
||||
scope = task.scope
|
||||
# self._reschedule_scope_tree(scope)
|
||||
self.release(task)
|
||||
self.cancel_scope(scope)
|
||||
if task is not scope.owner:
|
||||
self.throw(scope.owner, task.exc)
|
||||
self.reschedule(scope.owner)
|
||||
# self.reschedule(scope.owner)
|
||||
|
||||
def on_cancel(self, task: Task):
|
||||
"""
|
||||
|
@ -453,6 +459,7 @@ class DefaultKernel(BaseKernel):
|
|||
"""
|
||||
|
||||
assert task.state == TaskState.CANCELLED
|
||||
# self._reschedule_scope_tree(task.scope)
|
||||
self.event("after_cancel", task)
|
||||
self.release(task)
|
||||
|
||||
|
@ -474,8 +481,9 @@ class DefaultKernel(BaseKernel):
|
|||
return
|
||||
if task.state == TaskState.RUNNING:
|
||||
# Can't cancel a task while it's
|
||||
# running, will raise ValueError
|
||||
# if we try, so we defer it for later
|
||||
# running: will raise ValueError
|
||||
# if we try, so we defer it for
|
||||
# later
|
||||
task.pending_cancellation = True
|
||||
return
|
||||
err = Cancelled()
|
||||
|
|
|
@ -82,8 +82,6 @@ def socketpair(
|
|||
family = _socket.AF_INET
|
||||
a, b = _socket.socketpair(family, type, proto)
|
||||
result = AsyncSocket(a), AsyncSocket(b)
|
||||
result[0].connected = True
|
||||
result[1].connected = True
|
||||
return result
|
||||
|
||||
|
||||
|
@ -409,7 +407,7 @@ class AsyncSocket(AsyncResource):
|
|||
# release the actual fd, so we
|
||||
# save it separately
|
||||
fd = self._fd
|
||||
self._fd = -1
|
||||
self._fd = FdWrapper(-1)
|
||||
await closing(fd)
|
||||
await release(fd)
|
||||
self.socket.close()
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
# Love you njsmith <3
|
||||
import socket
|
||||
import structio
|
||||
|
||||
|
||||
async def proxy_one_way(source, sink: structio.AsyncSocket):
|
||||
while True:
|
||||
data = await source.receive(1024)
|
||||
if not data:
|
||||
await sink.shutdown(socket.SHUT_WR)
|
||||
break
|
||||
await sink.send_all(data)
|
||||
|
||||
|
||||
async def proxy_two_way(a, b: structio.AsyncSocket):
|
||||
async with structio.create_pool() as pool:
|
||||
pool.spawn(proxy_one_way, a, b)
|
||||
pool.spawn(proxy_one_way, b, a)
|
||||
|
||||
|
||||
async def main():
|
||||
with structio.skip_after(10):
|
||||
a = await structio.socket.connect_tcp_socket("localhost", 12345)
|
||||
b = await structio.socket.connect_tcp_socket("localhost", 54321)
|
||||
async with a, b:
|
||||
await proxy_two_way(a, b)
|
||||
print("all done!")
|
||||
|
||||
|
||||
structio.run(main, tools=[structio.util.debug.SimpleDebugger()])
|
Loading…
Reference in New Issue