Source code for tools.test_fixtures

"""Boilerplate functions for testsys
"""
import os
import pickle
from typing import Optional, Union, Any
from pathlib import Path
import sys
import shutil
import hashlib
from Bio.PDB import Superimposer, PDBParser  # type: ignore
import codecs
from biobb_common.configuration import settings
from biobb_common.tools import file_utils as fu
import numpy as np
import json
import jsonschema


def test_setup(test_object, dict_key: Optional[str] = None, config: Optional[str] = None):
    """Prepare **test_object** for a unit test run.

    Stores the testfile_dir, unitest_dir, test_dir, data_dir, reference_dir,
    conf_file_path, properties and paths attributes on **test_object**, then
    creates the directory given by ``properties['path']`` and makes it the
    current working directory.

    Args:
        test_object (:obj:`test`): The test object.
        dict_key (str): Key of the test parameters in the yaml config file.
        config (str): Path to the configuration file. When empty, ``conf.yml``
            inside the test directory is used.
    """
    # The directory layout is derived from the file of the module defining the test.
    module_file = Path(str(sys.modules[test_object.__module__].__file__)).resolve()
    test_object.testfile_dir = str(module_file.parent)
    test_object.unitest_dir = str(Path(test_object.testfile_dir).parent)
    test_object.test_dir = str(Path(test_object.unitest_dir).parent)
    test_object.data_dir = str(Path(test_object.test_dir) / 'data')
    test_object.reference_dir = str(Path(test_object.test_dir) / 'reference')

    test_object.conf_file_path = config if config else str(Path(test_object.test_dir) / 'conf.yml')
    conf = settings.ConfReader(test_object.conf_file_path)

    def expand_placeholders(raw_paths):
        # Only the first occurrence of each placeholder is substituted.
        expanded = {}
        for name, value in raw_paths.items():
            value = value.replace('test_data_dir', test_object.data_dir, 1)
            expanded[name] = value.replace('test_reference_dir', test_object.reference_dir, 1)
        return expanded

    if dict_key:
        test_object.properties = conf.get_prop_dic()[dict_key]
        test_object.paths = expand_placeholders(conf.get_paths_dic()[dict_key])
    else:
        test_object.properties = conf.get_prop_dic()
        test_object.paths = expand_placeholders(conf.get_paths_dic())

    working_dir = test_object.properties['path']
    fu.create_dir(working_dir)
    os.chdir(working_dir)
def test_teardown(test_object):
    """Remove the parent directory of **test_object.properties['path']**.

    Nothing is removed when the ``remove_tmp`` property is set to a false value;
    a missing ``remove_tmp`` property counts as True.

    Args:
        test_object (:obj:`test`): The test object.
    """
    if not test_object.properties.get('remove_tmp', True):
        return

    unitests_path = Path(test_object.properties['path']).resolve().parent
    print(f"\nRemoving: {unitests_path}")
    shutil.rmtree(unitests_path)
def exe_success(return_code: int) -> bool:
    """Tell whether a process finished successfully.

    Args:
        return_code (int): Return code of a process.

    Returns:
        bool: True if **return_code** is equal to 0.
    """
    succeeded = return_code == 0
    return succeeded
def not_empty(file_path: str) -> bool:
    """Check that a file, directory or zip archive exists and has content.

    Paths ending in ``.zip`` are extracted next to the archive into a
    ``<name>_unzipped`` directory and count as non-empty when at least one
    file was extracted. Directories count as non-empty when they have at least
    one entry. Any other path must be a regular file of non-zero size.

    Args:
        file_path (str): Path to the file.

    Returns:
        bool: True if **file_path** exists and is not empty.
    """
    if file_path.endswith('.zip'):
        print("Checking if empty zip: "+file_path)
        extraction_dir = file_path[:-4] + '_unzipped'
        os.makedirs(extraction_dir, exist_ok=True)
        # Extract the archive and count the files it contained
        extracted_files = fu.unzip_list(file_path, dest_dir=extraction_dir)
        return len(extracted_files) > 0

    target = Path(file_path)
    if target.is_dir():
        print("Checking if empty dir: "+file_path)
        return len(os.listdir(file_path)) > 0

    print("Checking if empty file: "+file_path)
    if not target.is_file():
        return False
    return target.stat().st_size > 0
