Source code for tools.file_utils

"""Tools to work with files
"""
import difflib
import functools
import logging
import os
import errno
import pathlib
import re
import shutil
import uuid
import warnings
import zipfile
from sys import platform
from pathlib import Path
import typing
from typing import Optional, Union, List, Sequence, Dict
import sys


def create_unique_file_path(
    parent_dir: Optional[Union[str, Path]] = None,
    extension: Optional[Union[str, Path]] = None,
) -> str:
    """Return a file path under **parent_dir** that does not exist yet.

    The file name is a random UUID4 followed by **extension**. The file
    itself is not created.

    Args:
        parent_dir (str): (current working directory) Directory that will contain the path.
        extension (str): ('') Text appended right after the UUID (e.g. ".txt").

    Returns:
        str: Resolved path that did not exist when it was checked.
    """
    base_dir = Path(parent_dir or Path.cwd()).resolve()
    suffix = extension or ""
    candidate = base_dir / f"{uuid.uuid4()}{suffix}"
    # Draw a new UUID until the name is free.
    while candidate.exists():
        candidate = base_dir / f"{uuid.uuid4()}{suffix}"
    return str(candidate)
def create_dir(dir_path: str) -> str:
    """Make sure **dir_path** exists and return it.

    Missing parent directories are created as well. Nothing is done when
    the path already exists.

    Args:
        dir_path (str): Path to the directory that will be created.

    Returns:
        str: The directory path.
    """
    target = Path(dir_path)
    if target.exists():
        return str(target)
    target.mkdir(parents=True, exist_ok=True)
    return str(target)
def create_stdin_file(intput_string: str) -> str:
    """Write **intput_string** to a new uniquely named ".stdin" file.

    The file path is obtained from ``create_unique_file_path`` with the
    ".stdin" extension.

    Args:
        intput_string (str): Text written to the file.

    Returns:
        str: Path of the file that was written.
    """
    stdin_path = create_unique_file_path(extension=".stdin")
    with open(stdin_path, "w") as stdin_handle:
        stdin_handle.write(intput_string)
    return stdin_path
def create_unique_dir(
    path: str = "",
    prefix: str = "",
    number_attempts: int = 10,
    out_log: Optional[logging.Logger] = None,
) -> str:
    """Create a directory with a prefix + computed unique name.

    If the directory cannot be created (for instance because the computed
    name collides with an existing one) a new unique name is computed and the
    creation is attempted again, up to **number_attempts** times in total.

    Args:
        path (str): ('') Parent path of the new directory.
        prefix (str): ('') String to be added before the computed unique dir name.
        number_attempts (int): (10) Number of times creating the directory is attempted.
        out_log (logger): (None) Python logger object.

    Returns:
        str: Directory dir path.

    Raises:
        FileExistsError: If no directory could be created after **number_attempts** attempts.
    """

    def _candidate() -> str:
        # Same naming scheme for the first attempt and for every retry.
        name = prefix + str(uuid.uuid4())
        return str(Path(path).joinpath(name)) if path else name

    new_dir = _candidate()
    for attempt in range(number_attempts):
        # The umask is cleared so that mode 0o777 is applied as requested.
        old_umask = os.umask(0)
        try:
            Path(new_dir).mkdir(mode=0o777, parents=True, exist_ok=False)
        except OSError:
            remaining = number_attempts - attempt - 1
            if out_log:
                out_log.info(new_dir + " Already exists")
                out_log.info("Retrying %i times more" % remaining)
            new_dir = _candidate()
            if out_log and remaining:
                out_log.info("Trying with: " + new_dir)
        else:
            if out_log:
                out_log.info("%s directory successfully created" % new_dir)
            return new_dir
        finally:
            # Restore the caller's umask on success AND on failure.
            os.umask(old_umask)
    raise FileExistsError(
        "Unable to create a unique directory after %i attempts" % number_attempts
    )
