structio/tests/processes.py

34 lines
1.1 KiB
Python

import structio
import subprocess
import shlex
# In the interest of compatibility, structio.parallel
# tries to mirror the subprocess module. You can even
# pass the constants such as DEVNULL, PIPE, etc. to it
# and it'll work
async def main(data: str):
cmd = shlex.split("python -c 'print(input())'")
to_send = data.encode(errors="ignore")
# This will print data to stdout
await structio.parallel.run(cmd, input=to_send)
# Other option
out = await structio.parallel.check_output(cmd, input=to_send)
# Thanks to Linux, Mac OS X and Windows all using different
# line endings, we have to do this abomination
out = out.decode().rstrip("\r").rstrip("\r\n").rstrip("\n")
assert out == data
# Other, other option :D
process = structio.parallel.Popen(
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE
)
# Note that the process is spawned as soon as the object is
# created!
out, _ = await process.communicate(to_send)
out = out.decode().rstrip("\r").rstrip("\r\n").rstrip("\n")
assert out == data
structio.run(main, "owo")