import re
import subprocess
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from rich.progress import Progress, TaskID, Task
from rich.progress import (
    Progress,
    TextColumn,
    BarColumn,
    TaskProgressColumn,
    TimeRemainingColumn,
    SpinnerColumn,
    DownloadColumn,
)

# Exponent of 1024 for every binary prefix accepted by ``convert2bits``.
_BINARY_PREFIX_EXPONENTS = {
    "B": 0,
    "KiB": 1,
    "MiB": 2,
    "GiB": 3,
    "TiB": 4,
    "PiB": 5,
    "EiB": 6,
    "ZiB": 7,
    "YiB": 8,
}

# An integer or a decimal number. The decimal point is escaped and the
# fractional part is optional, so values such as "10" are accepted too.
_NUMBER = r"\d+(?:\.\d+)?"

# Overall statistics line:
#   Transferred: <sent> <unit> / <total> <unit>, <pct>%, <speed> <unit>/<unit>, ETA <eta>
_TRANSFERRED_PATTERN = re.compile(
    r"Transferred:\s+"
    r"(" + _NUMBER + r") ([a-zA-Z]+) \/ "
    r"(" + _NUMBER + r") ([a-zA-Z]+), "
    r"(\d{1,3})%, "
    r"(" + _NUMBER + r") ([a-zA-Z]+\/[a-zA-Z]+), "
    r"ETA (\S+)"
)

# Per-file line:  * <name>: <pct>% /<size><unit>,
# The unit is optional; an absent unit is interpreted as plain "B".
_FILE_PROGRESS_PATTERN = re.compile(
    r"\* +(.+):[ ]+(\d{1,3})% \/(" + _NUMBER + r")([a-zA-Z]*),"
)


# ---------------------------------------------------------------------------- #
#                               General Functions                              #
# ---------------------------------------------------------------------------- #


def args2string(args: List[str]) -> str:
    """Concatenate arguments into one string, each preceded by a space.

    Args:
        args (List[str]): The flags / named arguments to concatenate.

    Returns:
        str: ``" a b c"`` for ``["a", "b", "c"]``; an empty string for no args.
    """
    return "".join(f" {item}" for item in args)


def run_cmd(
    command: str, args: List[str] = (), shell=True, encoding="utf-8"
) -> subprocess.CompletedProcess:
    """Run a command and capture its stdout and stderr.

    NOTE: the arguments are interpolated into the command string unquoted and,
    with the default ``shell=True``, interpreted by the shell. Callers must
    quote/escape any value that does not come from a trusted source.

    Args:
        command (str): The base command.
        args (List[str]): Optional flags / arguments appended to the command.
        shell: Forwarded to ``subprocess.run``.
        encoding: Encoding used to decode the captured output.

    Returns:
        subprocess.CompletedProcess: The finished process including its output.
    """
    # add optional arguments and flags to the command
    args_str = args2string(args)
    command = f"{command} {args_str}"

    return subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        shell=shell,
        encoding=encoding,
    )


def shorten_filepath(in_path: str, max_length: int) -> str:
    """Reduce a path to its file name if it is longer than ``max_length``.

    If the path contains a ``:`` the part before the first colon is dropped
    first (or, when nothing follows the colon, the part before it is kept).

    Args:
        in_path (str): The path to shorten.
        max_length (int): Maximum length for which the path is left untouched.

    Returns:
        str: The unchanged path if it is short enough, otherwise its last
            path component.
    """
    if len(in_path) <= max_length:
        return in_path

    if ":" in in_path:
        colon = in_path.index(":")
        if colon + 1 < len(in_path):
            in_path = in_path[colon + 1 :]
        else:
            in_path = in_path[0:colon]

    return Path(in_path).name