def get_working_dir_path(
    working_dir_path: Optional[Union[str, Path]] = None,
    restart: bool = False,
) -> str:
    """Return the resolved working directory path, avoiding existing ones.

    With no **working_dir_path** the resolved current working directory is
    returned. If the resolved path does not exist, or **restart** is set, it
    is returned unchanged. Otherwise any trailing "_<number>" is dropped and
    "_1", "_2", ... are tried until a path that does not exist is found.
    No directory is created by this function.

    Args:
        working_dir_path (str): Path to the workflow results.
        restart (bool): If step result exists do not execute the step again.

    Returns:
        str: Path to the workflow results directory.
    """
    if not working_dir_path:
        return str(Path.cwd().resolve())

    resolved = str(Path(working_dir_path).resolve())
    if restart or not Path(resolved).exists():
        return resolved

    # Strip an existing numeric suffix once; every candidate shares this stem.
    stem = re.split(r"_[0-9]+$", resolved)[0]
    counter = 1
    candidate = stem + "_" + str(counter)
    while Path(candidate).exists():
        counter += 1
        candidate = stem + "_" + str(counter)
    return candidate
def zip_list(
    zip_file: Union[str, Path],
    file_list: Sequence[Union[str, Path]],
    out_log: Optional[logging.Logger] = None,
):
    """Compress all files listed in **file_list** into **zip_file** zip file.

    Files are stored flat, by base name and in sorted order. When a base name
    was already stored, the entry is renamed to "file_<index>_<base name>".
    The parent directory of **zip_file** is created if needed.

    Args:
        zip_file (str): Output compressed zip file.
        file_list (:obj:`list` of :obj:`str`): Input list of files to be compressed.
        out_log (:obj:`logging.Logger`): Input log object.
    """
    ordered_files = sorted(file_list)
    Path(zip_file).parent.mkdir(parents=True, exist_ok=True)

    used_names = set()
    with zipfile.ZipFile(zip_file, "w") as archive:
        for position, source in enumerate(ordered_files):
            arcname = Path(source).name
            if arcname in used_names:
                arcname = "file_" + str(position) + "_" + arcname
            used_names.add(arcname)
            archive.write(source, arcname=arcname)

    if out_log:
        out_log.info("Adding:")
        out_log.info(str(ordered_files))
        out_log.info("to: " + str(Path(zip_file).resolve()))
def unzip_list(
    zip_file: Union[str, Path],
    dest_dir: Optional[Union[str, Path]] = None,
    out_log: Optional[logging.Logger] = None,
) -> List[str]:
    """Extract all files in the zipball file and return the list of extracted paths.

    Args:
        zip_file (str): Input compressed zip file.
        dest_dir (str): (current working directory) Path to directory where the files will be extracted.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: List of paths of the extracted files, built as
        **dest_dir** joined with every member name of the zip file.
    """
    # With no destination the archive is extracted in the current working
    # directory, so the returned paths must point there as well (previously
    # they were built from the literal string "None").
    target_dir = Path.cwd() if dest_dir is None else Path(str(dest_dir))

    with zipfile.ZipFile(zip_file, "r") as zip_f:
        zip_f.extractall(path=target_dir)
        file_list = [str(target_dir.joinpath(member)) for member in zip_f.namelist()]

    if out_log:
        out_log.info("Extracting: " + str(Path(zip_file).resolve()))
        out_log.info("to:")
        out_log.info(str(file_list))

    return file_list
def search_topology_files(
    top_file: Union[str, Path],
    out_log: Optional[logging.Logger] = None,
) -> List[str]:
    """Search the top and itp files to create a list of the topology files.

    Every ``#include "<file>"`` line is followed recursively, with the included
    name resolved against the directory of the including file. Included files
    are listed before the file that includes them. Files that do not exist are
    skipped (and reported through **out_log** when given).

    Args:
        top_file (str): Topology GROMACS top file.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: List of paths of the topology files found.
    """
    if not Path(top_file).exists():
        if out_log:
            out_log.info("Ignored file %s" % top_file)
        return []

    include_re = re.compile(r"#include\s+\"(.+)\"")
    parent_dir = str(Path(top_file).parent)
    collected: List[str] = []
    with open(top_file) as top_handle:
        for raw_line in top_handle:
            hit = include_re.match(raw_line.strip())
            if not hit:
                continue
            included = str(Path(parent_dir).joinpath(hit.group(1)))
            collected.extend(search_topology_files(included, out_log))
    collected.append(str(top_file))
    return collected
