Source code for tools.file_utils

"""Tools to work with files
"""
import difflib
import functools
import logging
import os
import errno
import pathlib
import re
import shutil
import uuid
import warnings
import zipfile
from sys import platform
from pathlib import Path
import typing
from typing import Optional, Union
import sys
from contextlib import contextmanager


[docs] def create_unique_file_path(parent_dir: Optional[Union[str, Path]] = None, extension: Optional[Union[str, Path]] = None) -> str: if not parent_dir: parent_dir = Path.cwd() if not extension: extension = "" while True: name = f"{uuid.uuid4()}{extension}" file_path = Path.joinpath(Path(parent_dir).resolve(), name) if not file_path.exists(): return str(file_path)
[docs] def create_dir(dir_path: str) -> str: """Returns the directory **dir_path** and create it if path does not exist. Args: dir_path (str): Path to the directory that will be created. Returns: str: Directory dir path. """ if not Path(dir_path).exists(): Path(dir_path).mkdir(exist_ok=True, parents=True) return str(Path(dir_path))
[docs] def create_stdin_file(intput_string: str) -> str: file_path = create_unique_file_path(extension=".stdin") with open(file_path, "w") as file_handler: file_handler.write(intput_string) return file_path
[docs] def create_unique_dir( path: str = "", prefix: str = "", number_attempts: int = 10, out_log: Optional[logging.Logger] = None, ) -> str: """Create a directory with a prefix + computed unique name. If the computed name collides with an existing file name it attemps **number_attempts** times to create another unique id and create the directory with the new name. Args: path (str): ('') Parent path of the new directory. prefix (str): ('') String to be added before the computed unique dir name. number_attempts (int): (10) number of times creating the directory if there's a name conflict. out_log (logger): (None) Python logger object. Returns: str: Directory dir path. """ new_dir = prefix + str(uuid.uuid4()) if path: new_dir = str(Path(path).joinpath(new_dir)) for i in range(number_attempts): try: oldumask = os.umask(0) Path(new_dir).mkdir(mode=0o777, parents=True, exist_ok=False) if out_log: out_log.info("Directory successfully created: %s" % new_dir) os.umask(oldumask) return new_dir except OSError: if out_log: out_log.info(new_dir + " Already exists") out_log.info("Retrying %i times more" % (number_attempts - i)) new_dir = prefix + str(uuid.uuid4().hex) if path: new_dir = str(Path(path).joinpath(new_dir)) if out_log: out_log.info("Trying with: " + new_dir) raise FileExistsError
[docs] def get_working_dir_path(working_dir_path: Optional[Union[str, Path]] = None, restart: bool = False) -> str: """Return the directory **working_dir_path** and create it if working_dir_path does not exist. If **working_dir_path** exists a consecutive numerical suffix is added to the end of the **working_dir_path** and is returned. Args: working_dir_path (str): Path to the workflow results. restart (bool): If step result exists do not execute the step again. Returns: str: Path to the workflow results directory. """ if not working_dir_path: return str(Path.cwd().resolve()) working_dir_path = str(Path(working_dir_path).resolve()) if (not Path(working_dir_path).exists()) or restart: return str(Path(working_dir_path)) cont = 1 while Path(str(working_dir_path)).exists(): working_dir_path = ( re.split(r"_[0-9]+$", str(working_dir_path))[0] + "_" + str(cont) ) cont += 1 return str(working_dir_path)
[docs] def zip_list( zip_file: Union[str, Path], file_list: typing.Sequence[Union[str, Path]], out_log: Optional[logging.Logger] = None ): """Compress all files listed in **file_list** into **zip_file** zip file. Args: zip_file (str): Output compressed zip file. file_list (:obj:`list` of :obj:`str`): Input list of files to be compressed. out_log (:obj:`logging.Logger`): Input log object. """ file_list = list(file_list) file_list.sort() Path(zip_file).parent.mkdir(parents=True, exist_ok=True) with zipfile.ZipFile(zip_file, "w") as zip_f: inserted = [] for index, f in enumerate(file_list): base_name = Path(f).name if base_name in inserted: base_name = "file_" + str(index) + "_" + base_name inserted.append(base_name) zip_f.write(f, arcname=base_name) if out_log: out_log.info("Adding:") # out_log.info(list(map(lambda x: str(Path(x).resolve().relative_to(Path.cwd())), file_list))) out_log.info(str(file_list)) out_log.info("to: " + str(Path(zip_file).resolve()))
[docs] def unzip_list( zip_file: Union[str, Path], dest_dir: Optional[Union[str, Path]] = None, out_log: Optional[logging.Logger] = None ) -> list[str]: """Extract all files in the zipball file and return a list containing the absolute path of the extracted files. Args: zip_file (str): Input compressed zip file. dest_dir (str): Path to directory where the files will be extracted. out_log (:obj:`logging.Logger`): Input log object. Returns: :obj:`list` of :obj:`str`: list of paths of the extracted files. """ with zipfile.ZipFile(zip_file, "r") as zip_f: zip_f.extractall(path=dest_dir) file_list = [str(Path(str(dest_dir)).joinpath(f)) for f in zip_f.namelist()] if out_log: out_log.info("Extracting: " + str(Path(zip_file).resolve())) out_log.info("to:") out_log.info(str(file_list)) return file_list
[docs] def search_topology_files( top_file: Union[str, Path], out_log: Optional[logging.Logger] = None ) -> list[str]: """Search the top and itp files to create a list of the topology files Args: top_file (str): Topology GROMACS top file. out_log (:obj:`logging.Logger`): Input log object. Returns: :obj:`list` of :obj:`str`: list of paths of the extracted files. """ top_dir_name = str(Path(top_file).parent) file_list = [] pattern = re.compile(r"#include\s+\"(.+)\"") if Path(top_file).exists(): with open(top_file) as tf: for line in tf: include_file = pattern.match(line.strip()) if include_file: found_file = str(Path(top_dir_name).joinpath(include_file.group(1))) file_list += search_topology_files(found_file, out_log) else: if out_log: out_log.info("Ignored file %s" % top_file) return file_list return file_list + [str(top_file)]
[docs] def zip_top( zip_file: Union[str, Path], top_file: Union[str, Path], out_log: Optional[logging.Logger] = None, remove_original_files: bool = True, ) -> list[str]: """Compress all *.itp and *.top files in the cwd into **zip_file** zip file. Args: zip_file (str): Output compressed zip file. top_file (str): Topology TOP GROMACS file. out_log (:obj:`logging.Logger`): Input log object. Returns: :obj:`list` of :obj:`str`: list of compressed paths. """ file_list = search_topology_files(top_file, out_log) zip_list(zip_file, file_list, out_log) # Only remove files on the same directory of the top file rm_list = [f for f in file_list if Path(f).parent == Path(top_file).parent] if remove_original_files: rm_file_list(rm_list, out_log) return file_list
[docs] def unzip_top( zip_file: Union[str, Path], out_log: Optional[logging.Logger] = None, unique_dir: Optional[Union[pathlib.Path, str]] = None, ) -> str: """Extract all files in the zip_file and copy the file extracted ".top" file to top_file. Args: zip_file (str): Input topology zipball file path. out_log (:obj:`logging.Logger`): Input log object. unique_dir (str): Directory where the topology will be extracted. Returns: str: Path to the extracted ".top" file. """ unique_dir = unique_dir or create_unique_dir() top_list = unzip_list(zip_file, unique_dir, out_log) top_file = next(name for name in top_list if name.endswith(".top")) if out_log: out_log.info("Unzipping: ") out_log.info(zip_file) out_log.info("To: ") for file_name in top_list: out_log.info(file_name) return top_file
[docs] def get_logs_prefix(): return 4 * " "
[docs] def create_incremental_name(path: Union[Path, str]) -> str: """Increment the name of the file by adding a number at the end. Args: path (str): path of the file. Returns: str: Incremented name of the file. """ if (path_obj := Path(path)).exists(): cont = 1 while path_obj.exists(): new_name = f'{path_obj.stem.rstrip("0123456789_")}_{cont}{path_obj.suffix}' path_obj = path_obj.with_name(new_name) cont += 1 return str(path_obj)
[docs] def get_logs( path: Optional[Union[str, Path]] = None, prefix: Optional[str] = None, step: Optional[str] = None, can_write_console: bool = True, can_write_file: bool = True, out_log_path: Optional[Union[str, Path]] = None, err_log_path: Optional[Union[str, Path]] = None, level: str = "INFO", light_format: bool = False, ) -> tuple[logging.Logger, logging.Logger]: """Get the error and and out Python Logger objects. Args: path (str): (current working directory) Path to the log file directory. prefix (str): Prefix added to the name of the log file. step (str): String added between the **prefix** arg and the name of the log file. can_write_console (bool): (True) If True, show log in the execution terminal. can_write_file (bool): (True) If True, write log to the log files. out_log_path (str): (None) Path to the out log file. err_log_path (str): (None) Path to the err log file. level (str): ('INFO') Set Logging level. ['CRITICAL','ERROR','WARNING','INFO','DEBUG','NOTSET'] light_format (bool): (False) Minimalist log format. Returns: :obj:`tuple` of :obj:`logging.Logger` and :obj:`logging.Logger`: Out and err Logger objects. """ out_log_path = out_log_path or "log.out" err_log_path = err_log_path or "log.err" # If paths are not absolute create and return them if not Path(out_log_path).is_absolute(): out_log_path = create_incremental_name(create_name(path=path, prefix=prefix, step=step, name=str(out_log_path))) if not Path(err_log_path).is_absolute(): err_log_path = create_incremental_name(create_name(path=path, prefix=prefix, step=step, name=str(err_log_path))) # Create logging objects out_Logger = logging.getLogger(str(out_log_path)) err_Logger = logging.getLogger(str(err_log_path)) # Create logging format logFormatter = logging.Formatter( "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s" ) if light_format: logFormatter = logging.Formatter("%(asctime)s %(message)s", "%H:%M:%S") if can_write_file: prefix = prefix if prefix else "" step = step if step else "" path = path if path else str(Path.cwd()) # Create dir if it not exists create_dir(str(Path(out_log_path).resolve().parent)) # Create FileHandler out_fileHandler = logging.FileHandler(out_log_path, mode="a", encoding=None, delay=True) err_fileHandler = logging.FileHandler(err_log_path, mode="a", encoding=None, delay=True) # Asign format to FileHandler out_fileHandler.setFormatter(logFormatter) err_fileHandler.setFormatter(logFormatter) # Assign FileHandler to logging object if not len(out_Logger.handlers): out_Logger.addHandler(out_fileHandler) err_Logger.addHandler(err_fileHandler) if can_write_console: console_out = logging.StreamHandler(stream=sys.stdout) console_err = logging.StreamHandler(stream=sys.stderr) console_out.setFormatter(logFormatter) console_err.setFormatter(logFormatter) # Assign consoleHandler to logging objects as aditional output if len(out_Logger.handlers) < 2: out_Logger.addHandler(console_out) err_Logger.addHandler(console_err) # Set logging level level out_Logger.setLevel(level) err_Logger.setLevel(level) return out_Logger, err_Logger
[docs] def launchlogger(func): """Decorator to create the out_log and err_log""" @functools.wraps(func) def wrapper_log(*args, **kwargs): create_dir(create_name(path=args[0].path)) if args[0].disable_logs: return func(*args, **kwargs) # Create local out_log and err_log args[0].out_log, args[0].err_log = get_logs( path=args[0].path, prefix=args[0].prefix, step=args[0].step, can_write_console=args[0].can_write_console_log, can_write_file=args[0].can_write_file_log, out_log_path=args[0].out_log_path, err_log_path=args[0].err_log_path ) # Run the function and capture its return value value = func(*args, **kwargs) # Close and remove handlers from out_log and err_log for log in [args[0].out_log, args[0].err_log]: # Create a copy [:] of the handler list to be able to modify it while we are iterating handlers = log.handlers[:] for handler in handlers: handler.close() log.removeHandler(handler) return value return wrapper_log
[docs] def log(string: str, local_log: Optional[logging.Logger] = None, global_log: Optional[logging.Logger] = None): """Checks if log exists Args: string (str): Message to log. local_log (:obj:`logging.Logger`): local log object. global_log (:obj:`logging.Logger`): global log object. """ if local_log: local_log.info(string) if global_log: global_log.info(get_logs_prefix() + string)
[docs] def human_readable_time(time_ps: int) -> str: """Transform **time_ps** to a human readable string. Args: time_ps (int): Time in pico seconds. Returns: str: Human readable time. """ time_units = [ "femto seconds", "pico seconds", "nano seconds", "micro seconds", "mili seconds", ] t = time_ps * 1000 for tu in time_units: if t < 1000: return str(t) + " " + tu t = int(t/1000) return str(time_ps)
[docs] def check_properties(obj: object, properties: dict, reserved_properties: Optional[list[str]] = None): if not reserved_properties: reserved_properties = [] error_properties = set( [prop for prop in properties.keys() if prop not in obj.__dict__.keys()] ) error_properties -= set(["system", "working_dir_path"] + list(reserved_properties)) for error_property in error_properties: close_property_list = difflib.get_close_matches( error_property, obj.__dict__.keys(), n=1, cutoff=0.01 ) close_property = close_property_list[0] if close_property_list else "" warnings.warn( "Warning: %s is not a recognized property. The most similar property is: %s" % (error_property, close_property) )
[docs] def create_name( path: Optional[Union[str, Path]] = None, prefix: Optional[str] = None, step: Optional[str] = None, name: Optional[str] = None ) -> str: """Return file name. Args: path (str): Path to the file directory. prefix (str): Prefix added to the name of the file. step (str): String added between the **prefix** arg and the **name** arg of the file. name (str): Name of the file. Returns: str: Composed file name. """ name = "" if name is None else name.strip() if step: if name: name = step + "_" + name else: name = step if prefix: prefix = prefix.replace("/", "_") if name: name = prefix + "_" + name else: name = prefix if path: if name: name = str(Path(path).joinpath(name)) else: name = str(path) return name
[docs] def write_failed_output(file_name: str): with open(file_name, "w") as f: f.write("Error\n")
[docs] def rm(file_name: Union[str, Path]) -> Optional[Union[str, Path]]: try: file_path = pathlib.Path(file_name) if file_path.exists(): if file_path.is_dir(): shutil.rmtree(file_name) return file_name if file_path.is_file(): Path(file_name).unlink() return file_name except Exception: pass return None
[docs] def rm_file_list( file_list: typing.Sequence[Union[str, Path]], out_log: Optional[logging.Logger] = None ) -> list[str]: removed_files = [str(f) for f in file_list if rm(f)] if len(removed_files) > 0 and out_log: log("Removed: %s" % str(removed_files), out_log) return removed_files
[docs] def check_complete_files(output_file_list: list[Union[str, Path]]) -> bool: for output_file in filter(None, output_file_list): output_file = Path(str(output_file)) file_exists = output_file.is_file() and output_file.stat().st_size > 0 dir_exists = output_file.is_dir() and any(output_file.iterdir()) if not file_exists and not dir_exists: return False return True
[docs] def copytree_new_files_only(source, destination): """ Recursively copies files from source to destination only if they don't already exist in the destination. """ if not os.path.exists(destination): os.makedirs(destination) for dirpath, dirnames, filenames in os.walk(source): # Create a corresponding directory in the destination relative_path = os.path.relpath(dirpath, source) dest_dir = os.path.join(destination, relative_path) if not os.path.exists(dest_dir): os.makedirs(dest_dir) # Copy files that do not exist or have newer modification times for filename in filenames: src_file_path = os.path.join(dirpath, filename) dest_file_path = os.path.join(dest_dir, filename) if not os.path.exists(dest_file_path) or os.path.getmtime(src_file_path) > os.path.getmtime(dest_file_path): shutil.copy2(src_file_path, dest_file_path)
[docs] def copy_to_container(container_path: Optional[Union[str, Path]], container_volume_path: str, io_dict: dict, out_log: Optional[logging.Logger] = None) -> dict: if not container_path: return io_dict unique_dir = str(Path(create_unique_dir()).resolve()) container_io_dict: dict = {"in": {}, "out": {}, "unique_dir": unique_dir} # IN files COPY and assign INTERNAL PATH for file_ref, file_path in io_dict["in"].items(): if file_path: if Path(file_path).exists(): shutil.copy2(file_path, unique_dir) log(f"Copy: {file_path} to {unique_dir}") container_io_dict["in"][file_ref] = str( Path(container_volume_path).joinpath(Path(file_path).name) ) else: # Default files in GMXLIB path like gmx_solvate -> input_solvent_gro_path (spc216.gro) container_io_dict["in"][file_ref] = file_path # OUT files assign INTERNAL PATH for file_ref, file_path in io_dict["out"].items(): if file_path: container_io_dict["out"][file_ref] = str( Path(container_volume_path).joinpath(Path(file_path).name) ) return container_io_dict
[docs] def copy_to_host(container_path: str, container_io_dict: dict, io_dict: dict): if not container_path: return # OUT files COPY for file_ref, file_path in container_io_dict["out"].items(): if file_path: container_file_path = str( Path(container_io_dict["unique_dir"]).joinpath(Path(file_path).name) ) if Path(container_file_path).exists(): shutil.copy2(container_file_path, io_dict["out"][file_ref])
[docs] def create_cmd_line( cmd: list[str], container_path: Optional[Union[str, Path]] = "", host_volume: Optional[Union[str, Path]] = None, container_volume: Optional[Union[str, Path]] = None, container_working_dir: Optional[Union[str, Path]] = None, container_user_uid: Optional[str] = None, container_shell_path: Optional[Union[str, Path]] = None, container_image: Optional[Union[str, Path]] = None, out_log: Optional[logging.Logger] = None, global_log: Optional[logging.Logger] = None ) -> list[str]: container_path = container_path or "" if str(container_path).endswith("singularity"): log("Using Singularity image %s" % container_image, out_log, global_log) if not Path(str(container_image)).exists(): log( f"{container_image} does not exist trying to pull it", out_log, global_log, ) container_image_name = str(Path(str(container_image)).with_suffix(".sif").name) singularity_pull_cmd = [ str(container_path), "pull", "--name", str(container_image_name), str(container_image), ] try: from biobb_common.command_wrapper import cmd_wrapper cmd_wrapper.CmdWrapper(cmd=singularity_pull_cmd, out_log=out_log).launch() if Path(container_image_name).exists(): container_image = container_image_name else: raise FileNotFoundError except FileNotFoundError: log(f"{' '.join(singularity_pull_cmd)} not found", out_log, global_log) raise FileNotFoundError singularity_cmd: list[str] = [ str(container_path), "exec", "-e", "--bind", str(host_volume) + ":" + str(container_volume), str(container_image), ] # If we are working on a mac remove -e option because is still no available if platform == "darwin": if "-e" in singularity_cmd: singularity_cmd.remove("-e") cmd = ['"' + " ".join(cmd) + '"'] singularity_cmd.extend([str(container_shell_path), "-c"]) return singularity_cmd + cmd elif str(container_path).endswith("docker"): log("Using Docker image %s" % container_image, out_log, global_log) docker_cmd = [str(container_path), "run"] if container_working_dir: docker_cmd.append("-w") docker_cmd.append(str(container_working_dir)) if container_volume: docker_cmd.append("-v") docker_cmd.append(str(host_volume) + ":" + str(container_volume)) if container_user_uid: docker_cmd.append("--user") docker_cmd.append(container_user_uid) docker_cmd.append(str(container_image)) cmd = ['"' + " ".join(cmd) + '"'] docker_cmd.extend([str(container_shell_path), "-c"]) return docker_cmd + cmd elif str(container_path).endswith("pcocc"): # pcocc run -I racov56:pmx cli.py mutate -h log("Using pcocc image %s" % container_image, out_log, global_log) pcocc_cmd = [str(container_path), "run", "-I", str(container_image)] if container_working_dir: pcocc_cmd.append("--cwd") pcocc_cmd.append(str(container_working_dir)) if container_volume: pcocc_cmd.append("--mount") pcocc_cmd.append(str(host_volume) + ":" + str(container_volume)) if container_user_uid: pcocc_cmd.append("--user") pcocc_cmd.append(container_user_uid) cmd = ['\\"' + " ".join(cmd) + '\\"'] pcocc_cmd.extend([str(container_shell_path), "-c"]) return pcocc_cmd + cmd else: # log('Not using any container', out_log, global_log) return cmd
[docs] def get_doc_dicts(doc: Optional[str]): regex_argument = re.compile( r"(?P<argument>\w*)\ *(?:\()(?P<type>\w*)(?:\)):?\ *(?P<optional>\(\w*\):)?\ *(?P<description>.*?)(?:\.)\ *(?:File type:\ *)(?P<input_output>\w+)\.\ *(\`(?:.+)\<(?P<sample_file>.*?)\>\`\_\.)?\ *(?:Accepted formats:\ *)(?P<formats>.+)(?:\.)?" ) regex_argument_formats = re.compile( r"(?P<extension>\w*)\ *(\(\ *)\ *edam\ *:\ *(?P<edam>\w*)" ) regex_property = re.compile( r"(?:\*\ *\*\*)(?P<property>.*?)(?:\*\*)\ *(?:\(\*)(?P<type>\w*)(?:\*\))\ *\-\ ?(?:\()(?P<default_value>.*?)(?:\))\ *(?:(?:\[)(?P<wf_property>WF property)(?:\]))?\ *(?:(?:\[)(?P<range_start>[\-]?\d+(?:\.\d+)?)\~(?P<range_stop>[\-]?\d+(?:\.\d+)?)(?:\|)?(?P<range_step>\d+(?:\.\d+)?)?(?:\]))?\ *(?:(?:\[)(.*?)(?:\]))?\ *(?P<description>.*)" ) regex_property_value = re.compile( r"(?P<value>\w*)\ *(?:(?:\()(?P<description>.*?)?(?:\)))?" ) doc_lines = list( map(str.strip, filter(lambda line: line.strip(), str(doc).splitlines())) ) args_index = doc_lines.index( next(filter(lambda line: line.lower().startswith("args"), doc_lines)) ) properties_index = doc_lines.index( next(filter(lambda line: line.lower().startswith("properties"), doc_lines)) ) examples_index = doc_lines.index( next(filter(lambda line: line.lower().startswith("examples"), doc_lines)) ) arguments_lines_list = doc_lines[args_index + 1: properties_index] properties_lines_list = doc_lines[properties_index + 1: examples_index] doc_arguments_dict = {} for argument_line in arguments_lines_list: match_argument = regex_argument.match(argument_line) argument_dict = match_argument.groupdict() if match_argument is not None else {} argument_dict["formats"] = { match.group("extension"): match.group("edam") for match in regex_argument_formats.finditer(argument_dict["formats"]) } doc_arguments_dict[argument_dict.pop("argument")] = argument_dict doc_properties_dict = {} for property_line in properties_lines_list: match_property = regex_property.match(property_line) property_dict = match_property.groupdict() if match_property is not None else {} property_dict["values"] = None if "Values:" in property_dict["description"]: property_dict["description"], property_dict["values"] = property_dict[ "description" ].split("Values:") property_dict["values"] = { match.group("value"): match.group("description") for match in regex_property_value.finditer(property_dict["values"]) if match.group("value") } doc_properties_dict[property_dict.pop("property")] = property_dict return doc_arguments_dict, doc_properties_dict
[docs] def check_argument( path: Optional[pathlib.Path], argument: str, optional: bool, module_name: str, input_output: Optional[str] = None, output_files_created: bool = False, type: Optional[str] = None, extension_list: Optional[list[str]] = None, raise_exception: bool = True, check_extensions: bool = True, out_log: Optional[logging.Logger] = None, ) -> None: if optional and not path: return None if input_output in ["in", "input"]: input_file = True elif input_output in ["out", "output"]: input_file = False else: unable_to_determine_string = ( f"{module_name} {argument}: Unable to determine if input or output file." ) log(unable_to_determine_string, out_log) if raise_exception: raise FileNotFoundError( errno.ENOENT, os.strerror(errno.ENOENT), unable_to_determine_string ) warnings.warn(unable_to_determine_string) if input_file or output_files_created: not_found_error_string = ( f"Path {path} --- {module_name}: Unexisting {argument} file." ) if not Path(str(path)).exists(): log(not_found_error_string, out_log) if raise_exception: raise FileNotFoundError( errno.ENOENT, os.strerror(errno.ENOENT), not_found_error_string ) warnings.warn(not_found_error_string) # else: # if not path.parent.exists(): # not_found_dir_error_string = f"Path {path.parent} --- {module_name}: Unexisting {argument} directory." # log(not_found_dir_error_string, out_log) # if raise_exception: # raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), not_found_dir_error_string) # warnings.warn(not_found_dir_error_string) if check_extensions and extension_list and type != "dir": no_extension_error_string = f"{module_name} {argument}: {path} has no extension. If you want to suppress this message, please set the check_extensions property to False" if not Path(str(path)).suffix: log(no_extension_error_string) warnings.warn(no_extension_error_string) else: not_valid_extension_error_string = f"{module_name} {argument}: {path} extension is not in the valid extensions list: {extension_list}. If you want to suppress this message, please set the check_extensions property to False" if not Path(str(path)).suffix[1:].lower() in extension_list: log(not_valid_extension_error_string) warnings.warn(not_valid_extension_error_string)
[docs] @contextmanager def change_dir(destination): """Context manager for changing directory.""" cwd = os.getcwd() if not Path(destination).exists(): os.makedirs(destination) try: os.chdir(destination) yield finally: os.chdir(cwd)