def convert2bits(value: float, unit: str) -> float:
    """Scale a value with a binary prefix (powers of 1024) to its base unit.

    The base unit is ``"B"`` (factor 1); ``"KiB"`` scales by 1024, ``"MiB"``
    by 1024**2 and so on up to ``"YiB"``.

    Args:
        value (float): The value expressed in ``unit``.
        unit (str): The binary prefix, e.g. ``"KiB"`` or ``"MiB"``.

    Returns:
        float: The value expressed in the base unit.

    Raises:
        KeyError: If ``unit`` is not a known binary prefix.
    """
    return value * 1024 ** _BINARY_PREFIX_EXPONENTS[unit]


# ---------------------------------------------------------------------------- #
#                         Progressbar related functions                        #
# ---------------------------------------------------------------------------- #


def rclone_progress(
    command: str,
    pbar_title: str,
    stderr=subprocess.PIPE,
    show_progress=True,
    listener: Optional[Callable[[Dict], None]] = None,
    debug=False,
) -> subprocess.Popen:
    """Run a command and turn its progress output into progress updates.

    The command's stdout is read line by line and buffered. Before a new line
    is appended, the buffer is checked: once it contains a complete progress
    block, the update is dispatched and the buffer is cleared. The line read
    in the iteration that dispatches an update is not added to the buffer.

    Args:
        command (str): The shell command to execute.
        pbar_title (str): Title of the total progress bar.
        stderr: Forwarded to ``subprocess.Popen``.
        show_progress: Whether a rich progress bar is displayed.
        listener: Optional callback invoked with every parsed update dict.
        debug: If set, every buffer that produced an update is logged.

    Returns:
        subprocess.Popen: The process object, after its stdout was exhausted.
    """
    buffer = ""
    pbar = None
    total_progress_id = None
    subprocesses = {}

    if show_progress:
        pbar, total_progress_id = create_progress_bar(pbar_title)

    process = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=stderr, shell=True
    )

    try:
        for line in iter(process.stdout.readline, b""):
            # never abort the transfer display because of undecodable bytes
            var = line.decode(errors="replace")

            valid, update_dict = extract_rclone_progress(buffer)

            if valid:
                if pbar is not None:
                    update_tasks(pbar, total_progress_id, update_dict, subprocesses)

                # call the listener
                if listener:
                    listener(update_dict)

                if debug:
                    # without a progress bar there is no rich console to log to
                    if pbar is not None:
                        pbar.log(buffer)
                    else:
                        print(buffer)

                # reset the buffer
                buffer = ""
            else:
                # keep buffering until a complete progress block is available
                buffer += var

        if pbar is not None:
            complete_task(total_progress_id, pbar)
            for task_id in subprocesses.values():
                # hide all subprocesses
                pbar.update(task_id=task_id, visible=False)
    finally:
        # always stop the live display, even if parsing or the listener raised
        if pbar is not None:
            pbar.stop()

    return process


def extract_rclone_progress(buffer: str) -> Tuple[bool, Union[Dict[str, Any], None]]:
    """Parse a buffered progress block.

    Args:
        buffer (str): The output collected so far.

    Returns:
        Tuple[bool, Union[Dict[str, Any], None]]: ``(False, None)`` if the
            buffer holds no complete "Transferred" line. Otherwise ``True``
            and a dict with the keys ``prog_transferring`` (list of
            ``(name, progress, file_size, unit)`` tuples), ``progress``,
            ``total_bits``, ``sent_bits``, ``unit_sent``, ``unit_total``,
            ``transfer_speed``, ``transfer_speed_unit`` and ``eta``.
    """
    # checks if the progress block is buffered and extracts the sent amount,
    # total amount, progress, transfer speed and eta (first match wins)
    transferred = _TRANSFERRED_PATTERN.search(buffer)
    if transferred is None:
        return False, None

    (
        sent,
        unit_sent,
        total,
        unit_total,
        progress,
        speed,
        speed_unit,
        eta,
    ) = transferred.groups()

    # progress of the individual files that are currently transferring
    prog_transferring = [
        (
            name,
            int(percent),
            float(size),
            # the suffix B of the unit is missing for subprocesses
            unit + "B",
        )
        for name, percent, size, unit in _FILE_PROGRESS_PATTERN.findall(buffer)
    ]

    out = {
        "prog_transferring": prog_transferring,
        "progress": float(progress),
        "total_bits": float(total),
        "sent_bits": float(sent),
        "unit_sent": unit_sent,
        "unit_total": unit_total,
        "transfer_speed": float(speed),
        "transfer_speed_unit": speed_unit,
        "eta": eta,
    }
    return True, out