def zip_top(
    zip_file: Union[str, Path],
    top_file: Union[str, Path],
    out_log: Optional[logging.Logger] = None,
    remove_original_files: bool = True,
) -> List[str]:
    """Compress **top_file** and the files it includes into **zip_file**.

    The file list is obtained with ``search_topology_files`` and compressed
    with ``zip_list``.

    Args:
        zip_file (str): Output compressed zip file.
        top_file (str): Topology TOP GROMACS file.
        out_log (:obj:`logging.Logger`): Input log object.
        remove_original_files (bool): (True) Remove the compressed files with ``rm_file_list`` afterwards.

    Returns:
        :obj:`list` of :obj:`str`: List of compressed paths.
    """
    topology_files = search_topology_files(top_file, out_log)
    zip_list(zip_file, topology_files, out_log)
    if not remove_original_files:
        return topology_files
    rm_file_list(topology_files, out_log)
    return topology_files
def unzip_top(
    zip_file: Union[str, Path],
    out_log: Optional[logging.Logger] = None,
    unique_dir: Optional[typing.Union[pathlib.Path, str]] = None,
) -> str:
    """Extract all files in the zip_file and return the path of the extracted ".top" file.

    Args:
        zip_file (str): Input topology zipball file path.
        out_log (:obj:`logging.Logger`): Input log object.
        unique_dir (str): Directory where the topology will be extracted. A new
            directory is created with ``create_unique_dir`` when not given.

    Returns:
        str: Path to the first extracted file whose name ends with ".top".

    Raises:
        FileNotFoundError: If the zip file contains no ".top" file.
    """
    unique_dir = unique_dir or create_unique_dir()
    top_list = unzip_list(zip_file, unique_dir, out_log)

    # A default is passed to next() so that a missing ".top" member is reported
    # with an explicit error instead of leaking a bare StopIteration.
    top_file = next((name for name in top_list if name.endswith(".top")), None)
    if top_file is None:
        raise FileNotFoundError(
            errno.ENOENT, "No .top file found in topology zip file", str(zip_file)
        )

    if out_log:
        out_log.info("Unzipping: ")
        out_log.info(zip_file)
        out_log.info("To: ")
        for file_name in top_list:
            out_log.info(file_name)
    return top_file
def get_logs_prefix():
    """Return the indentation (four spaces) placed before global log messages."""
    indent_width = 4
    return " " * indent_width
def get_logs(
    path: Optional[Union[str, Path]] = None,
    prefix: Optional[str] = None,
    step: Optional[str] = None,
    can_write_console: bool = True,
    level: str = "INFO",
    light_format: bool = False,
) -> typing.Tuple[logging.Logger, logging.Logger]:
    """Get the out and err Python Logger objects.

    The log files are named with ``create_name`` from **path**, **prefix**,
    **step** and "log.out" / "log.err". If a log file already exists a new one
    is used instead, adding a number to the name ("log1.out", "log2.out", ...).

    Args:
        path (str): (current working directory) Path to the log file directory.
        prefix (str): Prefix added to the name of the log file.
        step (str): String added between the **prefix** arg and the name of the log file.
        can_write_console (bool): (True) If True, show log in the execution terminal.
        level (str): ('INFO') Set Logging level. ['CRITICAL','ERROR','WARNING','INFO','DEBUG','NOTSET']
        light_format (bool): (False) Minimalist log format.

    Returns:
        :obj:`tuple` of :obj:`logging.Logger` and :obj:`logging.Logger`: Out and err Logger objects.
    """
    prefix = prefix or ""
    step = step or ""
    path = path or str(Path.cwd())

    def _free_log_path(extension: str) -> str:
        # Return the first of log.<ext>, log1.<ext>, log2.<ext>, ... that does not exist.
        candidate = create_name(path=path, prefix=prefix, step=step, name="log." + extension)
        cont = 1
        while Path(candidate).exists():
            candidate = create_name(
                path=path, prefix=prefix, step=step, name="log" + str(cont) + "." + extension
            )
            cont += 1
        return candidate

    out_log_path = _free_log_path("out")
    err_log_path = _free_log_path("err")

    # Create dir if it not exists
    create_dir(str(Path(out_log_path).resolve().parent))

    # Create logging format
    if light_format:
        log_formatter = logging.Formatter("%(asctime)s %(message)s", "%H:%M:%S")
    else:
        log_formatter = logging.Formatter(
            "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s"
        )

    # Create logging objects (one logger per log file path)
    out_logger = logging.getLogger(out_log_path)
    err_logger = logging.getLogger(err_log_path)

    # File handlers open their file on creation (delay=False), so they are only
    # created when they are going to be attached; otherwise the open file would
    # be discarded without ever being closed.
    if not out_logger.handlers:
        for logger, log_path in ((out_logger, out_log_path), (err_logger, err_log_path)):
            file_handler = logging.FileHandler(
                log_path, mode="a", encoding=None, delay=False
            )
            file_handler.setFormatter(log_formatter)
            logger.addHandler(file_handler)

    # Assign a shared console handler to both loggers as additional output
    if can_write_console and len(out_logger.handlers) < 2:
        console_handler = logging.StreamHandler(stream=sys.stdout)
        console_handler.setFormatter(log_formatter)
        out_logger.addHandler(console_handler)
        err_logger.addHandler(console_handler)

    # Set logging level
    out_logger.setLevel(level)
    err_logger.setLevel(level)
    return out_logger, err_logger
