structio/tests/files.py

33 lines
1.1 KiB
Python

import structio
import tempfile
import os
async def main():
await structio.aprint("[main] Threaded async I/O is working!")
t = structio.clock()
stuff = await structio.ainput("Type something: ")
await structio.aprint(f"[main] Output from ainput(): {stuff}")
print(f"[main] Exited in {structio.clock() - t:.2f} seconds")
async def main_2(data: bytes):
t = structio.clock()
async with await structio.open_file(os.path.join(tempfile.gettempdir(), "structio_test.txt"), "wb+") as f:
print(f"[main] Opened async file {f.name!r}, writing payload of {len(data)} bytes")
await f.write(data)
await f.seek(0)
assert await f.read(len(data)) == data
await f.flush()
print(f"[main] Deleting {f.name!r}")
await structio.thread.run_in_worker(os.unlink, f.name)
assert not await structio.thread.run_in_worker(os.path.isfile, f.name)
print(f"[main] Done in {structio.clock() - t:.2f} seconds")
#structio.run(main)
MB = 1_048_576
GB = 1
# Write 1GB of data (too much?)
structio.run(main_2, b"a" * (GB * 1024 * MB))