mirror of https://github.com/nocturn9x/giambio.git
Added experimental network channels
This commit is contained in:
parent
ed6aba490f
commit
e76998f29f
|
@ -92,7 +92,7 @@ class TaskManager:
|
||||||
if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout:
|
if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout:
|
||||||
return True
|
return True
|
||||||
except giambio.exceptions.TooSlowError:
|
except giambio.exceptions.TooSlowError:
|
||||||
if not self.raise_on_timeout:
|
if self.raise_on_timeout:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
async def cancel(self):
|
async def cancel(self):
|
||||||
|
|
|
@ -445,7 +445,8 @@ class AsyncScheduler:
|
||||||
for task in pool.tasks:
|
for task in pool.tasks:
|
||||||
self.join(task)
|
self.join(task)
|
||||||
self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point)))
|
self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point)))
|
||||||
|
if pool.entry_point is self.entry_point:
|
||||||
|
self.run_ready.append(self.entry_point)
|
||||||
|
|
||||||
def schedule_tasks(self, tasks: List[Task]):
|
def schedule_tasks(self, tasks: List[Task]):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -15,12 +15,13 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
See the License for the specific language governing permissions and
|
See the License for the specific language governing permissions and
|
||||||
limitations under the License.
|
limitations under the License.
|
||||||
"""
|
"""
|
||||||
|
from socket import socketpair
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
import abc
|
|
||||||
from collections import deque
|
from collections import deque
|
||||||
from typing import Any, Optional
|
from typing import Any, Optional
|
||||||
from giambio.traps import event_wait, event_set
|
from giambio.traps import event_wait, event_set
|
||||||
from giambio.exceptions import GiambioError
|
from giambio.exceptions import GiambioError
|
||||||
|
from giambio.socket import wrap_socket
|
||||||
|
|
||||||
|
|
||||||
class Event:
|
class Event:
|
||||||
|
@ -258,5 +259,69 @@ class MemoryChannel(Channel):
|
||||||
class NetworkChannel(Channel):
|
class NetworkChannel(Channel):
|
||||||
"""
|
"""
|
||||||
A two-way communication channel between tasks based
|
A two-way communication channel between tasks based
|
||||||
on giambio's I/O mechanisms. This variant of a channel
|
on giambio's I/O mechanisms instead of in-memory queues
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""
|
||||||
|
Public object constructor
|
||||||
|
"""
|
||||||
|
|
||||||
|
super().__init__(None)
|
||||||
|
# We use a socket as our buffer instead of a queue
|
||||||
|
sockets = socketpair()
|
||||||
|
self.reader = wrap_socket(sockets[0])
|
||||||
|
self.writer = wrap_socket(sockets[1])
|
||||||
|
self._internal_buffer = b""
|
||||||
|
|
||||||
|
|
||||||
|
async def write(self, data: bytes):
|
||||||
|
"""
|
||||||
|
Writes data to the channel. Blocks if the internal
|
||||||
|
socket is not currently available. Does nothing
|
||||||
|
if the channel has been closed
|
||||||
|
"""
|
||||||
|
|
||||||
|
if self.closed:
|
||||||
|
return
|
||||||
|
await self.writer.send_all(data)
|
||||||
|
|
||||||
|
async def read(self, size: int):
|
||||||
|
"""
|
||||||
|
Reads exactly size bytes from the channel. Blocks until
|
||||||
|
enough data arrives. Extra data is cached and used on the
|
||||||
|
next read
|
||||||
|
"""
|
||||||
|
|
||||||
|
data = self._internal_buffer
|
||||||
|
while len(data) < size:
|
||||||
|
data += await self.reader.receive(size)
|
||||||
|
self._internal_buffer = data[size:]
|
||||||
|
data = data[:size]
|
||||||
|
return data
|
||||||
|
|
||||||
|
async def close(self):
|
||||||
|
"""
|
||||||
|
Closes the memory channel. Any underlying
|
||||||
|
data is flushed out of the internal socket
|
||||||
|
and is lost
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.closed = True
|
||||||
|
await self.reader.close()
|
||||||
|
await self.writer.close()
|
||||||
|
|
||||||
|
async def pending(self):
|
||||||
|
"""
|
||||||
|
Returns if there's pending
|
||||||
|
data to be read
|
||||||
|
"""
|
||||||
|
|
||||||
|
# TODO: Ugly!
|
||||||
|
if self.closed:
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
self._internal_buffer += self.reader.sock.recv(1)
|
||||||
|
except BlockingIOError:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
import random
|
||||||
|
import string
|
||||||
|
import giambio
|
||||||
|
from debugger import Debugger
|
||||||
|
|
||||||
|
|
||||||
|
async def task(c: giambio.NetworkChannel, name: str):
|
||||||
|
while True:
|
||||||
|
if await c.pending():
|
||||||
|
print(f"[{name}] Received {(await c.read(8)).decode()!r}")
|
||||||
|
else:
|
||||||
|
data = "".join(random.choice(string.ascii_letters) for _ in range(8))
|
||||||
|
print(f"[{name}] Sending {data!r}")
|
||||||
|
await c.write(data.encode())
|
||||||
|
await giambio.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
async def main(channel: giambio.NetworkChannel, delay: int):
|
||||||
|
print(f"[main] Spawning workers, exiting in {delay} seconds")
|
||||||
|
async with giambio.skip_after(delay) as pool:
|
||||||
|
await pool.spawn(task, channel, "one")
|
||||||
|
await pool.spawn(task, channel, "two")
|
||||||
|
await channel.close()
|
||||||
|
print(f"[main] Operation complete, channel closed")
|
||||||
|
if await channel.pending():
|
||||||
|
print(f"[main] Channel has leftover data, clearing it")
|
||||||
|
while await channel.pending():
|
||||||
|
print(f"[main] Cleared {await channel.read(1)!r}")
|
||||||
|
|
||||||
|
|
||||||
|
channel = giambio.NetworkChannel()
|
||||||
|
giambio.run(main, channel, 4, debugger=())
|
Loading…
Reference in New Issue