Files

291 lines
8.7 KiB
Python

import re
import subprocess
from typing import Any, Callable, Dict, List, Tuple, Union
from rich.progress import Progress, TaskID, Task
from pathlib import Path
from rich.progress import (
Progress,
TextColumn,
BarColumn,
TaskProgressColumn,
TimeRemainingColumn,
SpinnerColumn,
DownloadColumn,
)
# ---------------------------------------------------------------------------- #
# General Functions #
# ---------------------------------------------------------------------------- #
def args2string(args: List[str]) -> str:
    """Concatenate command-line arguments, each one preceded by a single space.

    Args:
        args (List[str]): Flags / named arguments to append to a command.

    Returns:
        str: All items joined, every item prefixed with one space (so a
            non-empty result starts with a space). Empty string if ``args``
            is empty.
    """
    # build the result in a single pass instead of repeated `+=` concatenation
    return "".join(f" {item}" for item in args)
def run_cmd(
    command: str, args: List[str] = (), shell=True, encoding="utf-8"
) -> subprocess.CompletedProcess:
    """Run a command and capture its stdout and stderr.

    Args:
        command (str): The command to execute.
        args (List[str]): Optional arguments and flags appended to the command.
        shell: Forwarded to ``subprocess.run``.
        encoding: Forwarded to ``subprocess.run``.

    Returns:
        subprocess.CompletedProcess: The finished process with stdout and
            stderr captured through pipes.
    """
    # args2string already prefixes every item with a space
    full_command = f"{command} {args2string(args)}"
    captured_streams = {"stdout": subprocess.PIPE, "stderr": subprocess.PIPE}
    return subprocess.run(
        full_command, shell=shell, encoding=encoding, **captured_streams
    )
def shorten_filepath(in_path: str, max_length: int) -> str:
    """Reduce a path that is longer than ``max_length`` to its final component.

    Args:
        in_path (str): The path; it may carry a prefix terminated by ":".
        max_length (int): Paths up to this length are returned unchanged.

    Returns:
        str: ``in_path`` itself if it fits. Otherwise everything up to the
            first ":" is dropped (or, if nothing follows the ":", only the
            part before it is kept) and the last path component of the
            remainder is returned. The result is not truncated any further.
    """
    if len(in_path) <= max_length:
        return in_path

    before_colon, colon, after_colon = in_path.partition(":")
    if colon:
        # prefer the part behind the first colon, fall back to the part in front
        in_path = after_colon or before_colon
    return Path(in_path).name