def _sha256_digest(file_path: str, chunk_size: int = 1 << 20) -> bytes:
    """Return the SHA-256 digest of the file at **file_path**, read in chunks."""
    hasher = hashlib.sha256()
    # The context manager closes the handle; chunked reads keep memory use bounded.
    with open(file_path, 'rb') as stream:
        while chunk := stream.read(chunk_size):
            hasher.update(chunk)
    return hasher.digest()


def compare_hash(file_a: str, file_b: str) -> bool:
    """Compute and compare the SHA-256 hashes of two files.

    Args:
        file_a (str): Path to the first file.
        file_b (str): Path to the second file.

    Returns:
        bool: True if both files have the same SHA-256 digest.
    """
    print("Comparing: ")
    print(" File_A: "+file_a)
    print(" File_B: "+file_b)
    file_a_hash = _sha256_digest(file_a)
    file_b_hash = _sha256_digest(file_b)
    print(" File_A hash: "+str(file_a_hash))
    print(" File_B hash: "+str(file_b_hash))
    return file_a_hash == file_b_hash
def equal(file_a: str, file_b: str, ignore_list: Optional[list[Union[str, int]]] = None, **kwargs) -> bool:
    """Check if two files are equal, picking the comparison from the file extension.

    A specialised comparison is used only when both paths share the relevant
    extension; otherwise the files are compared by hash.

    Args:
        file_a (str): Path to the first file.
        file_b (str): Path to the second file.
        ignore_list (list): Words and/or line indexes to skip. When not empty,
            a line by line comparison is done regardless of the extension.
        **kwargs: Forwarded to the PDB comparison; ``percent_tolerance``
            (default 1.0) is used by the size, XVG and image comparisons.

    Returns:
        bool: Result of the selected comparison.
    """
    if ignore_list:
        # Line by line comparison
        return compare_line_by_line(file_a, file_b, ignore_list)

    def both_end_with(suffix) -> bool:
        return file_a.endswith(suffix) and file_b.endswith(suffix)

    if both_end_with(".zip"):
        return compare_zip(file_a, file_b)
    if both_end_with(".pdb"):
        return compare_pdb(file_a, file_b, **kwargs)

    # ".prmtop" also ends with ".top", so these formats must be tested before
    # the ".top" case; otherwise the ".prmtop" branch can never be reached.
    for suffix in (".gro", ".prmtop", ".inp", ".par"):
        if both_end_with(suffix):
            return compare_ignore_first(file_a, file_b)
    for suffix in (".top", ".itp"):
        if both_end_with(suffix):
            return compare_top_itp(file_a, file_b)

    percent_tolerance = kwargs.get('percent_tolerance', 1.0)
    if both_end_with((".nc", ".netcdf", ".xtc")):
        return compare_size(file_a, file_b, percent_tolerance)
    if both_end_with(".xvg"):
        return compare_xvg(file_a, file_b, percent_tolerance)

    image_extensions = ('.png', '.jfif', '.ppm', '.tiff', '.jpg', '.dib', '.pgm', '.bmp', '.jpeg', '.pbm', '.jpe', '.apng', '.pnm', '.gif', '.tif')
    if both_end_with(image_extensions):
        return compare_images(file_a, file_b, percent_tolerance)

    return compare_hash(file_a, file_b)
def compare_line_by_line(file_a: str, file_b: str, ignore_list: list[Union[str, int]]) -> bool:
    """Compare two text files line by line, skipping the ignored lines.

    A line is skipped when its zero-based index is in **ignore_list** or when
    the line of **file_a** contains one of the strings in **ignore_list**.
    Files with a different number of lines are reported as different unless
    every extra line is ignored.

    Args:
        file_a (str): Path to the first file.
        file_b (str): Path to the second file.
        ignore_list (list): Line indexes (int) and/or words (str) to ignore.

    Returns:
        bool: True if all the non ignored lines are equal.
    """
    from itertools import zip_longest

    print(f"Comparing ignoring lines containing this words: {ignore_list}")
    print(" FILE_A: "+file_a)
    print(" FILE_B: "+file_b)
    ignored_words = [word for word in ignore_list if isinstance(word, str)]
    with open(file_a) as fa, open(file_b) as fb:
        # zip_longest pads the shorter file with None so that trailing lines
        # present in only one file are not silently dropped.
        for index, (line_a, line_b) in enumerate(zip_longest(fa, fb)):
            # Once file_a has run out of lines the words are searched in file_b's line.
            searched_line = line_a if line_a is not None else line_b
            if index in ignore_list or any(word in searched_line for word in ignored_words):
                continue
            if line_a != line_b:
                return False
    return True
