Updated I/O mechanism and task synchronization primitives

This commit is contained in:
Mattia Giambirtone 2022-10-19 15:28:06 +02:00
parent 8f3d7056b7
commit 99656eedbe
2 changed files with 28 additions and 23 deletions

View File

@ -151,8 +151,8 @@ class FIFOKernel:
def wait_io(self):
"""
Waits for I/O and implements part of the sleeping mechanism
for the event loop
Waits for I/O and schedules tasks when their
associated resource is ready to be used
"""
before_time = self.clock() # Used for the debugger
@ -168,7 +168,7 @@ class FIFOKernel:
io_ready = self.selector.select(timeout)
# Get sockets that are ready and schedule their tasks
for key, _ in io_ready:
self.run_ready.extend(key.data) # Resource ready? Schedule its tasks
self.run_ready.append(key.data) # Resource ready? Schedule its task
self.debugger.after_io(self.clock() - before_time)
def awake_tasks(self):
@ -207,7 +207,8 @@ class FIFOKernel:
Suspends execution of the current task. This is basically
a do-nothing method, since it will not reschedule the task
before returning. The task will stay suspended as long as
something else outside the loop reschedules it
something else outside the loop reschedules it (possibly
forever)
"""
self.current_task.state = TaskState.PAUSED
@ -336,12 +337,9 @@ class FIFOKernel:
for each I/O resource the given task owns
"""
for key in dict(self.selector.get_map()).values():
if task in key.data:
key.data.remove(task)
if not key.data:
self.notify_closing(key.fileobj, broken=True)
self.selector.unregister(key.fileobj)
for key in filter(lambda k: k.data == task, dict(self.selector.get_map()).values()):
self.notify_closing(key.fileobj, broken=True)
self.selector.unregister(key.fileobj)
task.last_io = ()
def notify_closing(self, stream, broken: bool = False):
@ -551,14 +549,21 @@ class FIFOKernel:
try:
self.selector.register(resource, evt_type, [self.current_task])
except KeyError:
# The resource is already registered doing something else: we try
# to see if we can modify the event
# The stream is already being used
key = self.selector.get_key(resource)
if evt_type != key.events:
self.selector.modify(
resource, evt_type | key.events, key.data + [self.current_task]
)
# If we get here, two tasks are trying to read or write on the same resource at the same time
raise ResourceBusy(
"The given resource is being read from/written to from another task"
)
if key.data == self.current_task or evt_type != key.events:
# If the task that registered the stream
# changed their mind on what they want
# to do with it, who are we to deny their
# request? We also modify the event in
# our selector so that one task can read
# off a given stream while another one is
# writing to it
self.selector.modify(resource, evt_type, self.current_task)
else:
# One task reading and one writing on the same
# resource is fine (think producer-consumer),
# but having two tasks reading/writing at the
# same time can't lead to anything good, better
# disallow it
self.current_task.throw(ResourceBusy(f"The resource is being read from/written by another task"))

View File

@ -123,7 +123,7 @@ class Queue:
if not self.maxsize or len(self.container) < self.maxsize:
self.container.append(item)
if self.getters:
await self.getters.popleft().trigger(self.container.popleft())
await self.getters.popleft().trigger()
else:
ev = Event()
self.putters.append(ev)
@ -140,11 +140,11 @@ class Queue:
if self.container:
if self.putters:
await self.putters.popleft().trigger()
return self.container.popleft()
else:
ev = Event()
self.getters.append(ev)
return await ev.wait()
await ev.wait()
return self.container.popleft()
async def clear(self):
"""