def launchlogger(func):
    """Decorator that provides out/err loggers to a method while it runs.

    Before calling **func** the first positional argument (the instance)
    gets its ``out_log`` and ``err_log`` attributes set from ``get_logs``,
    using the instance ``path``, ``prefix``, ``step`` and
    ``can_write_console_log`` attributes. When **func** finishes, whether
    it returns or raises, every handler of both loggers is closed and removed.

    Args:
        func (callable): Method to wrap; called with the original arguments.

    Returns:
        callable: Wrapped method returning whatever **func** returns.
    """

    @functools.wraps(func)
    def wrapper_log(*args, **kwargs):
        instance = args[0]
        instance.out_log, instance.err_log = get_logs(
            path=instance.path,
            prefix=instance.prefix,
            step=instance.step,
            can_write_console=instance.can_write_console_log,
        )
        try:
            return func(*args, **kwargs)
        finally:
            # Release the log files even when func raises.
            for logger in (instance.out_log, instance.err_log):
                # Iterate over a copy: the handler list is modified in the loop.
                for handler in logger.handlers[:]:
                    handler.close()
                    logger.removeHandler(handler)

    return wrapper_log
def log(
    string: str,
    local_log: Optional[logging.Logger] = None,
    global_log: Optional[logging.Logger] = None,
):
    """Send **string** at INFO level to whichever of the two loggers is given.

    Args:
        string (str): Message to log.
        local_log (:obj:`logging.Logger`): local log object; receives the message as is.
        global_log (:obj:`logging.Logger`): global log object; receives the message
            preceded by the prefix returned by ``get_logs_prefix``.
    """
    if local_log:
        local_log.info(string)
    if not global_log:
        return
    global_log.info(get_logs_prefix() + string)
def human_readable_time(time_ps: int) -> str:
    """Transform **time_ps** to a human readable string.

    The value is converted to femto seconds and then divided by 1000
    (truncating) until it is below 1000, moving one unit up each time. If it
    is still 1000 or more after the last unit, **time_ps** is returned as a
    plain string without unit.

    Args:
        time_ps (int): Time in pico seconds.

    Returns:
        str: Human readable time.
    """
    units = (
        "femto seconds",
        "pico seconds",
        "nano seconds",
        "micro seconds",
        "mili seconds",
    )
    amount = time_ps * 1000
    position = 0
    while position < len(units):
        if amount < 1000:
            return f"{amount} {units[position]}"
        amount = int(amount / 1000)
        position += 1
    return str(time_ps)
def check_properties(
    obj: object,
    properties: dict,
    reserved_properties: Optional[Sequence[str]] = None,
):
    """Warn about every key of **properties** that is not an attribute of **obj**.

    The keys "system" and "working_dir_path", and those listed in
    **reserved_properties**, are never reported. Each warning suggests the
    most similar attribute name of **obj** (empty when none is found).

    Args:
        obj (object): Object whose instance attributes are the recognized properties.
        properties (dict): Properties to check.
        reserved_properties (:obj:`list` of :obj:`str`): (None) Extra property names that must not be reported.
    """
    reserved = list(reserved_properties) if reserved_properties else []
    known_names = obj.__dict__.keys()

    unknown = {prop for prop in properties if prop not in known_names}
    unknown.difference_update(["system", "working_dir_path"] + reserved)

    for unknown_name in unknown:
        candidates = difflib.get_close_matches(
            unknown_name, known_names, n=1, cutoff=0.01
        )
        suggestion = candidates[0] if candidates else ""
        warnings.warn(
            "Warning: %s is not a recognized property. The most similar property is: %s"
            % (unknown_name, suggestion)
        )
