Made progress bar and most helpers
Hosts: - mirrored.to - anonfiles.com - tusfiles.com
This commit is contained in:
parent
ca5cfc3696
commit
574cdcf47b
|
@ -0,0 +1,11 @@
|
|||
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
|
||||
Version 2, December 2004
|
||||
|
||||
Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
|
||||
|
||||
Everyone is permitted to copy and distribute verbatim or modified copies of this license document, and changing it is allowed as long as the name is changed.
|
||||
|
||||
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
|
||||
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
|
||||
|
||||
0. You just DO WHAT THE FUCK YOU WANT TO.
|
8
Makefile
8
Makefile
|
@ -1,3 +1,7 @@
|
|||
# Auto-format the sources with black and isort (the virtualenv dir is skipped).
format:
	black mirrorme.py
	isort mirrorme.py
	black --exclude env .
	isort -s env .

# Static analysis: mypy type checking and flake8 lint (E501 long-line warnings ignored).
check:
	mypy --ignore-missing-imports mirrorme
	flake8 --ignore=E501 mirrorme
|
||||
|
|
303
mirrorme.py
303
mirrorme.py
|
@ -1,303 +0,0 @@
|
|||
import argparse
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
import sys
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from io import BytesIO
|
||||
from typing import Callable, List
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
class MirrorManager:
    """Registry of uploader functions, one per supported file host."""

    def __init__(self):
        # (site name, CLI short flag, uploader function) triples, in
        # registration order.
        self.hosts = []

    def register_host(
        self, name: str, short: str
    ) -> Callable[[Callable[[BytesIO], List[str]]], Callable[[BytesIO], List[str]]]:
        """Return a decorator that records the wrapped uploader under
        ``(name, short)`` and hands the function back unchanged."""

        def decorator(fun):
            entry = (name, short, fun)
            self.hosts.append(entry)
            return fun

        return decorator
|
||||
|
||||
|
||||
def main():
    """Parse CLI flags, upload the file to every selected host in a thread
    pool, and print the collected mirror URLs as JSON.

    Returns 1 when the given path is not a regular file; otherwise None.
    """
    parser = argparse.ArgumentParser(description="Mirror files.")
    parser.add_argument("-all", action="store_true", help="Upload to every host.")

    for name, short, _ in mmanager.hosts:
        parser.add_argument(f"-{short}", action="store_true", help=f"Upload to {name}")

    parser.add_argument("file", help="name of the file to upload")
    args = parser.parse_args()
    fname = args.file

    if not os.path.isfile(fname):
        print(f"{repr(fname)} does not exist or is a directory!", file=sys.stderr)
        return 1

    mirrors = {}
    futures = {}
    handles = []  # BUG FIX: handles were opened per-submit and never closed.
    try:
        with ThreadPoolExecutor(max_workers=4) as thpool:
            for name, short, fun in mmanager.hosts:
                if args.all or getattr(args, short):
                    # Each upload gets its own handle so concurrent reads
                    # don't interfere with each other.
                    fhandle = open(fname, "rb")
                    handles.append(fhandle)
                    futures[thpool.submit(fun, fhandle)] = short

            for future in as_completed(futures.keys()):
                short = futures[future]
                exc = future.exception()
                if exc:
                    # BUG FIX: the message previously interpolated `name`,
                    # which was the stale loop variable from the submission
                    # loop (always the last registered host), not the host
                    # that actually failed.
                    print(f"Uploading to {short} failed due to: {exc}", file=sys.stderr)
                    continue
                mirrors[short] = future.result()
    finally:
        for fhandle in handles:
            fhandle.close()

    print(json.dumps(mirrors))
|
||||
|
||||
|
||||
mmanager = MirrorManager()
|
||||
|
||||
|
||||
@mmanager.register_host("anonfiles.com", "af")
def host_anonfiles(fhandle: BytesIO) -> List[str]:
    """Upload to the anonfiles.com API; return the short download URL."""
    j = requests.post(
        "https://api.anonfiles.com/upload", files={"file": fhandle}
    ).json()
    return [j["data"]["file"]["url"]["short"]]
