structio/tests/processes.py

23 lines
618 B
Python

import structio
import subprocess
import shlex
async def main(data: str):
    """
    Echo ``data`` through a child Python interpreter in three different ways.

    The child command is ``python3 -c 'print(input())'``: it reads one line
    from its standard input and prints it back. ``data`` is encoded to bytes
    (characters that cannot be encoded are dropped) and fed to the child via
    ``structio.parallel.run``, ``structio.parallel.check_output`` and
    ``structio.parallel.Popen(...).communicate``. For the last two, the
    returned output, minus trailing newlines, is compared against the bytes
    that were sent and the boolean result is printed.

    :param data: The text to send to the child process
    """

    argv = shlex.split("python3 -c 'print(input())'")
    # Keep the encoded form in its own name instead of rebinding the
    # str-annotated parameter to a bytes object
    payload = data.encode(errors="ignore")

    # First option: this will print the data to stdout
    await structio.parallel.run(argv, input=payload)

    # Second option: get the child's output back and compare it
    # with what was sent
    captured = await structio.parallel.check_output(argv, input=payload)
    round_tripped = captured.rstrip(b"\n") == payload
    print(round_tripped)

    # Third option: build the process object by hand, with pipes
    # for both stdin and stdout, and talk to it via communicate()
    child = structio.parallel.Popen(
        argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE
    )
    piped_output, _ = await child.communicate(payload)
    print(piped_output.rstrip(b"\n") == payload)
# Script entry point: hand the main coroutine function and the sample
# input "owo" to structio.run.
# NOTE(review): this is a top-level statement, so it also executes when the
# module is imported — consider an `if __name__ == "__main__":` guard.
structio.run(main, "owo")