def create_name(
    path: Optional[Union[str, Path]] = None,
    prefix: Optional[str] = None,
    step: Optional[str] = None,
    name: Optional[str] = None,
) -> str:
    """Return file name.

    The non-empty parts among **prefix** (with "/" replaced by "_"), **step**
    and the stripped **name** are joined with "_", and the result is placed
    under **path** when a path is given.

    Args:
        path (str): Path to the file directory.
        prefix (str): Prefix added to the name of the file.
        step (str): String added between the **prefix** arg and the **name** arg of the file.
        name (str): Name of the file.

    Returns:
        str: Composed file name.
    """
    cleaned_name = "" if name is None else name.strip()

    pieces = []
    if prefix:
        pieces.append(prefix.replace("/", "_"))
    if step:
        pieces.append(step)
    if cleaned_name:
        pieces.append(cleaned_name)
    composed = "_".join(pieces)

    if not path:
        return composed
    if not composed:
        return str(path)
    return str(Path(path).joinpath(composed))
def write_failed_output(file_name: str):
    """Overwrite **file_name** with the single line "Error".

    Args:
        file_name (str): Path of the file to write.
    """
    Path(file_name).write_text("Error\n")
def rm(file_name: Union[str, Path]) -> Optional[Union[str, Path]]:
    """Remove a file or a directory tree, ignoring any error.

    Args:
        file_name (str): Path of the file or directory to remove.

    Returns:
        The **file_name** argument when something was removed, None when the
        path does not exist, is neither a file nor a directory, or the
        removal failed.
    """
    try:
        target = pathlib.Path(file_name)
        if not target.exists():
            return None
        if target.is_dir():
            shutil.rmtree(file_name)
            return file_name
        if target.is_file():
            target.unlink()
            return file_name
    except Exception:
        # Best effort: a failed removal is reported as "nothing removed".
        pass
    return None
def rm_file_list(
    file_list: typing.Iterable[Union[str, Path]],
    out_log: Optional[logging.Logger] = None,
) -> List[str]:
    """Remove every path of **file_list** with ``rm`` and report the removed ones.

    Args:
        file_list (:obj:`list` of :obj:`str`): Paths of the files or directories to remove.
        out_log (:obj:`logging.Logger`): Input log object.

    Returns:
        :obj:`list` of :obj:`str`: Paths for which ``rm`` returned a truthy value.
    """
    removed_files: List[str] = []
    for candidate in file_list:
        if rm(candidate):
            removed_files.append(str(candidate))
    if out_log:
        log("Removed: %s" % str(removed_files), out_log)
    return removed_files
def check_complete_files(output_file_list: typing.Iterable[str]) -> bool:
    """Tell whether every listed path is an existing, non-empty regular file.

    Falsy entries (None, empty string) are skipped.

    Args:
        output_file_list (:obj:`list` of :obj:`str`): Paths to check.

    Returns:
        bool: True when all the non-skipped paths are files with size greater than zero.
    """
    return all(
        Path(output_file).is_file() and Path(output_file).stat().st_size > 0
        for output_file in output_file_list
        if output_file
    )