|
||||
|
||||
|
||||
@mmanager.register_host("bayfiles.com", "bf")
def host_bayfiles(fhandle: BytesIO) -> List[str]:
    """Upload to the bayfiles.com API; return the short download URL."""
    j = requests.post("https://api.bayfiles.com/upload", files={"file": fhandle}).json()
    return [j["data"]["file"]["url"]["short"]]
|
||||
|
||||
|
||||
@mmanager.register_host("uptobox.com", "utb")
def host_uptobox(fhandle: BytesIO) -> List[str]:
    """Upload to uptobox.com; return the resulting file URL.

    NOTE(review): the upload server (www78) is hard-coded here and may
    rotate server-side — confirm it is still valid.
    """
    j = requests.post(
        "https://www78.uptobox.com/upload",
        files={"files": fhandle},
    ).json()
    return [j["files"][0]["url"]]
|
||||
|
||||
|
||||
# Download links look like https://1fichier.com/?<18+ lowercase alphanumerics>.
re_1f = re.compile(r"https://1fichier.com/\?([a-z0-9]{18,})")


@mmanager.register_host("1ficher.com", "1f")
def host_1ficher(fhandle: BytesIO) -> List[str]:
    """Upload to 1fichier.com's CGI endpoint; return the download URL.

    Raises:
        Exception: if no download URL can be found in the response HTML.
    """
    # Random request id — presumably used server-side to track the upload;
    # confirm against the endpoint's behavior.
    rid = "".join(random.choices(string.ascii_letters + string.digits, k=10))
    fname = fhandle.name if hasattr(fhandle, "name") else "file.dat"
    r = requests.post(
        "https://up2.1fichier.com/upload.cgi",
        params={"id": rid},
        files=[("file[]", (fname, fhandle, "application/octet-stream"))],
        # Empty fields mimic the site's anonymous upload form.
        data={
            "send_ssl": "on",
            "domain": 0,
            "mail": "",
            "dpass": "",
            "user": "",
            "mails": "",
            "message": "",
            "submit": "Send",
        },
    )
    rg = re_1f.search(r.text)
    if rg is None:
        raise Exception("No download url in final response")
    return [rg.group(0)]
|
||||
|
||||
|
||||
@mmanager.register_host("siasky.net", "ss")
def host_siasky(fhandle: BytesIO) -> List[str]:
    """Upload to the siasky.net Skynet portal; return the skylink URL."""
    j = requests.post(
        "https://siasky.net/skynet/skyfile",
        files={"file": fhandle},
    ).json()
    # The API returns a path-style skylink; prefix the portal origin.
    return ["https://siasky.net" + j["skylink"]]
|
||||
|
||||
|
||||
@mmanager.register_host("gofile.io", "go")
def host_gofile(fhandle: BytesIO) -> List[str]:
    """Upload to gofile.io via a throwaway anonymous account.

    Creates a guest account, makes a folder under its root, marks the
    folder public, then uploads the file into it.  Returns the folder's
    download-page URL.

    NOTE(review): none of the intermediate API responses are checked for
    an error status; a failure surfaces only as a KeyError.
    """
    fname = fhandle.name if hasattr(fhandle, "name") else "file.dat"
    j = requests.get("https://api.gofile.io/createAccount").json()
    token = j["data"]["token"]
    j = requests.get(
        "https://api.gofile.io/getAccountDetails", params={"token": token}
    ).json()
    root_folder = j["data"]["rootFolder"]
    j = requests.put(
        "https://api.gofile.io/createFolder",
        data={"parentFolderId": root_folder, "token": token},
    ).json()
    folder = j["data"]["id"]
    # Uploads go to a per-request server returned by the API; its name is
    # interpolated into the URL below, hence the sanity check.
    j = requests.get("https://api.gofile.io/getServer").json()
    server = j["data"]["server"]
    assert server.isalnum()  # Let's try to avoid injection
    # Make the folder public so the download page works without the token.
    j = requests.put(
        "https://api.gofile.io/setFolderOption",
        data={"folderId": folder, "token": token, "option": "public", "value": "true"},
    ).json()
    j = requests.post(
        f"https://{server}.gofile.io/uploadFile",
        data={"token": token, "folderId": folder},
        files=[("file", (fname, fhandle, "application/octet-stream"))],
    ).json()
    return [j["data"]["downloadPage"]]