def convert2bits(value: float, unit: str) -> float:
    """Scale a value given with a binary prefix (powers of 1024) to the base unit "B".

    NOTE(review): despite the function name, the "B" in KiB / MiB
    conventionally denotes bytes - confirm the intended unit with the callers.

    Args:
        value (float): The magnitude expressed in ``unit``.
        unit (str): One of "B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB",
            "ZiB" or "YiB".

    Returns:
        float: ``value`` multiplied by 1024 raised to the power belonging to
            ``unit`` ("B" leaves the value unchanged).

    Raises:
        KeyError: If ``unit`` is not one of the supported units.
    """
    # the position of a unit in this sequence is its power of 1024
    prefixes = ("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
    power_of = {prefix: power for power, prefix in enumerate(prefixes)}
    return value * 1024 ** power_of[unit]
# ---------------------------------------------------------------------------- #
# Progressbar related functions #
# ---------------------------------------------------------------------------- #
def rclone_progress(
    command: str,
    pbar_title: str,
    stderr=subprocess.PIPE,
    show_progress=True,
    listener: Callable[[Dict], None] = None,
    debug=False,
) -> subprocess.Popen:
    """Run a command through the shell and turn its stdout into progress updates.

    The output is buffered line by line. As soon as the buffer contains a
    complete progress block (as judged by ``extract_rclone_progress``) the
    progress bar is updated, the listener is notified and the buffer is reset.

    Args:
        command (str): The full command line; executed with ``shell=True``.
        pbar_title (str): Title of the total progress task.
        stderr: Forwarded to ``subprocess.Popen`` as ``stderr``.
        show_progress: Whether a progress bar is created and updated.
        listener (Callable[[Dict], None]): Optional callback that receives
            every update dict produced by ``extract_rclone_progress``.
        debug: If set, every buffer that produced an update is logged (via
            the progress bar if there is one, otherwise via ``print``).

    Returns:
        subprocess.Popen: The process after its stdout has been read until
            EOF. This function does not call ``wait()`` on it.
    """
    buffer = ""
    pbar = None
    total_progress_id = None
    subprocesses = {}

    if show_progress:
        pbar, total_progress_id = create_progress_bar(pbar_title)

    try:
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=stderr, shell=True
        )

        for line in iter(process.stdout.readline, b""):
            # Add the line BEFORE checking the buffer: otherwise the line that
            # triggers a flush is lost and a final block that is not followed
            # by another line is never reported.
            # Undecodable bytes (e.g. in file names) must not abort the transfer.
            buffer += line.decode(errors="replace")

            valid, update_dict = extract_rclone_progress(buffer)
            if not valid:
                # keep buffering until a complete progress block is available
                continue

            if show_progress:
                update_tasks(pbar, total_progress_id, update_dict, subprocesses)

            # call the listener
            if listener:
                listener(update_dict)

            if debug:
                # without a progress bar there is nothing to log through
                if pbar is not None:
                    pbar.log(buffer)
                else:
                    print(buffer)

            # reset the buffer
            buffer = ""

        if show_progress:
            complete_task(total_progress_id, pbar)
            for task_id in subprocesses.values():
                # hide all subprocesses
                pbar.update(task_id=task_id, visible=False)
    finally:
        # always stop the live display, even if reading or parsing failed
        if pbar is not None:
            pbar.stop()

    return process
# Summary line: sent value + unit, total value + unit, percentage,
# transfer speed + unit and the ETA. Values may be integers or decimals.
_TRANSFERRED_RE = re.compile(
    r"Transferred:\s+(\d+(?:\.\d+)?) ([a-zA-Z]+) \/ (\d+(?:\.\d+)?) ([a-zA-Z]+), "
    r"(\d{1,3})%, (\d+(?:\.\d+)?) ([a-zA-Z]+\/[a-zA-Z]+), ETA (\S+)"
)
# Per-file line: name, percentage, file size and the size unit without its "B".
# The unit may be empty - presumably plain byte sizes carry no suffix at all
# (TODO confirm against real rclone output).
_TRANSFERRING_RE = re.compile(
    r"\* +(.+):[ ]+(\d{1,3})% \/(\d+(?:\.\d+)?)([a-zA-Z]*),"
)


def extract_rclone_progress(buffer: str) -> Tuple[bool, Union[Dict[str, Any], None]]:
    """Parse a buffered progress block into an update dict.

    Args:
        buffer (str): The output collected so far.

    Returns:
        Tuple[bool, Union[Dict[str, Any], None]]: ``(False, None)`` while the
            buffer holds no complete "Transferred: ..." summary line.
            Otherwise ``(True, update)`` where ``update`` has the keys

            - ``prog_transferring``: list of ``(name, percent, size, unit)``
              tuples, one per "* <name>: ..." line found in the buffer
            - ``progress``: overall percentage as float
            - ``sent_bits`` / ``unit_sent``: transferred amount and its unit
            - ``total_bits`` / ``unit_total``: total amount and its unit
            - ``transfer_speed`` / ``transfer_speed_unit``: speed and its unit
            - ``eta``: the ETA exactly as printed
    """
    # the summary line marks the block as completely buffered; use the first one
    transferred = _TRANSFERRED_RE.search(buffer)
    if transferred is None:
        return False, None

    (
        sent,
        unit_sent,
        total,
        unit_total,
        progress,
        speed,
        speed_unit,
        eta,
    ) = transferred.groups()

    # progress of the individual files that are currently transferring
    prog_transferring = [
        # the suffix B of the unit is missing for subprocesses
        (name, int(percent), float(size), unit + "B")
        for name, percent, size, unit in _TRANSFERRING_RE.findall(buffer)
    ]

    return True, {
        "prog_transferring": prog_transferring,
        "progress": float(progress),
        "total_bits": float(total),
        "sent_bits": float(sent),
        "unit_sent": unit_sent,
        "unit_total": unit_total,
        "transfer_speed": float(speed),
        "transfer_speed_unit": speed_unit,
        "eta": eta,
    }
