171 lines
4.8 KiB
Python
171 lines
4.8 KiB
Python
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass
|
|
from typing import BinaryIO, List, Optional
|
|
|
|
import urllib3
|
|
from rich.console import Console
|
|
from rich.progress import Progress
|
|
|
|
|
|
@dataclass
class File:
    """An open binary stream bundled with the metadata used while uploading it."""

    # Binary stream the file contents are read from.
    handle: BinaryIO
    # Name associated with the file.
    name: str
    # Length of the contents, in bytes.
    size: int
|
|
|
|
|
|
class ProgressHandler:
    """Facade binding one rich progress task to a console.

    Every method forwards to the wrapped ``Console`` or ``Progress`` object,
    always targeting the task identified by ``taskid``. URLs reported through
    :meth:`add_url` are additionally collected in ``urls``.

    NOTE: thread safety of this class has not been verified.
    """

    def __init__(self, console: Console, rich_progress: Progress, taskid: int):
        """Store the console, the progress display and the task to drive."""
        self.console = console
        self.rich_progress = rich_progress
        self.taskid = taskid
        self.urls: List[str] = []

    def log(self, *a, **kw):
        """Forward a message to ``console.log`` unchanged."""
        self.console.log(*a, **kw)

    def exception(self, *a, **kw):
        """Log a message in bold red unless the caller supplies ``style``."""
        # setdefault avoids a duplicate-keyword TypeError when the caller
        # passes its own ``style``.
        kw.setdefault("style", "bold red")
        self.console.log(*a, **kw)

    def warning(self, *a, **kw):
        """Log a message in bold yellow unless the caller supplies ``style``."""
        kw.setdefault("style", "bold yellow")
        self.console.log(*a, **kw)

    def add_url(self, url: str) -> None:
        """Log ``url`` and remember it in ``self.urls``."""
        self.console.log(url)
        self.urls.append(url)

    def make_visible(self):
        """Mark the task as visible in the progress display."""
        self.rich_progress.update(self.taskid, visible=True)

    def start(self) -> None:
        """Start the task."""
        self.rich_progress.start_task(self.taskid)

    def reset(self) -> None:
        """Reset the task without starting it again."""
        self.rich_progress.reset(self.taskid, start=False)

    def done(self) -> None:
        """Label the task "Done" and mark it fully completed."""
        self.rich_progress.update(self.taskid, description="Done", total=1, completed=1)

    def failed(self) -> None:
        """Label the task "Failed" and mark it fully completed."""
        self.rich_progress.update(
            self.taskid, description="Failed", total=1, completed=1
        )

    def update_description(self, description: str, started: bool = False):
        """Change the task description, then start or reset the task.

        Args:
            description: New description for the task.
            started: When true the task is started; otherwise it is reset
                (and left not started).
        """
        self.rich_progress.update(self.taskid, description=description)
        if started:
            self.start()
        else:
            self.reset()

    def advance(self, step: int) -> None:
        """Advance the task by ``step``."""
        self.rich_progress.advance(self.taskid, step)

    def update(
        self,
        total: Optional[float] = None,
        completed: Optional[float] = None,
        advance: Optional[float] = None,
        description: Optional[str] = None,
        visible: Optional[bool] = None,
        refresh: bool = False,
        **fields,
    ) -> None:
        """Forward an update for this handler's task to ``Progress.update``.

        All arguments are passed through by keyword; any extra ``fields`` are
        forwarded as additional keyword arguments.
        """
        # The task id must come first and the options must be passed by
        # keyword; passing them positionally put ``total`` in the task-id slot.
        self.rich_progress.update(
            self.taskid,
            total=total,
            completed=completed,
            advance=advance,
            description=description,
            visible=visible,
            refresh=refresh,
            **fields,
        )