|
||||
|
||||
|
||||
@mmanager.register_host("download.gg", "dgg")
def host_downloadgg(fhandle: BytesIO) -> List[str]:
    """Upload to download.gg; return the derived file-page URL.

    The response body is used verbatim as the file id, with '&' mapped to
    '_' — presumably matching the site's URL scheme; confirm against the
    site before relying on it.
    """
    fname = fhandle.name if hasattr(fhandle, "name") else "file.dat"
    r = requests.post(
        "https://download.gg/server/upload.php",
        files=[("file[]", (fname, fhandle, "application/octet-stream"))],
        data={"send-id-gf": "undefined"},
    )
    return ["https://download.gg/file-" + r.text.replace("&", "_")]
|
||||
|
||||
|
||||
# Upload token embedded in the homepage's inline JS.
re_mir_token = re.compile(r"'token' : '([a-z0-9]{32})'")
# Link to the uploaded file's page in the upload_complete response.
re_mir_files = re.compile(r"(https://www\.mirrored\.to/files/[A-Z0-9]+/[A-z0-9._]+)<")
# Hash-based variant of the files link, found on the files page.
re_mir_hfiles = re.compile(
    r"https://www\.mirrored\.to/files/[A-Z0-9]+/\?hash=[0-9a-f]+&dl=[01]"
)
# Mirror-status page, polled until the remote mirroring finishes.
re_mir_mirstats = re.compile(
    r"/mirstats\.php\?uid=[A-Z0-9]+&tmpID=[0-9a-f]+&fn=[A-z0-9.]+&ads=1&gp=1&su=0&pid=0&puid=0&fd=1&s=0&lang=[a-z]+&ftype=[A-z]+"
)
# Per-mirror redirect links listed on the mirstats page.
re_mir_getlink = re.compile(r"/getlink/[A-Z0-9]+/[0-9]+/\?hid=[A-z0-9%]+&tid=[0-9a-f]+")
# Final host URL extracted from each getlink page.
re_mir_hosts = re.compile(
    r"f=(https?:\/\/(?:www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b(?:[-a-zA-Z0-9()@:%_\+.~#?&//=]*)) target=\"_blank\">"
)


@mmanager.register_host("mirrored.to", "mir")
def host_mirrored(fhandle: BytesIO) -> List[str]:
    """Upload via mirrored.to, which fans the file out to many hosts.

    Performs two upload rounds — one per host batch below — and scrapes
    the resulting mirror URLs.  Returns every URL it could extract.

    Raises:
        Exception: when a scraping step finds no match.
    """
    fname = fhandle.name if hasattr(fhandle, "name") else "file.dat"
    fhandle.seek(0, 2)  # Seek to the end of the file
    fsize = fhandle.tell()
    urls = []
    for hosts in (
        (
            "gofileio",
            "downloadgg",
            "onefichier",
            "turbobit",
            "zippyshare",
            "usersdrive",
            "bayfiles",
            "anonfiles",
            "clicknupload",
            "uptobox",
        ),
        (
            "dailyuploads",
            "uploadee",
            "dropapk",
            "mixdropco",
            "filesim",
            "megaupnet",
            "file-upload",
            "sendcm",
            "skynet",
            "pixeldrain",
        ),
    ):
        # BUG FIX: the handle was rewound only once, before the loop, so
        # the second batch's upload read from EOF and sent an empty body.
        # Rewind at the start of every round instead.
        fhandle.seek(0)

        r = requests.get("https://www.mirrored.to/")
        rg = re_mir_token.search(r.text)
        if rg is None:
            raise Exception("No token found!")
        token = rg.group(1)

        r = requests.post(
            "https://www.mirrored.to/uploadify/uploadifive1.php",
            files=[("Filedata", (fname, fhandle, "application/octet-stream"))],
            data={"timestamp": "", "token": token},
        )
        if not r.ok and r.text:
            raise Exception("Failed to upload")

        # B64 ENCODE:
        # For each filename:
        # Enter name of file. Be sure to use format of filename listed in upload result as guide. It must be exact. Do not include quotes.
        # Paste this: #0#
        # Enter size of file in Bytes. For 1 MiB file, enter 1048576. This must be exact!
        # Paste this: ;0;
        # Example for 2 files:
        # First_File.7z#0#1234567;0;Second_File.7z#0#8901234;0;
        # Paste this: @e@#H#
        # Paste host list from box below.

        # onefichier;anonfiles;solidfiles
        # Paste this: ;#P##SC##T#
        # Enter some numbers. Example: 1625815023. It should be unique, as it is used for timestamp.

        data = base64.b64encode(
            f"{fname}#0#{fsize};0;@e@#H#{';'.join(hosts)};#P##SC##T#{int(time.time() * 1000)}".encode()
        ).decode()
        r = requests.get(
            "https://www.mirrored.to/upload_complete.php",
            params={"w": "1", "data": data},
        )
        rg = re_mir_files.search(r.text)
        if rg is None:
            raise Exception("No link to url list in upload_complete")
        files_url = rg.group(1)

        # Follow files page -> hash page -> mirstats page.
        r = requests.get(files_url)
        rg = re_mir_hfiles.search(r.text)
        if rg is None:
            raise Exception("No hash link to list in files url")
        hfiles_url = rg.group(0)

        r = requests.get(hfiles_url)
        rg = re_mir_mirstats.search(r.text)
        if rg is None:
            raise Exception("No mirstats link in hfiles url")
        mirstats_url = "https://www.mirrored.to" + rg.group(0)

        # Poll until the remote mirroring finishes (at most 300 * 5 s).
        for i in range(300):  # Timeout to avoid waiting forever
            r = requests.get(mirstats_url)
            if "id_Uploading" not in r.text:
                break
            time.sleep(5)
        getlinks = re_mir_getlink.findall(r.text)

        # Resolve each getlink to the final per-host URL.
        for link in getlinks:
            r = requests.get("https://mirrored.to/" + link)
            rg = re_mir_hosts.search(r.text)
            if rg is None:
                continue  # It's not worth trashing all other mirrors
            urls.append(rg.group(1))

    return urls