def equal_txt(file_a: str, file_b: str) -> bool:
    """Check if two text files are equal by delegating to the hash comparison.

    Args:
        file_a (str): Path to the first file.
        file_b (str): Path to the second file.

    Returns:
        bool: Result of **compare_hash** for both files.
    """
    same_content = compare_hash(file_a, file_b)
    return same_content
def compare_zip(zip_a: str, zip_b: str) -> bool:
    """Compare two zip files member by member.

    Each archive is extracted into its own unique directory. The archives are
    different when they hold a different number of files; otherwise every file
    of **zip_a** is compared, through **equal**, with the file of the same name
    extracted from **zip_b**.

    Args:
        zip_a (str): Path to the first zip file.
        zip_b (str): Path to the second zip file.

    Returns:
        bool: True if both archives have the same number of files and all the
        paired files are equal.
    """
    print("This is a ZIP comparison!")
    print("Unzipping:")

    print("Creating a unique_dir for: %s" % zip_a)
    extraction_dir_a = fu.create_unique_dir()
    members_a = fu.unzip_list(zip_a, dest_dir=extraction_dir_a)

    print("Creating a unique_dir for: %s" % zip_b)
    extraction_dir_b = fu.create_unique_dir()
    members_b = fu.unzip_list(zip_b, dest_dir=extraction_dir_b)

    if len(members_a) != len(members_b):
        return False

    # Members are paired by file name; the comparison stops at the first mismatch.
    return all(
        equal(member_a, str(Path(extraction_dir_b).joinpath(Path(member_a).name)))
        for member_a in members_a
    )
def compare_pdb(pdb_a: str, pdb_b: str, rmsd_cutoff: int = 1, remove_hetatm: bool = True, remove_hydrogen: bool = True, **kwargs):
    """Compare two PDB files through the RMSD of their superimposed atoms.

    Only the first model of each structure is used. The selected atoms of
    **pdb_b** are superimposed onto those of **pdb_a** with Bio.PDB's
    Superimposer and the resulting RMS value is checked against **rmsd_cutoff**.

    Args:
        pdb_a (str): Path to the first PDB file.
        pdb_b (str): Path to the second PDB file.
        rmsd_cutoff (int): The RMS value must be strictly lower than this value.
        remove_hetatm (bool): Skip residues whose hetero flag starts with 'H_'.
        remove_hydrogen (bool): Skip atoms whose name starts with 'H'.
        **kwargs: Accepted so callers can forward extra options; not used here.

    Returns:
        bool: True if the RMS value is lower than **rmsd_cutoff**. False if a
        structure could not be parsed or the atom selections differ in size.
    """
    print("Checking RMSD between:")
    print(" PDB_A: "+pdb_a)
    print(" PDB_B: "+pdb_b)
    pdb_parser = PDBParser(PERMISSIVE=True, QUIET=True)
    st_a = pdb_parser.get_structure("st_a", pdb_a)
    st_b = pdb_parser.get_structure("st_b", pdb_b)
    if st_a is None or st_b is None:
        print(" One of the PDB structures could not be parsed.")
        return False
    model_a = st_a[0]
    model_b = st_b[0]

    # The selections are always materialised as lists: they are measured,
    # handed to the Superimposer and transformed, and a generator would already
    # be exhausted after the first of those uses.
    if remove_hetatm:
        print(" Ignoring HETAMT in RMSD")
        atoms_a = [atom for res in model_a.get_residues() if not res.id[0].startswith('H_') for atom in res.get_atoms()]
        atoms_b = [atom for res in model_b.get_residues() if not res.id[0].startswith('H_') for atom in res.get_atoms()]
    else:
        atoms_a = list(model_a.get_atoms())
        atoms_b = list(model_b.get_atoms())

    if remove_hydrogen:
        print(" Ignoring Hydrogen atoms in RMSD")
        atoms_a = [atom for atom in atoms_a if not atom.get_name().startswith('H')]
        atoms_b = [atom for atom in atoms_b if not atom.get_name().startswith('H')]

    print(" Atoms ALIGNED in PDB_A: "+str(len(atoms_a)))
    print(" Atoms ALIGNED in PDB_B: "+str(len(atoms_b)))

    # The superposition pairs atoms one to one, so selections of different
    # sizes cannot be aligned and the structures are reported as different.
    if len(atoms_a) != len(atoms_b):
        print(" Atom selections differ in size, RMSD cannot be computed.")
        return False

    super_imposer = Superimposer()
    super_imposer.set_atoms(atoms_a, atoms_b)
    super_imposer.apply(atoms_b)
    super_imposer_rms = super_imposer.rms if super_imposer.rms is not None else float('inf')
    print(' RMS: '+str(super_imposer_rms))
    print(' RMS_CUTOFF: '+str(rmsd_cutoff))
    return bool(super_imposer_rms < rmsd_cutoff)