def create_progress_bar(pbar_title: str) -> Tuple[Progress, TaskID]:
    """Create and start a rich progress bar with one task of unknown total.

    Args:
        pbar_title (str): Description of the total progress task.

    Returns:
        Tuple[Progress, TaskID]: The started progress and the id of its task.
    """
    pbar = Progress(
        TextColumn("[progress.description]{task.description}"),
        SpinnerColumn(),
        BarColumn(),
        TaskProgressColumn(),
        DownloadColumn(binary_units=True),
        TimeRemainingColumn(),
    )
    pbar.start()

    total_progress = pbar.add_task(pbar_title, total=None)

    return pbar, total_progress


def get_task(id: TaskID, progress: Progress) -> Optional[Task]:
    """Returns the task with the specified TaskID.

    Args:
        id (TaskID): The id of the task.
        progress (Progress): The rich progress.

    Returns:
        Optional[Task]: The task with the specified TaskID, or None if the
            progress has no such task.
    """
    return next((task for task in progress.tasks if task.id == id), None)


def complete_task(id: TaskID, progress: Progress):
    """Manually sets the progress of the task with the specified TaskID to 100%.

    Does nothing if the progress has no task with that id.

    Args:
        id (TaskID): The task that should be completed.
        progress (Progress): The rich progress.
    """
    task = get_task(id, progress)
    if task is None:
        return

    if task.total is None:
        # reset columns to hide file size (we don't know it)
        progress.columns = Progress.get_default_columns()

    total = task.total or 1
    progress.update(id, completed=total, total=total)


def update_tasks(
    pbar: Progress,
    total_progress: TaskID,
    update_dict: Dict[str, Any],
    subprocesses: Dict[str, TaskID],
):
    """Updates the total progress as well as all subprocesses (the individual
    files that are currently transferring).

    Args:
        pbar (Progress): The rich progress.
        total_progress (TaskID): The TaskID of the total progress.
        update_dict (Dict): The update dict generated by the
            extract_rclone_progress function.
        subprocesses (Dict): Maps file names to the TaskID of their task. New
            files are added to this dictionary in place.
    """
    pbar.update(
        total_progress,
        completed=convert2bits(update_dict["sent_bits"], update_dict["unit_sent"]),
        total=convert2bits(update_dict["total_bits"], update_dict["unit_total"]),
    )

    sp_names = set()
    for sp_file_name, sp_progress, sp_size, sp_unit in update_dict["prog_transferring"]:
        sp_names.add(sp_file_name)

        if sp_file_name not in subprocesses:
            subprocesses[sp_file_name] = pbar.add_task(" ", visible=False)
        task_id = subprocesses[sp_file_name]

        sp_total = convert2bits(sp_size, sp_unit)
        pbar.update(
            task_id,
            # set the description every time to reset the '├'
            description=f" ├─{sp_file_name}",
            completed=sp_total * sp_progress / 100.0,
            total=sp_total,
            # hide subprocesses if we only upload a single file
            visible=len(subprocesses) > 1,
        )

    # hide all tasks that are no longer reported (their transfer completed)
    for missing_sp_id in sorted(subprocesses.keys() - sp_names):
        pbar.update(subprocesses[missing_sp_id], visible=False)

    # change symbol for the last visible process
    for task in reversed(pbar.tasks):
        if task.visible:
            pbar.update(task.id, description=task.description.replace("├", "└"))
            break