|
||||
|
||||
|
||||
@mmanager.register_host("tusfiles.com", "tus")
def host_tusfiles(fhandle: BytesIO) -> List[str]:
    """Upload to tusfiles.com's CGI endpoint; return the download URL.

    BUG FIX: the form data listed "link_pass" twice; duplicate keys in a
    dict literal silently collapse to the last one, so the first entry was
    dead code.  The duplicate is removed.
    """
    fname = fhandle.name if hasattr(fhandle, "name") else "file.dat"
    j = requests.post(
        "https://cloud01.tusfiles.com/cgi-bin/upload.cgi",
        params={"upload_type": "file", "utype": "anon"},
        files=[("file_0", (fname, fhandle, "application/octet-stream"))],
        data={
            "sess_id": "",
            "utype": "anon",
            "link_pass": "",
            "link_rcpt": "",
            "keepalive": "1",
        },
    ).json()
    return ["https://tusfiles.com/" + j[0]["file_code"]]
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    exit(main())
|
|
@ -0,0 +1,69 @@
|
|||
import argparse
|
||||
import os
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from rich.console import Console
|
||||
from rich.progress import Progress
|
||||
|
||||
from .host_manager import host_manager
|
||||
from .types import File, ProgressHandler
|
||||
|
||||
|
||||
def main() -> int:
    """Parse arguments, upload the file to each selected host in a thread
    pool, and show per-upload progress bars.

    Returns 0 on completion, 1 when the given path is not a regular file.
    """
    parser = argparse.ArgumentParser(
        description="Mirror a file on multiple file hosting services"
    )
    parser.add_argument(
        "--pool",
        type=int,
        metavar="N",
        default=5,
        help="Number of max workers in the thread pool",
    )
    parser.add_argument("file", help="Path to the file to upload")

    for host in host_manager.hosts:
        parser.add_argument(
            f"-{host.short}",
            action="count",
            default=0,
            help=f"Upload to {host.site}. Repeat to upload multiple times",
        )

    args = parser.parse_args()
    console = Console()

    file_name = args.file
    if not os.path.isfile(file_name):
        console.log(f"[bold red]{repr(file_name)} is not a file!")
        return 1
    # BUG FIX: the size was previously obtained by opening the file in TEXT
    # mode and seeking to the end; text-mode seek()/tell() return opaque
    # cookies, not byte counts.  os.path.getsize is exact and simpler.
    file_size = os.path.getsize(file_name)

    with Progress(console=console, transient=True) as progress, ThreadPoolExecutor(
        max_workers=args.pool
    ) as thread_pool:
        for host in host_manager.hosts:
            # `action="count"` flags: one upload per repetition of -<short>.
            for _ in range(getattr(args, host.short)):
                # Each upload gets its own handle and its own (initially
                # hidden, unstarted) progress task.
                thread_pool.submit(
                    host(
                        File(open(file_name, "rb"), file_name, file_size),
                        ProgressHandler(
                            console,
                            progress,
                            progress.add_task(
                                f"[{host.short}] Waiting...",
                                start=False,
                                total=file_size,
                                visible=False,
                            ),
                        ),
                    ).upload
                )

    console.log("[bold green]File mirrored!")
    return 0


