# structio/tests/files.py
#
# 44 lines
# 1.5 KiB
# Python

import structio
import tempfile
import os
from structio import aprint
async def main_2(data: bytes):
    """
    Round-trip `data` through a temporary file opened with
    structio.open_file, then delete the file using plain os functions.

    The file is "structio_test.txt" inside tempfile.gettempdir(). The
    payload is written, read back and compared with an assertion; the
    file is then removed with os.unlink and its absence is asserted with
    os.path.isfile, both invoked through structio.thread.run_in_worker.
    Progress messages and the elapsed structio.clock() time are emitted
    with aprint.

    :param data: The bytes to write and expect to read back unchanged
    """
    started = structio.clock()
    await aprint("[main] Using low level os module")
    target = os.path.join(tempfile.gettempdir(), "structio_test.txt")
    expected_len = len(data)
    # NOTE(review): this nesting was reconstructed from a copy of the file
    # whose indentation had been lost; confirm the extent of the `async with`
    # body against the repository version
    async with await structio.open_file(target, "wb+") as f:
        await aprint(
            f"[main] Opened async file {f.name!r}, writing payload of {len(data)} bytes"
        )
        await f.write(data)
        # Rewind so that the read below starts at the first byte written
        await f.seek(0)
        assert data == await f.read(expected_len)
        await f.flush()
        await aprint(f"[main] Deleting {f.name!r}")
        # The os-level calls are handed to run_in_worker instead of being
        # called inline
        await structio.thread.run_in_worker(os.unlink, f.name)
        assert not await structio.thread.run_in_worker(os.path.isfile, f.name)
    await aprint(f"[main] Done in {structio.clock() - started:.2f} seconds")
async def main_3(data: bytes):
    """
    Round-trip `data` through a temporary file using the structio.Path
    wrapper for opening, deleting and checking the file.

    The file is "structio_test.txt" inside tempfile.gettempdir(). The
    payload is written, read back and compared with an assertion; the
    file is then removed with Path.unlink() and its absence is asserted
    with Path.exists(). Progress messages and the elapsed
    structio.clock() time are emitted with aprint.

    :param data: The bytes to write and expect to read back unchanged
    """
    started = structio.clock()
    await aprint("[main] Using high level pathlib wrapper")
    tmp_dir = structio.Path(tempfile.gettempdir())
    target = tmp_dir / "structio_test.txt"
    expected_len = len(data)
    # NOTE(review): this nesting was reconstructed from a copy of the file
    # whose indentation had been lost; confirm the extent of the `async with`
    # body against the repository version
    async with await target.open("wb+") as f:
        await aprint(
            f"[main] Opened async file {f.name!r}, writing payload of {len(data)} bytes"
        )
        await f.write(data)
        # Rewind so that the read below starts at the first byte written
        await f.seek(0)
        assert data == await f.read(expected_len)
        await f.flush()
        await aprint(f"[main] Deleting {f.name!r}")
        await target.unlink()
        assert not await target.exists()
    await aprint(f"[main] Done in {structio.clock() - started:.2f} seconds")
# Number of bytes in one MiB
MB = 1024 * 1024
# The payload is 100 MiB of a single repeated byte (possibly more than
# this test really needs)
payload = b"a" * (100 * MB)

# Exercise the low level variant first, then the Path based one; each gets
# its own structio.run() invocation
structio.run(main_2, payload)
structio.run(main_3, payload)