def copy_to_container(
    container_path: Optional[Union[str, Path]],
    container_volume_path: str,
    io_dict: Dict,
    out_log: Optional[logging.Logger] = None,
) -> Dict:
    """Stage the input files in a new directory and map all paths into the container volume.

    When **container_path** is empty nothing is done and **io_dict** itself is
    returned. Otherwise a directory is created with ``create_unique_dir``,
    every existing input file is copied into it, and a new dictionary is
    built where input and output paths point inside **container_volume_path**
    (volume path joined with the file base name).

    Args:
        container_path (str): Path to the container binary; empty when no container is used.
        container_volume_path (str): Path of the volume inside the container.
        io_dict (dict): Dictionary with "in" and "out" entries mapping references to file paths.
        out_log (:obj:`logging.Logger`): Log object that receives one "Copy:" message per copied file.

    Returns:
        dict: **io_dict** when no container is used, otherwise a dictionary with
        the "in" and "out" mappings plus "unique_dir" (the host staging directory).
    """
    if not container_path:
        return io_dict

    unique_dir = str(Path(create_unique_dir()).resolve())
    container_io_dict: Dict = {"in": {}, "out": {}, "unique_dir": unique_dir}

    # IN files COPY and assign INTERNAL PATH
    for file_ref, file_path in io_dict["in"].items():
        if not file_path:
            continue
        if Path(file_path).exists():
            shutil.copy2(file_path, unique_dir)
            # out_log has to be passed on, otherwise log() has no logger to write to.
            log(f"Copy: {file_path} to {unique_dir}", out_log)
            container_io_dict["in"][file_ref] = str(
                Path(container_volume_path).joinpath(Path(file_path).name)
            )
        else:
            # Default files in GMXLIB path like gmx_solvate -> input_solvent_gro_path (spc216.gro)
            container_io_dict["in"][file_ref] = file_path

    # OUT files assign INTERNAL PATH
    for file_ref, file_path in io_dict["out"].items():
        if file_path:
            container_io_dict["out"][file_ref] = str(
                Path(container_volume_path).joinpath(Path(file_path).name)
            )

    return container_io_dict
def copy_to_host(container_path: str, container_io_dict: dict, io_dict: dict):
    """Copy the output files from the staging directory to their host paths.

    Nothing is done when **container_path** is empty. For every non-empty
    output path of **container_io_dict**, the file with the same base name
    inside ``container_io_dict["unique_dir"]`` is copied, if it exists, to
    the path stored under the same reference in ``io_dict["out"]``.

    Args:
        container_path (str): Path to the container binary; empty when no container is used.
        container_io_dict (dict): Dictionary with the "out" mapping and the "unique_dir" entry.
        io_dict (dict): Dictionary whose "out" entry maps references to host destination paths.
    """
    if not container_path:
        return

    # OUT files COPY
    for file_ref, file_path in container_io_dict["out"].items():
        if not file_path:
            continue
        staged_file = Path(container_io_dict["unique_dir"]) / Path(file_path).name
        if staged_file.exists():
            shutil.copy2(str(staged_file), io_dict["out"][file_ref])
def create_cmd_line(
    cmd: List[str],
    container_path: Optional[Union[str, Path]] = "",
    host_volume: Optional[Union[str, Path]] = None,
    container_volume: Optional[Union[str, Path]] = None,
    container_working_dir: Optional[Union[str, Path]] = None,
    container_user_uid: Optional[str] = None,
    container_shell_path: Optional[Union[str, Path]] = None,
    container_image: Optional[Union[str, Path]] = None,
    out_log: Optional[logging.Logger] = None,
    global_log: Optional[logging.Logger] = None,
) -> List[str]:
    """Wrap **cmd** in the command line of the selected container runtime.

    The runtime is chosen from the ending of **container_path**
    ("singularity", "docker" or "pcocc"). For any other value **cmd** is
    returned untouched. Inside a container the command is joined with spaces,
    quoted, and passed to ``<container_shell_path> -c``.

    Args:
        cmd (:obj:`list` of :obj:`str`): Command line to execute.
        container_path (str): ('') Path to the container runtime binary.
        host_volume (str): Host path bound to **container_volume**.
        container_volume (str): Path of the volume inside the container.
        container_working_dir (str): Working directory inside the container (docker, pcocc).
        container_user_uid (str): User passed with ``--user`` (docker, pcocc).
        container_shell_path (str): Shell used to run the command inside the container.
        container_image (str): Container image. For Singularity, an image path that
            does not exist is pulled first.
        out_log (:obj:`logging.Logger`): Local log object.
        global_log (:obj:`logging.Logger`): Global log object.

    Returns:
        :obj:`list` of :obj:`str`: Command line to launch.

    Raises:
        FileNotFoundError: If the Singularity image had to be pulled and is not found afterwards.
    """
    runtime = str(container_path or "")

    if runtime.endswith("singularity"):
        log("Using Singularity image %s" % container_image, out_log, global_log)
        if not Path(str(container_image)).exists():
            log(
                f"{container_image} does not exist trying to pull it",
                out_log,
                global_log,
            )
            container_image_name = str(
                Path(str(container_image)).with_suffix(".sif").name
            )
            singularity_pull_cmd = [
                runtime,
                "pull",
                "--name",
                str(container_image_name),
                str(container_image),
            ]
            try:
                from biobb_common.command_wrapper import cmd_wrapper

                cmd_wrapper.CmdWrapper(
                    cmd=singularity_pull_cmd, out_log=out_log
                ).launch()
                if not Path(container_image_name).exists():
                    raise FileNotFoundError
                container_image = container_image_name
            except FileNotFoundError:
                log(f"{' '.join(singularity_pull_cmd)} not found", out_log, global_log)
                raise FileNotFoundError

        singularity_cmd: List[str] = [runtime, "exec"]
        # On a mac the -e option is left out because it is still not available
        if platform != "darwin":
            singularity_cmd.append("-e")
        singularity_cmd += [
            "--bind",
            str(host_volume) + ":" + str(container_volume),
            str(container_image),
            str(container_shell_path),
            "-c",
        ]
        return singularity_cmd + ['"' + " ".join(cmd) + '"']

    if runtime.endswith("docker"):
        log("Using Docker image %s" % container_image, out_log, global_log)
        docker_cmd = [runtime, "run"]
        if container_working_dir:
            docker_cmd += ["-w", str(container_working_dir)]
        if container_volume:
            docker_cmd += ["-v", str(host_volume) + ":" + str(container_volume)]
        if container_user_uid:
            docker_cmd += ["--user", container_user_uid]
        docker_cmd += [str(container_image), str(container_shell_path), "-c"]
        return docker_cmd + ['"' + " ".join(cmd) + '"']

    if runtime.endswith("pcocc"):
        # pcocc run -I racov56:pmx cli.py mutate -h
        log("Using pcocc image %s" % container_image, out_log, global_log)
        pcocc_cmd = [runtime, "run", "-I", str(container_image)]
        if container_working_dir:
            pcocc_cmd += ["--cwd", str(container_working_dir)]
        if container_volume:
            pcocc_cmd += ["--mount", str(host_volume) + ":" + str(container_volume)]
        if container_user_uid:
            pcocc_cmd += ["--user", container_user_uid]
        pcocc_cmd += [str(container_shell_path), "-c"]
        return pcocc_cmd + ['\\"' + " ".join(cmd) + '\\"']

    # Not using any container
    return cmd