def compare_top_itp(file_a: str, file_b: str) -> bool:
    """Compare two top/itp files ignoring the first line and the comments.

    The first line of each file is skipped, lines starting with ';' are
    dropped and the remaining lines are compared without their leading and
    trailing whitespace. Empty files are handled and compare equal to each other.

    Args:
        file_a (str): Path to the first file.
        file_b (str): Path to the second file.

    Returns:
        bool: True if the remaining lines of both files are equal.
    """
    print("Comparing TOP/ITP:")
    print(" FILE_A: "+file_a)
    print(" FILE_B: "+file_b)
    with codecs.open(file_a, 'r', encoding='utf-8', errors='ignore') as f_a, \
            codecs.open(file_b, 'r', encoding='utf-8', errors='ignore') as f_b:
        # Skip the first line; the default keeps an empty file from raising StopIteration.
        next(f_a, None)
        next(f_b, None)
        lines_a = [line.strip() for line in f_a if not line.strip().startswith(';')]
        lines_b = [line.strip() for line in f_b if not line.strip().startswith(';')]
    return lines_a == lines_b
def compare_ignore_first(file_a: str, file_b: str) -> bool:
    """Compare two text files ignoring the first line of each one.

    The remaining lines are compared without their leading and trailing
    whitespace. Empty files are handled and compare equal to each other.

    Args:
        file_a (str): Path to the first file.
        file_b (str): Path to the second file.

    Returns:
        bool: True if both files are equal from the second line on.
    """
    print("Comparing ignoring first line of both files:")
    print(" FILE_A: "+file_a)
    print(" FILE_B: "+file_b)
    with open(file_a) as f_a, open(file_b) as f_b:
        # Skip the first line; the default keeps an empty file from raising StopIteration.
        next(f_a, None)
        next(f_b, None)
        lines_a = [line.strip() for line in f_a]
        lines_b = [line.strip() for line in f_b]
    return lines_a == lines_b
def compare_size(file_a: str, file_b: str, percent_tolerance: float = 1.0) -> bool:
    """Compare two files by size, allowing a relative tolerance.

    Both sizes must lie within **percent_tolerance** percent of the average of
    the two sizes.

    Args:
        file_a (str): Path to the first file.
        file_b (str): Path to the second file.
        percent_tolerance (float): Allowed deviation from the average size, in percent.

    Returns:
        bool: True if both sizes are inside the tolerance window.
    """
    print("Comparing size of both files:")
    print(f" FILE_A: {file_a}")
    print(f" FILE_B: {file_b}")

    size_a, size_b = (Path(file_path).stat().st_size for file_path in (file_a, file_b))
    average_size = (size_a + size_b) / 2
    tolerance = average_size * percent_tolerance / 100
    tolerance_low = average_size - tolerance
    tolerance_high = average_size + tolerance

    print(f" SIZE_A: {size_a} bytes")
    print(f" SIZE_B: {size_b} bytes")
    print(f" TOLERANCE: {percent_tolerance}%, Low: {tolerance_low} bytes, High: {tolerance_high} bytes")

    return all(tolerance_low <= size <= tolerance_high for size in (size_a, size_b))