if __name__ == "__main__":
    exit(main())
|
|
@ -0,0 +1,231 @@
|
|||
import base64
|
||||
import re
|
||||
import time
|
||||
from typing import Type
|
||||
|
||||
import requests
|
||||
|
||||
from .types import File, FileUploader, MultipartProgress, ProgressHandler
|
||||
|
||||
|
||||
class HostManager:
    """Registry of the uploader classes the CLI knows about."""

    def __init__(self):
        # Uploader classes, in registration order.
        self.hosts = []

    def register_host(self, host: Type[FileUploader]):
        """Class decorator: record `host` and return it unchanged."""
        self.hosts.append(host)
        return host


# Module-level singleton the uploader modules register themselves on.
host_manager = HostManager()
|
||||
|
||||
|
||||
@host_manager.register_host
class MirroredToUploader(FileUploader):
    """Uploader for mirrored.to, which fans one upload out to many hosts.

    The flow is a multi-step scrape: fetch a token, upload once, then for
    each host batch register the upload and follow a chain of pages to the
    final per-host mirror URLs.  Statement order is load-bearing.
    """

    site = "mirrored.to"
    short = "mir"

    def __init__(self, file: File, progress: ProgressHandler):
        self.file = file
        self.progress = progress
        # Upload token embedded in the homepage's inline JS.
        self.re_mir_token = re.compile(r"'token' : '([a-z0-9]{32})'")
        # Link to the uploaded file's page in the upload_complete response.
        self.re_mir_files = re.compile(
            r"(https://www\.mirrored\.to/files/[A-Z0-9]+/[A-z0-9._]+)<"
        )
        # Hash-based variant of the files link, found on the files page.
        self.re_mir_hfiles = re.compile(
            r"https://www\.mirrored\.to/files/[A-Z0-9]+/\?hash=[0-9a-f]+&dl=[01]"
        )
        # Mirror-status page, polled until the remote mirroring finishes.
        self.re_mir_mirstats = re.compile(
            r"/mirstats\.php\?uid=[A-Z0-9]+&tmpID=[0-9a-f]+&fn=[A-z0-9.]+&ads=1&gp=1&su=0&pid=0&puid=0&fd=1&s=0&lang=[a-z]+&ftype=[A-z]+"
        )
        # Per-mirror redirect links listed on the mirstats page.
        self.re_mir_getlink = re.compile(
            r"/getlink/[A-Z0-9]+/[0-9]+/\?hid=[A-z0-9%]+&tid=[0-9a-f]+"
        )
        # Final host URL extracted from each getlink page.
        self.re_mir_hosts = re.compile(
            r"f=(https?:\/\/(?:www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b(?:[-a-zA-Z0-9()@:%_\+.~#?&//=]*)) target=\"_blank\">"
        )

    def upload(self) -> None:
        """Drive _upload, marking the progress task done or failed."""
        self.progress.make_visible()
        try:
            self._upload()
            self.progress.done()
        except Exception as e:
            self.progress.exception(f"Failed uploading to {self.site} due to: {e}")
            self.progress.failed()

    def _upload(self):
        # Step 1: scrape an upload token from the homepage.
        self.progress.update_description("Fetching token...")
        r = requests.get("https://www.mirrored.to/")
        rg = self.re_mir_token.search(r.text)
        if rg is None:
            self.progress.exception("No token found!")
            return
        token = rg.group(1)

        # Step 2: upload the file once; both host batches below reuse it.
        self.progress.update_description("Uploading...", True)
        r = requests.post(
            "https://www.mirrored.to/uploadify/uploadifive1.php",
            **MultipartProgress(
                self.progress,
                {
                    "timestamp": "",
                    "token": token,
                    "Filedata": (self.file.name, self.file),
                },
            ),
        )
        if not r.ok and r.text:
            self.progress.exception("Failed to upload")
            return

        # Step 3: register the upload once per host batch — presumably the
        # site limits how many target hosts one request may carry (confirm).
        for hosts in (
            (
                "gofileio",
                "downloadgg",
                "onefichier",
                "turbobit",
                "zippyshare",
                "usersdrive",
                "bayfiles",
                "anonfiles",
                "clicknupload",
                "uptobox",
            ),
            (
                "dailyuploads",
                "uploadee",
                "dropapk",
                "mixdropco",
                "filesim",
                "megaupnet",
                "file-upload",
                "sendcm",
                "skynet",
                "pixeldrain",
            ),
        ):
            # B64 ENCODE:
            # For each filename:
            # Enter name of file. Be sure to use format of filename listed in upload result as guide. It must be exact. Do not include quotes.
            # Paste this: #0#
            # Enter size of file in Bytes. For 1 MiB file, enter 1048576. This must be exact!
            # Paste this: ;0;
            # Example for 2 files:
            # First_File.7z#0#1234567;0;Second_File.7z#0#8901234;0;
            # Paste this: @e@#H#
            # Paste host list from box below.

            # onefichier;anonfiles;solidfiles
            # Paste this: ;#P##SC##T#
            # Enter some numbers. Example: 1625815023. It should be unique, as it is used for timestamp.

            self.progress.update_description("Fetching link page...")
            data = base64.b64encode(
                f"{self.file.name}#0#{self.file.size};0;@e@#H#{';'.join(hosts)};#P##SC##T#{int(time.time() * 1000)}".encode()
            ).decode()
            r = requests.get(
                "https://www.mirrored.to/upload_complete.php",
                params={"w": "1", "data": data},
            )
            rg = self.re_mir_files.search(r.text)
            if rg is None:
                self.progress.exception("No link to url list in upload_complete")
                return
            files_url = rg.group(1)

            # Step 4: follow files page -> hash page -> mirstats page.
            r = requests.get(files_url)
            rg = self.re_mir_hfiles.search(r.text)
            if rg is None:
                self.progress.exception("No hash link to list in files url")
                return
            hfiles_url = rg.group(0)

            self.progress.update_description("Waiting for upload...")
            r = requests.get(hfiles_url)
            rg = self.re_mir_mirstats.search(r.text)
            if rg is None:
                self.progress.exception("No mirstats link in hfiles url")
                return
            mirstats_url = "https://www.mirrored.to" + rg.group(0)

            # Step 5: poll until mirroring finishes (at most 300 * 5 s).
            for i in range(300):  # Timeout to avoid waiting forever
                r = requests.get(mirstats_url)
                if "id_Uploading" not in r.text:
                    break
                time.sleep(5)
            getlinks = self.re_mir_getlink.findall(r.text)

            # Step 6: resolve each getlink to the final per-host URL.
            self.progress.update_description("Fetching links...")
            for link in getlinks:
                r = requests.get("https://mirrored.to/" + link)
                rg = self.re_mir_hosts.search(r.text)
                if rg is None:
                    continue  # It's not worth trashing all other mirrors
                self.progress.add_url(rg.group(1))