def get_doc_dicts(doc: Optional[str]):
    """Parse the arguments and the properties described in a docstring.

    Blank lines are dropped and the rest stripped. Argument lines are those
    between the first line starting with "args" and the first line starting
    with "properties" (case insensitive); property lines are those between
    that "properties" line and the first line starting with "examples".

    Args:
        doc (str): Docstring to parse.

    Returns:
        :obj:`tuple` of :obj:`dict` and :obj:`dict`: Arguments dictionary (keyed by
        argument name, "formats" maps extension to edam term) and properties
        dictionary (keyed by property name, "values" maps each accepted value to
        its description, or is None when no "Values:" section is present).

    Raises:
        StopIteration: If one of the three section header lines is missing.
        KeyError: If an argument or property line does not match the expected format.
    """
    argument_re = re.compile(
        r"(?P<argument>\w*)\ *(?:\()(?P<type>\w*)(?:\)):?\ *(?P<optional>\(\w*\):)?\ *(?P<description>.*?)(?:\.)\ *(?:File type:\ *)(?P<input_output>\w+)\.\ *(\`(?:.+)\<(?P<sample_file>.*?)\>\`\_\.)?\ *(?:Accepted formats:\ *)(?P<formats>.+)(?:\.)?"
    )
    formats_re = re.compile(
        r"(?P<extension>\w*)\ *(\(\ *)\ *edam\ *:\ *(?P<edam>\w*)"
    )
    property_re = re.compile(
        r"(?:\*\ *\*\*)(?P<property>.*?)(?:\*\*)\ *(?:\(\*)(?P<type>\w*)(?:\*\))\ *\-\ ?(?:\()(?P<default_value>.*?)(?:\))\ *(?:(?:\[)(?P<wf_property>WF property)(?:\]))?\ *(?:(?:\[)(?P<range_start>[\-]?\d+(?:\.\d+)?)\~(?P<range_stop>[\-]?\d+(?:\.\d+)?)(?:\|)?(?P<range_step>\d+(?:\.\d+)?)?(?:\]))?\ *(?:(?:\[)(.*?)(?:\]))?\ *(?P<description>.*)"
    )
    value_re = re.compile(
        r"(?P<value>\w*)\ *(?:(?:\()(?P<description>.*?)?(?:\)))?"
    )

    lines = [raw.strip() for raw in str(doc).splitlines() if raw.strip()]

    def _header_index(keyword):
        # Position of the first line starting with keyword (case insensitive).
        return next(
            position
            for position, text in enumerate(lines)
            if text.lower().startswith(keyword)
        )

    args_at = _header_index("args")
    properties_at = _header_index("properties")
    examples_at = _header_index("examples")

    doc_arguments_dict = {}
    for argument_line in lines[args_at + 1: properties_at]:
        found = argument_re.match(argument_line)
        entry = {} if found is None else found.groupdict()
        entry["formats"] = {
            hit.group("extension"): hit.group("edam")
            for hit in formats_re.finditer(entry["formats"])
        }
        doc_arguments_dict[entry.pop("argument")] = entry

    doc_properties_dict = {}
    for property_line in lines[properties_at + 1: examples_at]:
        found = property_re.match(property_line)
        entry = {} if found is None else found.groupdict()
        entry["values"] = None
        if "Values:" in entry["description"]:
            entry["description"], raw_values = entry["description"].split("Values:")
            entry["values"] = {
                hit.group("value"): hit.group("description")
                for hit in value_re.finditer(raw_values)
                if hit.group("value")
            }
        doc_properties_dict[entry.pop("property")] = entry

    return doc_arguments_dict, doc_properties_dict
