structio/tests/files.py

28 lines
921 B
Python

import structio
import tempfile
import os
async def main():
await structio.aprint("[main] Threaded async I/O is working!")
t = structio.clock()
stuff = await structio.ainput("Type something: ")
await structio.aprint(f"[main] Output from ainput(): {stuff}")
print(f"[main] Exited in {structio.clock() - t:.2f} seconds")
async def main_2(data: bytes):
t = structio.clock()
async with await structio.open_file(os.path.join(tempfile.gettempdir(), "structio_test.txt"), "wb+") as f:
print(f"[main] Opened async file {f.name!r}, writing payload of {len(data)} bytes")
await f.write(data)
await f.seek(0)
assert await f.read(len(data)) == data
print(f"[main] Deleting {f.name!r}")
await structio.thread.run_in_worker(os.unlink, f.name)
print(f"[main] Done in {structio.clock() - t:.2f} seconds")
structio.run(main)
structio.run(main_2, b"a" * 10000)