|
||||
|
||||
|
||||
@host_manager.register_host
class AnonfilesUploader(FileUploader):
    """Uploader for the anonfiles.com REST API."""

    site = "anonfiles.com"
    short = "an"

    def __init__(self, file: File, progress: ProgressHandler):
        self.file = file
        self.progress = progress

    def upload(self) -> None:
        """Run the upload, marking the progress task done or failed."""
        self.progress.make_visible()
        try:
            self._upload()
            self.progress.done()
        except Exception as e:
            self.progress.exception(f"Failed uploading to {self.site} due to: {e}")
            self.progress.failed()

    def _upload(self) -> None:
        self.progress.update_description("Uploading...", True)
        fields = {"file": (self.file.name, self.file)}
        body = MultipartProgress(self.progress, fields)
        response = requests.post("https://api.anonfiles.com/upload", **body)
        short_url = response.json()["data"]["file"]["url"]["short"]
        self.progress.add_url(short_url)
|
||||
|
||||
|
||||
@host_manager.register_host
class TusfilesUploader(FileUploader):
    """Uploader for tusfiles.com's CGI upload endpoint."""

    site = "tusfiles.com"
    short = "tus"

    def __init__(self, file: File, progress: ProgressHandler):
        self.file = file
        self.progress = progress

    def upload(self) -> None:
        """Run the upload, marking the progress task done or failed."""
        self.progress.make_visible()
        try:
            self._upload()
            self.progress.done()
        except Exception as e:
            self.progress.exception(f"Failed uploading to {self.site} due to: {e}")
            self.progress.failed()

    def _upload(self) -> None:
        self.progress.update_description("Uploading...", True)
        # BUG FIX: "link_pass" appeared twice in this dict literal; duplicate
        # keys silently collapse to the last one, so the first was dead code.
        j = requests.post(
            "https://cloud01.tusfiles.com/cgi-bin/upload.cgi",
            params={"upload_type": "file", "utype": "anon"},
            **MultipartProgress(
                self.progress,
                {
                    "file_0": (self.file.name, self.file),
                    "sess_id": "",
                    "utype": "anon",
                    "link_pass": "",
                    "link_rcpt": "",
                    "keepalive": "1",
                },
            ),
        ).json()
        self.progress.add_url("https://tusfiles.com/" + j[0]["file_code"])