def compare_xvg(file_a: str, file_b: str, percent_tolerance: float = 1.0) -> bool:
    """Compare the numeric data of two XVG files within a relative tolerance.

    Lines starting with '@' or '#' are treated as comments. The files are
    different when their data tables do not have the same shape; otherwise all
    the values must be close according to ``numpy.allclose`` with a relative
    tolerance of **percent_tolerance** percent.

    Args:
        file_a (str): Path to the first XVG file.
        file_b (str): Path to the second XVG file.
        percent_tolerance (float): Relative tolerance, in percent.

    Returns:
        bool: True if both files hold tables of the same shape with close values.
    """
    print("Comparing XVG data of both files:")
    print(f" FILE_A: {file_a}")
    print(f" FILE_B: {file_b}")
    data_a = np.loadtxt(file_a, comments=["@", '#'], unpack=True)
    data_b = np.loadtxt(file_b, comments=["@", '#'], unpack=True)
    # An explicit shape check keeps missing rows or columns from being hidden
    # by broadcasting or by pairing only the columns both files have.
    if data_a.shape != data_b.shape:
        print(f" Data shapes differ: {data_a.shape} vs {data_b.shape}")
        return False
    return bool(np.allclose(data_a, data_b, rtol=percent_tolerance / 100))
def compare_images(file_a: str, file_b: str, percent_tolerance: float = 1.0) -> bool:
    """Compare two images through their average hash.

    Requires the optional Pillow and imagehash packages; when they cannot be
    imported a message is printed and False is returned. The difference
    between both hashes must not exceed **percent_tolerance** percent of the
    mean hash length, with a minimum absolute tolerance of 1.

    Args:
        file_a (str): Path to the first image.
        file_b (str): Path to the second image.
        percent_tolerance (float): Allowed difference, in percent of the hash size.

    Returns:
        bool: True if the hash difference is within the tolerance.
    """
    try:
        from PIL import Image  # type: ignore
        import imagehash
    except ImportError:
        print("To compare images, please install the following packages: Pillow, imagehash")
        return False

    print("Comparing images of both files:")
    print(f" IMAGE_A: {file_a}")
    print(f" IMAGE_B: {file_b}")
    # The context managers release the image files once the hashes are computed.
    with Image.open(file_a) as image_a:
        hash_a = imagehash.average_hash(image_a)
    with Image.open(file_b) as image_b:
        hash_b = imagehash.average_hash(image_b)
    tolerance = (len(hash_a) + len(hash_b)) / 2 * percent_tolerance / 100
    if tolerance < 1:
        tolerance = 1
    difference = hash_a - hash_b
    print(f" IMAGE_A HASH: {hash_a} SIZE: {len(hash_a)} bits")
    print(f" IMAGE_B HASH: {hash_b} SIZE: {len(hash_b)} bits")
    print(f" TOLERANCE: {percent_tolerance}%, ABS TOLERANCE: {tolerance} bits, DIFFERENCE: {difference} bits")
    return bool(difference <= tolerance)