|
|
|
|
|
|
class FileUploader(ABC):
    """Abstract base class for uploaders.

    A concrete subclass has to provide ``site``, ``short``, ``__init__``,
    ``_upload`` and ``upload`` before it can be instantiated.
    """

    # NOTE(review): stacking @classmethod on @property is deprecated in newer
    # Python versions -- confirm which interpreter versions must be supported.
    @classmethod
    @property
    @abstractmethod
    def site(self) -> str:
        """Class-level string; presumably the name of the target site."""

    @classmethod
    @property
    @abstractmethod
    def short(self) -> str:
        """Class-level string; presumably a short identifier for the site."""

    @abstractmethod
    def __init__(self, file: File, progress: ProgressHandler):
        """Receive the file to upload and the progress handler to report to."""

    @abstractmethod
    def _upload(self):
        """Internal upload step; presumably called by ``upload`` -- verify."""

    @abstractmethod
    def upload(self):
        """Public entry point of the uploader."""
|
|
|
|
|
|
class MultipartProgress:
    """Streaming ``multipart/form-data`` body that reports upload progress.

    Iterating the object yields the encoded body piece by piece; ``File``
    payloads are read in chunks and the progress handler is advanced after
    each chunk. ``len()`` returns the total body size in bytes so it can be
    sent as the content length. The object also exposes ``keys()`` and
    ``__getitem__`` so it can be unpacked as
    ``requests.post(**MultipartProgress(...))``.
    """

    def __init__(
        self, progress: ProgressHandler, fields: dict, chunk: Optional[int] = None
    ):
        """Prepare the body.

        Args:
            progress: Handler advanced as file data is streamed.
            fields: Form fields in the shape accepted by
                ``urllib3.filepost.iter_field_objects``.
            chunk: Read size in bytes for file payloads; when falsy, a size
                is derived from the file size in ``__iter__``.
        """
        self.progress = progress
        self.fields = fields
        self.boundary = urllib3.filepost.choose_boundary()
        self.content_type = f"multipart/form-data; boundary={self.boundary}"
        self.chunk = chunk

    def __iter__(self):
        """Yield the encoded multipart body as a sequence of ``bytes``."""
        for field in urllib3.filepost.iter_field_objects(self.fields):
            yield f"--{self.boundary}\r\n".encode()
            yield field.render_headers().encode()

            data = field.data

            if isinstance(data, int):
                data = str(data)  # Backwards compatibility
            if isinstance(data, str):
                yield data.encode()
            elif isinstance(data, File):
                # 1/100th of the file size, clamped to 150000..1048576 bytes,
                # unless an explicit chunk size was configured.
                chunk = self.chunk or int(min(max(data.size / 100, 150000), 1048576))
                while block := data.handle.read(chunk):
                    yield block
                    # Advance by what was actually read: the last read is
                    # usually shorter than ``chunk``.
                    self.progress.advance(len(block))
            else:
                yield data
            yield b"\r\n"
        yield f"--{self.boundary}--\r\n".encode()

    def __len__(self):
        """Return the size in bytes of the body produced by ``__iter__``.

        The fixed overheads (74 and 45) are the byte counts of the part
        headers plus the CRLF that terminates each part, excluding the
        variable-length name, filename, content type and payload.
        """
        boundary_len = len(self.boundary)
        # Closing delimiter: "--" + boundary + "--\r\n".
        result = boundary_len + 6
        for k, v in self.fields.items():
            # Opening delimiter of the part: "--" + boundary + "\r\n".
            result += boundary_len + 4
            result += len(k)
            if isinstance(v, tuple):
                result += 74  # All the fluff
                result += len(v[0])
                result += v[1].size
                if len(v) == 3:
                    result += len(v[2])
                else:
                    result += len(urllib3.fields.guess_content_type(v[0]))
            elif isinstance(v, (str, int)):
                result += 45  # All the fluff
                # __iter__ sends str(v) encoded as UTF-8, so count bytes.
                result += len(str(v).encode())
            elif isinstance(v, bytes):
                result += 45  # All the fluff
                result += len(v)
            else:
                self.progress.warning(f"Got unexpected type {type(v)}.")
        return result

    # This is to allow requests.post(**MultipartProgress(...))
    def keys(self):
        """Return the keyword names produced when the object is unpacked."""
        return ("headers", "data")

    def __getitem__(self, name: str):
        """Return the value for ``"headers"`` or ``"data"``.

        Raises:
            KeyError: If ``name`` is not one of the names from ``keys()``.
        """
        if name == "headers":
            return {"Content-Type": self.content_type}
        if name == "data":
            return self
        raise KeyError(name)
|