From 574cdcf47be65b5d16a292427cc8db33e40bf667 Mon Sep 17 00:00:00 2001 From: GodSaveTheDoge <51802433+GodSaveTheDoge@users.noreply.github.com> Date: Thu, 10 Mar 2022 02:26:21 +0100 Subject: [PATCH] Made progress bar and most helper Hosts: - mirrored.to - anonfiles.com - tusfiles.com --- LICENSE.md | 11 ++ Makefile | 8 +- mirrorme.py | 303 --------------------------------------- mirrorme/__init__.py | 0 mirrorme/__main__.py | 69 +++++++++ mirrorme/host_manager.py | 231 +++++++++++++++++++++++++++++ mirrorme/types.py | 170 ++++++++++++++++++++++ 7 files changed, 487 insertions(+), 305 deletions(-) create mode 100644 LICENSE.md delete mode 100644 mirrorme.py create mode 100644 mirrorme/__init__.py create mode 100644 mirrorme/__main__.py create mode 100644 mirrorme/host_manager.py create mode 100644 mirrorme/types.py diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..7a3094a --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,11 @@ +DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE +Version 2, December 2004 + +Copyright (C) 2004 Sam Hocevar + +Everyone is permitted to copy and distribute verbatim or modified copies of this license document, and changing it is allowed as long as the name is changed. + +DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE +TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. You just DO WHAT THE FUCK YOU WANT TO. diff --git a/Makefile b/Makefile index caf9b3e..743347b 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,7 @@ format: - black mirrorme.py - isort mirrorme.py + black --exclude env . + isort -s env . + +check: + mypy --ignore-missing-imports mirrorme + flake8 --ignore=E501 mirrorme diff --git a/mirrorme.py b/mirrorme.py deleted file mode 100644 index 27164b7..0000000 --- a/mirrorme.py +++ /dev/null @@ -1,303 +0,0 @@ -import argparse -import base64 -import json -import os -import random -import re -import string -import sys -import time -from concurrent.futures import ThreadPoolExecutor, as_completed -from io import BytesIO -from typing import Callable, List - -import requests - - -class MirrorManager: - def __init__(self): - self.hosts = [] - - def register_host( - self, name: str, short: str - ) -> Callable[[Callable[[BytesIO], List[str]]], Callable[[BytesIO], List[str]]]: - def decorator(fun): - self.hosts.append((name, short, fun)) - return fun - - return decorator - - -def main(): - parser = argparse.ArgumentParser(description="Mirror files.") - parser.add_argument("-all", action="store_true", help="Upload to every host.") - - for name, short, _ in mmanager.hosts: - parser.add_argument(f"-{short}", action="store_true", help=f"Upload to {name}") - - parser.add_argument("file", help="name of the file to upload") - args = parser.parse_args() - fname = args.file - - if not os.path.isfile(fname): - print(f"{repr(fname)} does not exist or is a directory!", file=sys.stderr) - return 1 - - mirrors = {} - futures = {} - with ThreadPoolExecutor(max_workers=4) as thpool: - for name, short, fun in mmanager.hosts: - if args.all or getattr(args, short): - futures[thpool.submit(fun, open(fname, "rb"))] = short - - for future in as_completed(futures.keys()): - exc = future.exception() - if exc: - print(f"Uploading to {name} failed due to: {exc}", file=sys.stderr) - continue - mirrors[futures[future]] = future.result() - - print(json.dumps(mirrors)) - - -mmanager = MirrorManager() - - -@mmanager.register_host("anonfiles.com", "af") -def host_anonfiles(fhandle: BytesIO) -> List[str]: - j = requests.post( - "https://api.anonfiles.com/upload", files={"file": fhandle} - ).json() - return [j["data"]["file"]["url"]["short"]] - - -@mmanager.register_host("bayfiles.com", "bf") -def host_bayfiles(fhandle: BytesIO) -> List[str]: - j = requests.post("https://api.bayfiles.com/upload", files={"file": fhandle}).json() - return [j["data"]["file"]["url"]["short"]] - - -@mmanager.register_host("uptobox.com", "utb") -def host_uptobox(fhandle: BytesIO) -> List[str]: - j = requests.post( - "https://www78.uptobox.com/upload", - files={"files": fhandle}, - ).json() - return [j["files"][0]["url"]] - - -re_1f = re.compile(r"https://1fichier.com/\?([a-z0-9]{18,})") - - -@mmanager.register_host("1ficher.com", "1f") -def host_1ficher(fhandle: BytesIO) -> List[str]: - rid = "".join(random.choices(string.ascii_letters + string.digits, k=10)) - fname = fhandle.name if hasattr(fhandle, "name") else "file.dat" - r = requests.post( - "https://up2.1fichier.com/upload.cgi", - params={"id": rid}, - files=[("file[]", (fname, fhandle, "application/octet-stream"))], - data={ - "send_ssl": "on", - "domain": 0, - "mail": "", - "dpass": "", - "user": "", - "mails": "", - "message": "", - "submit": "Send", - }, - ) - rg = re_1f.search(r.text) - if rg is None: - raise Exception("No download url in final response") - return [rg.group(0)] - - -@mmanager.register_host("siasky.net", "ss") -def host_siasky(fhandle: BytesIO) -> List[str]: - j = requests.post( - "https://siasky.net/skynet/skyfile", - files={"file": fhandle}, - ).json() - return ["https://siasky.net" + j["skylink"]] - - -@mmanager.register_host("gofile.io", "go") -def host_gofile(fhandle: BytesIO) -> List[str]: - fname = fhandle.name if hasattr(fhandle, "name") else "file.dat" - j = requests.get("https://api.gofile.io/createAccount").json() - token = j["data"]["token"] - j = requests.get( - "https://api.gofile.io/getAccountDetails", params={"token": token} - ).json() - root_folder = j["data"]["rootFolder"] - j = requests.put( - "https://api.gofile.io/createFolder", - data={"parentFolderId": root_folder, "token": token}, - ).json() - folder = j["data"]["id"] - j = requests.get("https://api.gofile.io/getServer").json() - server = j["data"]["server"] - assert server.isalnum() # Let's try to avoid injection - j = requests.put( - "https://api.gofile.io/setFolderOption", - data={"folderId": folder, "token": token, "option": "public", "value": "true"}, - ).json() - j = requests.post( - f"https://{server}.gofile.io/uploadFile", - data={"token": token, "folderId": folder}, - files=[("file", (fname, fhandle, "application/octet-stream"))], - ).json() - return [j["data"]["downloadPage"]] - - -@mmanager.register_host("download.gg", "dgg") -def host_downloadgg(fhandle: BytesIO) -> List[str]: - fname = fhandle.name if hasattr(fhandle, "name") else "file.dat" - r = requests.post( - "https://download.gg/server/upload.php", - files=[("file[]", (fname, fhandle, "application/octet-stream"))], - data={"send-id-gf": "undefined"}, - ) - return ["https://download.gg/file-" + r.text.replace("&", "_")] - - -re_mir_token = re.compile(r"'token' : '([a-z0-9]{32})'") -re_mir_files = re.compile(r"(https://www\.mirrored\.to/files/[A-Z0-9]+/[A-z0-9._]+)<") -re_mir_hfiles = re.compile( - r"https://www\.mirrored\.to/files/[A-Z0-9]+/\?hash=[0-9a-f]+&dl=[01]" -) -re_mir_mirstats = re.compile( - r"/mirstats\.php\?uid=[A-Z0-9]+&tmpID=[0-9a-f]+&fn=[A-z0-9.]+&ads=1&gp=1&su=0&pid=0&puid=0&fd=1&s=0&lang=[a-z]+&ftype=[A-z]+" -) -re_mir_getlink = re.compile(r"/getlink/[A-Z0-9]+/[0-9]+/\?hid=[A-z0-9%]+&tid=[0-9a-f]+") -re_mir_hosts = re.compile( - r"f=(https?:\/\/(?:www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b(?:[-a-zA-Z0-9()@:%_\+.~#?&//=]*)) target=\"_blank\">" -) - - -@mmanager.register_host("mirrored.to", "mir") -def host_mirrored(fhandle: BytesIO) -> List[str]: - fname = fhandle.name if hasattr(fhandle, "name") else "file.dat" - fhandle.seek(0, 2) # Seek to the end of the file - fsize = fhandle.tell() - fhandle.seek(0) - urls = [] - for hosts in ( - ( - "gofileio", - "downloadgg", - "onefichier", - "turbobit", - "zippyshare", - "usersdrive", - "bayfiles", - "anonfiles", - "clicknupload", - "uptobox", - ), - ( - "dailyuploads", - "uploadee", - "dropapk", - "mixdropco", - "filesim", - "megaupnet", - "file-upload", - "sendcm", - "skynet", - "pixeldrain", - ), - ): - r = requests.get("https://www.mirrored.to/") - rg = re_mir_token.search(r.text) - if rg is None: - raise Exception("No token found!") - token = rg.group(1) - - r = requests.post( - "https://www.mirrored.to/uploadify/uploadifive1.php", - files=[("Filedata", (fname, fhandle, "application/octet-stream"))], - data={"timestamp": "", "token": token}, - ) - if not r.ok and r.text: - raise Exception("Failed to upload") - - # B64 ENCODE: - # For each filename: - # Enter name of file. Be sure to use format of filename listed in upload result as guide. It must be exact. Do not include quotes. - # Paste this: #0# - # Enter size of file in Bytes. For 1 MiB file, enter 1048576. This must be exact! - # Paste this: ;0; - # Example for 2 files: - # First_File.7z#0#1234567;0;Second_File.7z#0#8901234;0; - # Paste this: @e@#H# - # Paste host list from box below. - - # onefichier;anonfiles;solidfiles - # Paste this: ;#P##SC##T# - # Enter some numbers. Example: 1625815023. It should be unique, as it is used for timestamp. - - data = base64.b64encode( - f"{fname}#0#{fsize};0;@e@#H#{';'.join(hosts)};#P##SC##T#{int(time.time() * 1000)}".encode() - ).decode() - r = requests.get( - "https://www.mirrored.to/upload_complete.php", - params={"w": "1", "data": data}, - ) - rg = re_mir_files.search(r.text) - if rg is None: - raise Exception("No link to url list in upload_complete") - files_url = rg.group(1) - - r = requests.get(files_url) - rg = re_mir_hfiles.search(r.text) - if rg is None: - raise Exception("No hash link to list in files url") - hfiles_url = rg.group(0) - - r = requests.get(hfiles_url) - rg = re_mir_mirstats.search(r.text) - if rg is None: - raise Exception("No mirstats link in hfiles url") - mirstats_url = "https://www.mirrored.to" + rg.group(0) - - for i in range(300): # Timeout to avoid waiting forever - r = requests.get(mirstats_url) - if "id_Uploading" not in r.text: - break - time.sleep(5) - getlinks = re_mir_getlink.findall(r.text) - - for link in getlinks: - r = requests.get("https://mirrored.to/" + link) - rg = re_mir_hosts.search(r.text) - if rg is None: - continue # It's not worth trashing all other mirrors - urls.append(rg.group(1)) - - return urls - - -@mmanager.register_host("tusfiles.com", "tus") -def host_tusfiles(fhandle: BytesIO) -> List[str]: - fname = fhandle.name if hasattr(fhandle, "name") else "file.dat" - j = requests.post( - "https://cloud01.tusfiles.com/cgi-bin/upload.cgi", - params={"upload_type": "file", "utype": "anon"}, - files=[("file_0", (fname, fhandle, "application/octet-stream"))], - data={ - "sess_id": "", - "utype": "anon", - "link_pass": "", - "link_rcpt": "", - "link_pass": "", - "keepalive": "1", - }, - ).json() - return ["https://tusfiles.com/" + j[0]["file_code"]] - - -if __name__ == "__main__": - exit(main()) diff --git a/mirrorme/__init__.py b/mirrorme/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mirrorme/__main__.py b/mirrorme/__main__.py new file mode 100644 index 0000000..f1a45b9 --- /dev/null +++ b/mirrorme/__main__.py @@ -0,0 +1,69 @@ +import argparse +import os +from concurrent.futures import ThreadPoolExecutor + +from rich.console import Console +from rich.progress import Progress + +from .host_manager import host_manager +from .types import File, ProgressHandler + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Mirror a file on multiple file hosting services" + ) + parser.add_argument( + "--pool", + type=int, + metavar="N", + default=5, + help="Number of max workers in the thread pool", + ) + parser.add_argument("file", help="Path to the file to upload") + + for host in host_manager.hosts: + parser.add_argument( + f"-{host.short}", + action="count", + default=0, + help=f"Upload to {host.site}. Repeat to upload multiple times", + ) + + args = parser.parse_args() + console = Console() + + file_name = args.file + if not os.path.isfile(file_name): + console.log(f"[bold red]{repr(file_name)} is not a file!") + return 1 + with open(file_name) as fhandle: + file_size = fhandle.seek(0, 2) + + with Progress(console=console, transient=True) as progress, ThreadPoolExecutor( + max_workers=args.pool + ) as thread_pool: + for host in host_manager.hosts: + for _ in range(getattr(args, host.short)): + thread_pool.submit( + host( + File(open(file_name, "rb"), file_name, file_size), + ProgressHandler( + console, + progress, + progress.add_task( + f"[{host.short}] Waiting...", + start=False, + total=file_size, + visible=False, + ), + ), + ).upload + ) + + console.log("[bold green]File mirrored!") + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/mirrorme/host_manager.py b/mirrorme/host_manager.py new file mode 100644 index 0000000..0658675 --- /dev/null +++ b/mirrorme/host_manager.py @@ -0,0 +1,231 @@ +import base64 +import re +import time +from typing import Type + +import requests + +from .types import File, FileUploader, MultipartProgress, ProgressHandler + + +class HostManager: + def __init__(self): + self.hosts = [] + + def register_host(self, host: Type[FileUploader]): + self.hosts.append(host) + return host + + +host_manager = HostManager() + + +@host_manager.register_host +class MirroredToUploader(FileUploader): + site = "mirrored.to" + short = "mir" + + def __init__(self, file: File, progress: ProgressHandler): + self.file = file + self.progress = progress + self.re_mir_token = re.compile(r"'token' : '([a-z0-9]{32})'") + self.re_mir_files = re.compile( + r"(https://www\.mirrored\.to/files/[A-Z0-9]+/[A-z0-9._]+)<" + ) + self.re_mir_hfiles = re.compile( + r"https://www\.mirrored\.to/files/[A-Z0-9]+/\?hash=[0-9a-f]+&dl=[01]" + ) + self.re_mir_mirstats = re.compile( + r"/mirstats\.php\?uid=[A-Z0-9]+&tmpID=[0-9a-f]+&fn=[A-z0-9.]+&ads=1&gp=1&su=0&pid=0&puid=0&fd=1&s=0&lang=[a-z]+&ftype=[A-z]+" + ) + self.re_mir_getlink = re.compile( + r"/getlink/[A-Z0-9]+/[0-9]+/\?hid=[A-z0-9%]+&tid=[0-9a-f]+" + ) + self.re_mir_hosts = re.compile( + r"f=(https?:\/\/(?:www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b(?:[-a-zA-Z0-9()@:%_\+.~#?&//=]*)) target=\"_blank\">" + ) + + def upload(self) -> None: + self.progress.make_visible() + try: + self._upload() + self.progress.done() + except Exception as e: + self.progress.exception(f"Failed uploading to {self.site} due to: {e}") + self.progress.failed() + + def _upload(self): + self.progress.update_description("Fetching token...") + r = requests.get("https://www.mirrored.to/") + rg = self.re_mir_token.search(r.text) + if rg is None: + self.progress.exception("No token found!") + return + token = rg.group(1) + + self.progress.update_description("Uploading...", True) + r = requests.post( + "https://www.mirrored.to/uploadify/uploadifive1.php", + **MultipartProgress( + self.progress, + { + "timestamp": "", + "token": token, + "Filedata": (self.file.name, self.file), + }, + ), + ) + if not r.ok and r.text: + self.progress.exception("Failed to upload") + return + + for hosts in ( + ( + "gofileio", + "downloadgg", + "onefichier", + "turbobit", + "zippyshare", + "usersdrive", + "bayfiles", + "anonfiles", + "clicknupload", + "uptobox", + ), + ( + "dailyuploads", + "uploadee", + "dropapk", + "mixdropco", + "filesim", + "megaupnet", + "file-upload", + "sendcm", + "skynet", + "pixeldrain", + ), + ): + # B64 ENCODE: + # For each filename: + # Enter name of file. Be sure to use format of filename listed in upload result as guide. It must be exact. Do not include quotes. + # Paste this: #0# + # Enter size of file in Bytes. For 1 MiB file, enter 1048576. This must be exact! + # Paste this: ;0; + # Example for 2 files: + # First_File.7z#0#1234567;0;Second_File.7z#0#8901234;0; + # Paste this: @e@#H# + # Paste host list from box below. + + # onefichier;anonfiles;solidfiles + # Paste this: ;#P##SC##T# + # Enter some numbers. Example: 1625815023. It should be unique, as it is used for timestamp. + + self.progress.update_description("Fetching link page...") + data = base64.b64encode( + f"{self.file.name}#0#{self.file.size};0;@e@#H#{';'.join(hosts)};#P##SC##T#{int(time.time() * 1000)}".encode() + ).decode() + r = requests.get( + "https://www.mirrored.to/upload_complete.php", + params={"w": "1", "data": data}, + ) + rg = self.re_mir_files.search(r.text) + if rg is None: + self.progress.exception("No link to url list in upload_complete") + return + files_url = rg.group(1) + + r = requests.get(files_url) + rg = self.re_mir_hfiles.search(r.text) + if rg is None: + self.progress.exception("No hash link to list in files url") + return + hfiles_url = rg.group(0) + + self.progress.update_description("Waiting for upload...") + r = requests.get(hfiles_url) + rg = self.re_mir_mirstats.search(r.text) + if rg is None: + self.progress.exception("No mirstats link in hfiles url") + return + mirstats_url = "https://www.mirrored.to" + rg.group(0) + + for i in range(300): # Timeout to avoid waiting forever + r = requests.get(mirstats_url) + if "id_Uploading" not in r.text: + break + time.sleep(5) + getlinks = self.re_mir_getlink.findall(r.text) + + self.progress.update_description("Fetching links...") + for link in getlinks: + r = requests.get("https://mirrored.to/" + link) + rg = self.re_mir_hosts.search(r.text) + if rg is None: + continue # It's not worth trashing all other mirrors + self.progress.add_url(rg.group(1)) + + +@host_manager.register_host +class AnonfilesUploader(FileUploader): + site = "anonfiles.com" + short = "an" + + def __init__(self, file: File, progress: ProgressHandler): + self.file = file + self.progress = progress + + def upload(self) -> None: + self.progress.make_visible() + try: + self._upload() + self.progress.done() + except Exception as e: + self.progress.exception(f"Failed uploading to {self.site} due to: {e}") + self.progress.failed() + + def _upload(self) -> None: + self.progress.update_description("Uploading...", True) + j = requests.post( + "https://api.anonfiles.com/upload", + **MultipartProgress(self.progress, {"file": (self.file.name, self.file)}), + ).json() + self.progress.add_url(j["data"]["file"]["url"]["short"]) + + +@host_manager.register_host +class TusfilesUploader(FileUploader): + site = "tusfiles.com" + short = "tus" + + def __init__(self, file: File, progress: ProgressHandler): + self.file = file + self.progress = progress + + def upload(self) -> None: + self.progress.make_visible() + try: + self._upload() + self.progress.done() + except Exception as e: + self.progress.exception(f"Failed uploading to {self.site} due to: {e}") + self.progress.failed() + + def _upload(self) -> None: + self.progress.update_description("Uploading...", True) + j = requests.post( + "https://cloud01.tusfiles.com/cgi-bin/upload.cgi", + params={"upload_type": "file", "utype": "anon"}, + **MultipartProgress( + self.progress, + { + "file_0": (self.file.name, self.file), + "sess_id": "", + "utype": "anon", + "link_pass": "", + "link_rcpt": "", + "link_pass": "", + "keepalive": "1", + } + ), + ).json() + self.progress.add_url("https://tusfiles.com/" + j[0]["file_code"]) diff --git a/mirrorme/types.py b/mirrorme/types.py new file mode 100644 index 0000000..1bfb4c2 --- /dev/null +++ b/mirrorme/types.py @@ -0,0 +1,170 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import BinaryIO, List, Optional + +import urllib3 +from rich.console import Console +from rich.progress import Progress + + +@dataclass +class File: + handle: BinaryIO + name: str + size: int # in bytes + + +class ProgressHandler: + # I'm not entirely sure if this is thread safe + def __init__(self, console: Console, rich_progress: Progress, taskid: int): + self.console = console + self.rich_progress = rich_progress + self.taskid = taskid + self.urls: List[str] = [] + + def log(self, *a, **kw): + self.console.log(*a, **kw) + + def exception(self, *a, **kw): + self.console.log(*a, **kw, style="bold red") + + def warning(self, *a, **kw): + self.console.log(*a, **kw, style="bold yellow") + + def add_url(self, url: str) -> None: + self.console.log(url) + self.urls.append(url) + + def make_visible(self): + self.rich_progress.update(self.taskid, visible=True) + + def start(self) -> None: + self.rich_progress.start_task(self.taskid) + + def reset(self) -> None: + self.rich_progress.reset(self.taskid, start=False) + + def done(self) -> None: + self.rich_progress.update(self.taskid, description="Done", total=1, completed=1) + + def failed(self) -> None: + self.rich_progress.update( + self.taskid, description="Failed", total=1, completed=1 + ) + + def update_description(self, description: str, started: bool = False): + self.rich_progress.update(self.taskid, description=description) + if started: + self.start() + else: + self.reset() + + def advance(self, step: int) -> None: + self.rich_progress.advance(self.taskid, step) + + def update( + self, + total: Optional[float] = None, + completed: Optional[float] = None, + advance: Optional[float] = None, + description: Optional[str] = None, + visible: Optional[bool] = None, + refresh: bool = False, + **fields, + ) -> None: + self.rich_progress.update( + total, completed, advance, description, visible, refresh, **fields + ) + + +class FileUploader(ABC): + @classmethod + @property + @abstractmethod + def site(self) -> str: + ... + + @classmethod + @property + @abstractmethod + def short(self) -> str: + ... + + @abstractmethod + def __init__(self, file: File, progress: ProgressHandler): + ... + + @abstractmethod + def _upload(self): + ... + + @abstractmethod + def upload(self): + ... + + +class MultipartProgress: + def __init__( + self, progress: ProgressHandler, fields: dict, chunk: Optional[int] = None + ): + self.progress = progress + self.fields = fields + self.boundary = urllib3.filepost.choose_boundary() + self.content_type = f"multipart/form-data; boundary={self.boundary}" + self.chunk = chunk + + def __iter__(self): + for field in urllib3.filepost.iter_field_objects(self.fields): + yield f"--{self.boundary}\r\n".encode() + yield field.render_headers().encode() + + data = field.data + + if isinstance(data, int): + data = str(data) # Backwards compatibility + if isinstance(data, str): + yield data.encode() + elif isinstance(data, File): + # 1/100th the size if the condition is met, else 150k or 1MB: + # 150000 <= 1/100th the size <= 1MB + chunk = self.chunk or int(min(max(data.size / 100, 150000), 1048576)) + while True: + d = data.handle.read(chunk) + if not d: + break + yield d + self.progress.advance(chunk) + else: + yield data + yield b"\r\n" + yield f"--{self.boundary}--\r\n".encode() + + def __len__(self): + result = 36 + for k, v in self.fields.items(): + result += 36 + result += len(k) + if isinstance(v, tuple): + result += 74 # All the fluff + result += len(v[0]) + result += v[1].size + if len(v) == 3: + result += len(v[2]) + else: + result += len(urllib3.fields.guess_content_type(v[0])) + elif isinstance(v, str): + result += 45 # All the fluff + result += len(v) + else: + self.progress.warning(f"Got unexpected type {type(v)}.") + return result + + # This is to allow requests.post(**MultipartProgress(...)) + def keys(self): + return ("headers", "data") + + def __getitem__(self, name: str): + if name == "headers": + return {"Content-Type": self.content_type} + elif name == "data": + return self