|
|
@ -0,0 +1,170 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from typing import BinaryIO, List, Optional
|
||||
|
||||
import urllib3
|
||||
from rich.console import Console
|
||||
from rich.progress import Progress
|
||||
|
||||
|
||||
@dataclass
class File:
    """An open file plus the metadata uploaders need."""

    handle: BinaryIO  # Open binary handle to read the payload from.
    name: str  # File name as sent to the remote host.
    size: int  # in bytes
|
||||
|
||||
|
||||
class ProgressHandler:
    """Console/progress-bar facade for one upload task.

    Wraps the shared rich Console and Progress, bound to a single task id,
    so uploader instances can log and drive their own bar.
    """

    # I'm not entirely sure if this is thread safe
    def __init__(self, console: Console, rich_progress: Progress, taskid: int):
        self.console = console
        self.rich_progress = rich_progress
        self.taskid = taskid
        # Mirror URLs collected so far for this upload.
        self.urls: List[str] = []

    def log(self, *a, **kw):
        """Plain console log line."""
        self.console.log(*a, **kw)

    def exception(self, *a, **kw):
        """Log an error in bold red."""
        self.console.log(*a, **kw, style="bold red")

    def warning(self, *a, **kw):
        """Log a warning in bold yellow."""
        self.console.log(*a, **kw, style="bold yellow")

    def add_url(self, url: str) -> None:
        """Print and record one resulting mirror URL."""
        self.console.log(url)
        self.urls.append(url)

    def make_visible(self):
        """Show the (initially hidden) progress bar for this task."""
        self.rich_progress.update(self.taskid, visible=True)

    def start(self) -> None:
        self.rich_progress.start_task(self.taskid)

    def reset(self) -> None:
        self.rich_progress.reset(self.taskid, start=False)

    def done(self) -> None:
        """Mark the bar complete with a 'Done' label."""
        self.rich_progress.update(self.taskid, description="Done", total=1, completed=1)

    def failed(self) -> None:
        """Mark the bar complete with a 'Failed' label."""
        self.rich_progress.update(
            self.taskid, description="Failed", total=1, completed=1
        )

    def update_description(self, description: str, started: bool = False):
        """Relabel the task; start its clock if `started`, else reset it."""
        self.rich_progress.update(self.taskid, description=description)
        if started:
            self.start()
        else:
            self.reset()

    def advance(self, step: int) -> None:
        self.rich_progress.advance(self.taskid, step)

    def update(
        self,
        total: Optional[float] = None,
        completed: Optional[float] = None,
        advance: Optional[float] = None,
        description: Optional[str] = None,
        visible: Optional[bool] = None,
        refresh: bool = False,
        **fields,
    ) -> None:
        """Pass-through to Progress.update for this task.

        BUG FIX: the arguments were previously forwarded positionally
        without the task id, so `total` landed in Progress.update's
        task_id slot.  Now targets self.taskid with keyword arguments.
        """
        self.rich_progress.update(
            self.taskid,
            total=total,
            completed=completed,
            advance=advance,
            description=description,
            visible=visible,
            refresh=refresh,
            **fields,
        )
