mirror of https://github.com/nocturn9x/giambio.git
Fixed self._data-based methods
This commit is contained in:
parent
0916d9634c
commit
b9ed99e3ee
|
@ -21,7 +21,7 @@ import types
|
||||||
from giambio.task import Task
|
from giambio.task import Task
|
||||||
from timeit import default_timer
|
from timeit import default_timer
|
||||||
from giambio.context import TaskManager
|
from giambio.context import TaskManager
|
||||||
from typing import List, Optional, Any
|
from typing import List, Optional, Any, Dict
|
||||||
from giambio.util.debug import BaseDebugger
|
from giambio.util.debug import BaseDebugger
|
||||||
from giambio.internal import TimeQueue, DeadlinesQueue
|
from giambio.internal import TimeQueue, DeadlinesQueue
|
||||||
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
|
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
|
||||||
|
@ -119,7 +119,7 @@ class AsyncScheduler:
|
||||||
# A heap queue of deadlines to be checked
|
# A heap queue of deadlines to be checked
|
||||||
self.deadlines: DeadlinesQueue = DeadlinesQueue()
|
self.deadlines: DeadlinesQueue = DeadlinesQueue()
|
||||||
# Data to send back to a trap
|
# Data to send back to a trap
|
||||||
self._data: Optional[Any] = None
|
self._data: Dict[Task, Any] = {}
|
||||||
# The I/O skip limit. TODO: Back up this value with euristics
|
# The I/O skip limit. TODO: Back up this value with euristics
|
||||||
self.io_skip_limit = io_skip_limit or 5
|
self.io_skip_limit = io_skip_limit or 5
|
||||||
# The max. I/O timeout
|
# The max. I/O timeout
|
||||||
|
@ -252,7 +252,7 @@ class AsyncScheduler:
|
||||||
task = Task(corofunc.__name__ or str(corofunc), corofunc(*args, **kwargs), pool)
|
task = Task(corofunc.__name__ or str(corofunc), corofunc(*args, **kwargs), pool)
|
||||||
task.next_deadline = pool.timeout or 0.0
|
task.next_deadline = pool.timeout or 0.0
|
||||||
task.joiners = {self.current_task}
|
task.joiners = {self.current_task}
|
||||||
self._data = task
|
self._data[self.current_task] = task
|
||||||
self.tasks.append(task)
|
self.tasks.append(task)
|
||||||
self.run_ready.append(task)
|
self.run_ready.append(task)
|
||||||
self.debugger.on_task_spawn(task)
|
self.debugger.on_task_spawn(task)
|
||||||
|
@ -287,14 +287,10 @@ class AsyncScheduler:
|
||||||
# We perform the deferred cancellation
|
# We perform the deferred cancellation
|
||||||
# if it was previously scheduled
|
# if it was previously scheduled
|
||||||
self.cancel(self.current_task)
|
self.cancel(self.current_task)
|
||||||
# Little boilerplate to send data back to an async trap
|
|
||||||
if self.current_task.status != "init":
|
|
||||||
data = self._data
|
|
||||||
# Run a single step with the calculation (i.e. until a yield
|
# Run a single step with the calculation (i.e. until a yield
|
||||||
# somewhere)
|
# somewhere)
|
||||||
method, *args = self.current_task.run(data)
|
method, *args = self.current_task.run(self._data.get(self.current_task))
|
||||||
if data is self._data:
|
self._data.pop(self.current_task, None)
|
||||||
self._data = None
|
|
||||||
# Some debugging and internal chatter here
|
# Some debugging and internal chatter here
|
||||||
self.current_task.status = "run"
|
self.current_task.status = "run"
|
||||||
self.current_task.steps += 1
|
self.current_task.steps += 1
|
||||||
|
@ -378,7 +374,7 @@ class AsyncScheduler:
|
||||||
'Returns' the current task to an async caller
|
'Returns' the current task to an async caller
|
||||||
"""
|
"""
|
||||||
|
|
||||||
self._data = self.current_task
|
self._data[self.current_task] = self.current_task
|
||||||
self.reschedule_running()
|
self.reschedule_running()
|
||||||
|
|
||||||
def get_current_pool(self):
|
def get_current_pool(self):
|
||||||
|
@ -386,7 +382,7 @@ class AsyncScheduler:
|
||||||
'Returns' the current pool to an async caller
|
'Returns' the current pool to an async caller
|
||||||
"""
|
"""
|
||||||
|
|
||||||
self._data = self.current_pool
|
self._data[self.current_task] = self.current_pool
|
||||||
self.reschedule_running()
|
self.reschedule_running()
|
||||||
|
|
||||||
def get_current_loop(self):
|
def get_current_loop(self):
|
||||||
|
@ -394,7 +390,7 @@ class AsyncScheduler:
|
||||||
'Returns' self to an async caller
|
'Returns' self to an async caller
|
||||||
"""
|
"""
|
||||||
|
|
||||||
self._data = self
|
self._data[self.current_task] = self
|
||||||
self.reschedule_running()
|
self.reschedule_running()
|
||||||
|
|
||||||
def expire_deadlines(self):
|
def expire_deadlines(self):
|
||||||
|
|
|
@ -176,8 +176,9 @@ async def event_wait(event):
|
||||||
|
|
||||||
if event.set:
|
if event.set:
|
||||||
return
|
return
|
||||||
event.waiters.add(await current_task())
|
task = await current_task()
|
||||||
await create_trap("suspend")
|
event.waiters.add(task)
|
||||||
|
await create_trap("suspend", task)
|
||||||
|
|
||||||
|
|
||||||
async def event_set(event):
|
async def event_set(event):
|
||||||
|
|
Loading…
Reference in New Issue