def check_argument(
    path: Optional[pathlib.Path],
    argument: str,
    optional: bool,
    module_name: str,
    input_output: Optional[str] = None,
    output_files_created: bool = False,
    extension_list: Optional[Sequence[str]] = None,
    raise_exception: bool = True,
    check_extensions: bool = True,
    out_log: Optional[logging.Logger] = None,
) -> None:
    """Check that a file argument exists (when it has to) and has a valid extension.

    Args:
        path (Path): Path of the file to check.
        argument (str): Name of the argument, used in the messages.
        optional (bool): If True and **path** is empty nothing is checked.
        module_name (str): Name of the calling module, used in the messages.
        input_output (str): "in"/"input" for input files, "out"/"output" for output files.
        output_files_created (bool): (False) If True, output files must exist as well.
        extension_list (:obj:`list` of :obj:`str`): Valid extensions, lower case and without the dot.
        raise_exception (bool): (True) Raise instead of only warning when a check fails.
        check_extensions (bool): (True) Check the file extension against **extension_list**.
        out_log (:obj:`logging.Logger`): Log object that receives every message.

    Raises:
        FileNotFoundError: If **raise_exception** is set and either **input_output**
            is not recognized or a file that has to exist does not.
    """
    if optional and not path:
        return None

    # Defaults to False so that an unrecognized input_output combined with
    # raise_exception=False only warns, instead of failing later on an
    # unbound variable.
    input_file = False
    if input_output in ["in", "input"]:
        input_file = True
    elif input_output not in ["out", "output"]:
        unable_to_determine_string = (
            f"{module_name} {argument}: Unable to determine if input or output file."
        )
        log(unable_to_determine_string, out_log)
        if raise_exception:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), unable_to_determine_string
            )
        warnings.warn(unable_to_determine_string)

    file_path = Path(str(path))

    if (input_file or output_files_created) and not file_path.exists():
        not_found_error_string = (
            f"Path {path} --- {module_name}: Unexisting {argument} file."
        )
        log(not_found_error_string, out_log)
        if raise_exception:
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), not_found_error_string
            )
        warnings.warn(not_found_error_string)

    if check_extensions and extension_list:
        suffix = file_path.suffix
        if not suffix:
            no_extension_error_string = f"{module_name} {argument}: {path} has no extension. If you want to suppress this message, please set the check_extensions property to False"
            # out_log is passed on so the message reaches the log file too.
            log(no_extension_error_string, out_log)
            warnings.warn(no_extension_error_string)
        elif suffix[1:].lower() not in extension_list:
            not_valid_extension_error_string = f"{module_name} {argument}: {path} extension is not in the valid extensions list: {extension_list}. If you want to suppress this message, please set the check_extensions property to False"
            log(not_valid_extension_error_string, out_log)
            warnings.warn(not_valid_extension_error_string)