|
||||
|
||||
|
||||
class FileUploader(ABC):
    """Interface every host uploader class implements.

    Concrete subclasses set `site`/`short` as plain class attributes and
    register themselves on the module's host manager.
    """

    # NOTE(review): stacking @classmethod with @property is deprecated in
    # Python 3.11 and removed in 3.13 — plain abstract class attributes
    # would be safer long-term.
    @classmethod
    @property
    @abstractmethod
    def site(self) -> str:
        # Human-readable site name, e.g. "anonfiles.com".
        ...

    @classmethod
    @property
    @abstractmethod
    def short(self) -> str:
        # Short CLI flag name, e.g. "an".
        ...

    @abstractmethod
    def __init__(self, file: File, progress: ProgressHandler):
        ...

    @abstractmethod
    def _upload(self):
        # Host-specific upload logic; may raise on failure.
        ...

    @abstractmethod
    def upload(self):
        # Outer driver: runs _upload and reports success/failure on the bar.
        ...
|
||||
|
||||
|
||||
class MultipartProgress:
    """Streaming multipart/form-data encoder that reports upload progress.

    Duck-types a two-key mapping ("headers"/"data") so an instance can be
    splatted straight into requests.post(**MultipartProgress(...)); the
    body is then streamed via __iter__ and sized via __len__.
    """

    def __init__(
        self, progress: ProgressHandler, fields: dict, chunk: Optional[int] = None
    ):
        self.progress = progress
        self.fields = fields
        self.boundary = urllib3.filepost.choose_boundary()
        self.content_type = f"multipart/form-data; boundary={self.boundary}"
        # Optional fixed chunk size; when None a size-dependent one is used.
        self.chunk = chunk

    def __iter__(self):
        """Yield the encoded multipart body piece by piece."""
        for field in urllib3.filepost.iter_field_objects(self.fields):
            yield f"--{self.boundary}\r\n".encode()
            yield field.render_headers().encode()

            data = field.data

            if isinstance(data, int):
                data = str(data)  # Backwards compatibility
            if isinstance(data, str):
                yield data.encode()
            elif isinstance(data, File):
                # 1/100th the size if the condition is met, else 150k or 1MB:
                # 150000 <= 1/100th the size <= 1MB
                chunk = self.chunk or int(min(max(data.size / 100, 150000), 1048576))
                while True:
                    d = data.handle.read(chunk)
                    if not d:
                        break
                    yield d
                    # BUG FIX: advance by the bytes actually read — the final
                    # read is usually shorter than `chunk`, which previously
                    # overstated progress.
                    self.progress.advance(len(d))
            else:
                yield data
            yield b"\r\n"
        yield f"--{self.boundary}--\r\n".encode()

    def __len__(self):
        """Estimate the encoded body length for the Content-Length header.

        The hard-coded constants account for boundary lines and per-field
        header "fluff" — presumably tuned to the encoding above; verify if
        the multipart layout changes.
        """
        result = 36
        for k, v in self.fields.items():
            result += 36
            result += len(k)
            if isinstance(v, tuple):
                result += 74  # All the fluff
                result += len(v[0])
                result += v[1].size
                if len(v) == 3:
                    result += len(v[2])
                else:
                    result += len(urllib3.fields.guess_content_type(v[0]))
            elif isinstance(v, str):
                result += 45  # All the fluff
                result += len(v)
            else:
                self.progress.warning(f"Got unexpected type {type(v)}.")
        return result

    # This is to allow requests.post(**MultipartProgress(...))
    def keys(self):
        return ("headers", "data")

    def __getitem__(self, name: str):
        if name == "headers":
            return {"Content-Type": self.content_type}
        elif name == "data":
            return self
|
Loading…
Reference in New Issue