Fix issues with memory channels
This commit is contained in:
parent
2aecb7f440
commit
d77ddcf6a6
117
structio/sync.py
117
structio/sync.py
|
@ -185,86 +185,6 @@ class Queue:
|
|||
self.putters.clear()
|
||||
|
||||
|
||||
class MemorySendChannel(ChannelWriter):
|
||||
"""
|
||||
An in-memory one-way channel to send
|
||||
data
|
||||
"""
|
||||
|
||||
def __init__(self, buffer: Queue):
|
||||
self._buffer = buffer
|
||||
self._closed = False
|
||||
|
||||
@enable_ki_protection
|
||||
async def send(self, value):
|
||||
if self._closed:
|
||||
raise ResourceClosed("cannot operate on a closed channel")
|
||||
await self._buffer.put(value)
|
||||
|
||||
@enable_ki_protection
|
||||
async def close(self):
|
||||
self._closed = True
|
||||
await checkpoint()
|
||||
|
||||
def writers(self):
|
||||
return len(self._buffer.putters)
|
||||
|
||||
|
||||
class MemoryReceiveChannel(ChannelReader):
|
||||
"""
|
||||
An in-memory one-way channel to read
|
||||
data
|
||||
"""
|
||||
|
||||
def __init__(self, buffer):
|
||||
self._buffer = buffer
|
||||
self._closed = False
|
||||
|
||||
@enable_ki_protection
|
||||
async def receive(self):
|
||||
if self._closed:
|
||||
raise ResourceClosed("cannot operate on a closed channel")
|
||||
return await self._buffer.get()
|
||||
|
||||
@enable_ki_protection
|
||||
async def close(self):
|
||||
self._closed = True
|
||||
await checkpoint()
|
||||
|
||||
def pending(self):
|
||||
return bool(self._buffer)
|
||||
|
||||
def readers(self):
|
||||
return len(self._buffer.getters)
|
||||
|
||||
|
||||
class NetworkSendChannel(ChannelWriter):
|
||||
"""
|
||||
A socket-based one-way channel to send
|
||||
data
|
||||
"""
|
||||
|
||||
def __init__(self, buffer):
|
||||
self._buffer = buffer
|
||||
self._closed = False
|
||||
self._write_lock = ThereCanBeOnlyOne("another task is writing on this channel")
|
||||
|
||||
@enable_ki_protection
|
||||
async def send(self, value):
|
||||
if self._closed:
|
||||
raise ResourceClosed("cannot operate on a closed channel")
|
||||
with self._write_lock:
|
||||
await self._buffer.put(value)
|
||||
|
||||
@enable_ki_protection
|
||||
async def close(self):
|
||||
self._closed = True
|
||||
await checkpoint()
|
||||
|
||||
def writers(self):
|
||||
return len(self._buffer.putters)
|
||||
|
||||
|
||||
class MemoryReceiveChannel(ChannelReader):
|
||||
"""
|
||||
An in-memory one-way channel to read
|
||||
|
@ -295,6 +215,36 @@ class MemoryReceiveChannel(ChannelReader):
|
|||
return len(self._buffer.getters)
|
||||
|
||||
|
||||
class MemorySendChannel(ChannelWriter):
|
||||
"""
|
||||
An in-memory one-way channel to send
|
||||
data
|
||||
"""
|
||||
|
||||
def __init__(self, buffer):
|
||||
self._buffer = buffer
|
||||
self._closed = False
|
||||
self._write_lock = ThereCanBeOnlyOne("another task is writing to this channel")
|
||||
|
||||
@enable_ki_protection
|
||||
async def send(self, item):
|
||||
if self._closed:
|
||||
raise ResourceClosed("cannot operate on a closed channel")
|
||||
with self._write_lock:
|
||||
return await self._buffer.put(item)
|
||||
|
||||
@enable_ki_protection
|
||||
async def close(self):
|
||||
self._closed = True
|
||||
await checkpoint()
|
||||
|
||||
def pending(self):
|
||||
return bool(self._buffer)
|
||||
|
||||
def writers(self):
|
||||
return len(self._buffer.putters)
|
||||
|
||||
|
||||
class MemoryChannel(Channel):
|
||||
"""
|
||||
An in-memory, two-way channel between
|
||||
|
@ -302,10 +252,9 @@ class MemoryChannel(Channel):
|
|||
"""
|
||||
|
||||
def __init__(self, buffer_size):
|
||||
self._send_buffer = Queue(buffer_size)
|
||||
self._receive_buffer = Queue(buffer_size)
|
||||
self.reader = MemoryReceiveChannel(self._receive_buffer)
|
||||
self.writer = MemorySendChannel(self._send_buffer)
|
||||
self._buffer = Queue(buffer_size)
|
||||
self.reader = MemoryReceiveChannel(self._buffer)
|
||||
self.writer = MemorySendChannel(self._buffer)
|
||||
|
||||
def pending(self):
|
||||
return self.reader.pending()
|
||||
|
|
Loading…
Reference in New Issue