def create_progress_bar(pbar_title: str) -> Tuple[Progress, TaskID]:
    """Create and start a rich progress display with one task for the total progress.

    Args:
        pbar_title (str): Description of the total progress task.

    Returns:
        Tuple[Progress, TaskID]: The started progress display and the id of
            the total progress task, which is added with ``total=None``.
    """
    columns = (
        TextColumn("[progress.description]{task.description}"),
        SpinnerColumn(),
        BarColumn(),
        TaskProgressColumn(),
        DownloadColumn(binary_units=True),
        TimeRemainingColumn(),
    )
    pbar = Progress(*columns)
    pbar.start()
    return pbar, pbar.add_task(pbar_title, total=None)
def get_task(id: TaskID, progress: Progress) -> Task:
    """Look up a task of a rich progress by its TaskID.

    Args:
        id (TaskID): The id of the task.
        progress (Progress): The rich progress whose tasks are searched.

    Returns:
        Task: The first task with a matching id, or None if there is none.
    """
    return next((task for task in progress.tasks if task.id == id), None)
def complete_task(id: TaskID, progress: Progress):
    """Manually set the task with the given TaskID to 100%.

    Args:
        id (TaskID): The task that should be completed.
        progress (Progress): The rich progress that owns the task.
    """
    known_total = get_task(id, progress).total
    if known_total is None:
        # the file size is unknown, so fall back to the default columns that hide it
        progress.columns = Progress.get_default_columns()

    # a missing (or zero) total is replaced by 1 so that the bar can be filled
    final_total = known_total if known_total else 1
    progress.update(id, completed=final_total, total=final_total)
def update_tasks(
    pbar: Progress,
    total_progress: TaskID,
    update_dict: Dict[str, Any],
    subprocesses: Dict[str, TaskID],
):
    """Updates the total progress as well as all subprocesses (the individual files that are currently uploading).

    Args:
        pbar (Progress): The rich progress.
        total_progress (TaskID): The TaskID of the total progress.
        update_dict (Dict): The update dict generated by the extract_rclone_progress function.
        subprocesses (Dict): Maps file names to the TaskID of their subprocess task.
            New files are added to this dictionary in place.
    """
    pbar.update(
        total_progress,
        completed=convert2bits(update_dict["sent_bits"], update_dict["unit_sent"]),
        total=convert2bits(update_dict["total_bits"], update_dict["unit_total"]),
    )

    sp_names = set()
    for sp_file_name, sp_progress, sp_size, sp_unit in update_dict["prog_transferring"]:
        sp_names.add(sp_file_name)

        if sp_file_name not in subprocesses:
            # first time this file shows up: create a (still hidden) task for it
            subprocesses[sp_file_name] = pbar.add_task(" ", visible=False)
        task_id = subprocesses[sp_file_name]

        sp_total = convert2bits(sp_size, sp_unit)
        pbar.update(
            task_id,
            # set the description every time to reset the '├'
            description=f" ├─{sp_file_name}",
            completed=sp_total * sp_progress / 100.0,
            total=sp_total,
            # hide subprocesses if we only upload a single file
            visible=len(subprocesses) > 1,
        )

    # make all processes invisible that are no longer provided by rclone (bc. their upload completed)
    for finished_name in subprocesses.keys() - sp_names:
        pbar.update(subprocesses[finished_name], visible=False)

    # change symbol for the last visible process: it closes the tree with '└'
    for task in reversed(pbar.tasks):
        if task.visible:
            # the total progress task is not part of the tree, leave its title alone
            if task.id != total_progress:
                pbar.update(task.id, description=task.description.replace("├", "└"))
            break