def compare_object_pickle(python_object: Any, pickle_file_path: Union[str, Path], **kwargs) -> bool:
    """Compare a python object with the object stored in a pickle file.

    When both objects are dictionaries they are compared with
    **compare_dictionaries** and the differences found are printed. Any other
    pair of objects is compared with ``==``.

    Args:
        python_object (Any): Object to check.
        pickle_file_path (str): Path to the pickle file holding the expected object.
        **kwargs: ``ignore_keys`` (default []), ``compare_values`` (default True)
            and ``ignore_substring`` (default "") are forwarded to the
            dictionary comparison.

    Returns:
        bool: True if the object matches the pickled one.
    """
    print(f"Loading pickle file: {pickle_file_path}")
    # NOTE(review): unpickling can run arbitrary code; only load trusted reference files.
    with open(pickle_file_path, 'rb') as handle:
        expected_object = pickle.load(handle)

    both_are_dicts = isinstance(python_object, dict) and isinstance(expected_object, dict)
    if not both_are_dicts:
        return python_object == expected_object

    # Special case for dictionaries
    found_differences = compare_dictionaries(
        python_object,
        expected_object,
        ignore_keys=kwargs.get('ignore_keys', []),
        compare_values=kwargs.get('compare_values', True),
        ignore_substring=kwargs.get('ignore_substring', ""),
    )
    if not found_differences:
        return True

    separator = 50*'*'
    print(separator)
    print("OBJECT:")
    print(python_object)
    print(separator)
    print()
    print(separator)
    print("EXPECTED OBJECT:")
    print(expected_object)
    print(separator)
    print("Differences found:")
    for entry in found_differences:
        print(f" {entry}")
    return False
def compare_dictionaries(dict1: dict, dict2: dict, path: str = "", ignore_keys: Optional[list[str]] = None, compare_values: bool = True, ignore_substring: str = "") -> list[str]:
    """Compare two dictionaries and return a description of each difference.

    Keys are visited in the order of **dict1** followed by the keys found only
    in **dict2**, so the result is reproducible. Nested dictionaries are
    compared recursively and reported with a dotted path. Keys of any type are
    supported.

    Args:
        dict1 (dict): First dictionary.
        dict2 (dict): Second dictionary.
        path (str): Prefix prepended to the keys in the messages.
        ignore_keys (list): Keys to skip at every nesting level.
        compare_values (bool): When False only the presence of keys is checked.
        ignore_substring (str): When not empty, two different values are
            accepted if the text of one ends with the text of the other once
            **ignore_substring** has been removed from the latter.

    Returns:
        list: One message per difference; empty if no differences were found.
    """
    if ignore_keys is None:
        ignore_keys = []

    differences: list[str] = []

    # Ordered union of the keys of both dictionaries
    ordered_keys = list(dict1) + [key for key in dict2 if key not in dict1]

    for key in ordered_keys:
        if key in ignore_keys:
            continue

        # Formatting instead of concatenation so that non string keys work too.
        location = f"{path}{key}"
        if key not in dict1:
            differences.append(f"Key '{location}' found in dict2 but not in dict1")
            continue
        if key not in dict2:
            differences.append(f"Key '{location}' found in dict1 but not in dict2")
            continue

        value1 = dict1[key]
        value2 = dict2[key]
        if isinstance(value1, dict) and isinstance(value2, dict):
            # Recursively compare nested dictionaries
            differences.extend(compare_dictionaries(value1, value2, f"{location}.", ignore_keys, compare_values, ignore_substring))
        elif compare_values and (value1 != value2):
            if ignore_substring:
                text1 = str(value1)
                text2 = str(value2)
                tolerated = text1.endswith(text2.replace(ignore_substring, "")) or text2.endswith(text1.replace(ignore_substring, ""))
                if tolerated:
                    continue
            differences.append(f"Difference at '{location}': dict1 has {value1}, dict2 has {value2}")

    return differences
def validate_json(json_file_path: Union[str, Path], json_schema_path: Union[str, Path]) -> bool:
    """Validate a JSON file against a JSON schema file.

    Args:
        json_file_path (str): Path to the JSON file to validate.
        json_schema_path (str): Path to the JSON schema file.

    Returns:
        bool: True if the JSON is valid. False if it does not satisfy the
        schema or if one of the files is not well formed JSON.
    """
    print("Validating JSON file:")
    print(f" JSON file: {json_file_path}")
    print(f" JSON schema: {json_schema_path}")

    def load_json(source_path):
        with open(source_path, 'r') as handle:
            return json.load(handle)

    try:
        # The document is loaded first and the schema second
        document = load_json(json_file_path)
        schema = load_json(json_schema_path)
        # Validate the JSON data against the schema
        jsonschema.validate(instance=document, schema=schema)
    except jsonschema.ValidationError as ve:
        print(f"Validation error: {ve.message}")
        return False
    except json.JSONDecodeError as je:
        print(f"Invalid JSON format: {je.msg}")
        return False
    return True